程序优化

尽可能减少和避免shuffle

(1) map task中,将内存中的数据以文件形式写到磁盘中
(2) reduce task 中,通过网络I/O读取map task中溢写的文件,进行聚合,由于join操作前后分区策略不一致造成shuffle,数据量较少(一般低于3G)可以使用广播变量机制在同一个stage中完成join操作

未优化前

   val rddData1 = sc.parallelize(Array(("Alice", 15), ("Bob", 18), ("Thomas", 20), ("Catalina", 25)))
   val rddData2 = sc.parallelize(Array(("Alice", "Female"), ("Thomas", "Male"), ("Tom", "Male")))

   val rddData3 = rddData1.join(rddData2, 3)
   println(rddData3.collect.mkString(","))

优化后,避免shuffle

val data1 = Map(("Alice", 15), ("Bob", 18), ("Thomas", 20), ("Catalina", 25))
    val rddData2 = sc.parallelize(Array(("Alice", "Female"), ("Thomas", "Male"), ("Tom", "Male")))

    val rddData1Broadcast = sc.broadcast(data1)
    val rddData3 = rddData2.map(t => {
      val data1Map = rddData1Broadcast.value
      if(data1Map.contains(t._1)){
        (t._1, (data1Map(t._1), t._2))
      }else{
        null
      }
    }).filter(_ != null)
    println(rddData3.collect().mkString(","))
使用Kryo 作为序列化方案

在编程中,涉及到跨进程通信(例如节点之间数据传输),通信时传输数据必须进行序列化

程序中使用

(1)在RDD中的各类转换操作内引用外部变量必须能够序列化

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
class Person(_name: String) extends Serializable {
  def name = _name
  override def toString: String = _name
}
object Chapter10_1_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter10_1_2")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Person]))

    val sc = new SparkContext(conf)

    val arrPerson = new ArrayBuffer[Person]()

    for(i <- 1 to 999999){
      arrPerson += new Person("姓名" + i)
    }

    val rddData1 = sc.parallelize(arrPerson, 2)
    rddData1.persist(StorageLevel.MEMORY_ONLY_SER)
    rddData1.collect()

    Thread.sleep(3600 * 1000)
  }
}

(2)自定义类型作为RDD泛型

   val visitorRDD = sc.parallelize[Person](
      Array(
        ("Bob", 15),
        ("Thomas", 28),
        ("Tom", 18),
        ("Galen", 35),
        ("Catalina", 12),
        ("Karen", 9),
        ("Boris", 20)),
      3)
对比原始序列化方案与调优方案

使用ObjectOutputStream,ObjectInputStream API 实现序列化与反序列化,spark默认采用这种方案,但是不高效,因此spark提供了基于Kryo方案


import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer


object Test{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Test")
      .set("spark.executor.memory", "6g")
      .set("spark.driver.memory", "6g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Person]))
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 10)

    rdd.foreach(i => {
      //单条处理数据
    })

    rdd.foreachPartition(iterator => {
      for(i <- iterator){
        //批量处理数据
      }
    })

  }
}

尽可能批量操作数据

在spark中,无论是转换操作还是输出操作,遍历RDD,DataFrame,DataSet中每个元素时,尽量先按照分区遍历,然后批量处理当前分区中数据,用来提升处理效率,例如rdd.ForeachPartition代替rdd.foreach,用rdd.mapPartition代替 rdd.map ,用ForeachBatch Sink代替 Foreach Sink

但是直接面向分区操作,一次性将分区中所有数据加载到内存中,若分区数据量较大,则GC无法回收,会出现内存溢出

合理设置分区数

在spark输出数据时,每一个分区产生一个文件,若某些分区数据不存在,会产生空文件,在适当时机使用coalesce操作将多个分区合并,有利于文件管理

合理设置批处理间隔

在spark页面查看数据的total delay ,选择合适的批处理时间

数据优化

对于数据的分布不均,到处单个节点的算力不均衡,程序长时间运行,可能会拖垮节点,降低处理效率

自定义partitioner缓解数据倾斜

import org.apache.spark.util.Utils
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

import scala.util.Random


class UserPartitioner(partitions: Int) extends Partitioner{
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  val random = new Random()

  def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case null => nonNegativeMod((random.nextFloat() * 100000).toInt.hashCode, numPartitions)
    case _ => nonNegativeMod((key.toString + "_" + (random.nextFloat() * 100000).toInt).hashCode, numPartitions)
  }

  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Test")
    val sc = new SparkContext(conf)

    val arr = Array(
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户B", "www.baidu.com"),
      ("用户B", "www.baidu.com")
    )

    val rddData = sc
      .parallelize(arr)
      .partitionBy(new UserPartitioner(2))

    val result = rddData.map(t => {
      (t._1, "http://" + t._2)
    })

    println(result.collect.mkString(","))

    Thread.sleep(3600 * 1000)
  }
}

数据补全

1.缓存维度数据在spark中(维度数据较少情况)
2.数据补全.先将数据补全,避免频繁和外部系统交互

资源优化

spark.driver.cores: 设置Driver进程数,只在cluster 模式下有效
spark.driver.memory: 设置Driver进程内存,建议通过脚本提交 --driver-memory方式配置

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐