Spark Performance Optimization and Tuning

Learn Spark performance optimization strategies, including resource management, data skew handling, serialization optimization, caching strategies, and job tuning.


1. Performance Optimization Overview

Performance optimization is a key part of making Spark jobs run efficiently. In large-scale data processing scenarios, sound optimization strategies can significantly speed up job execution, reduce resource consumption, and improve system scalability.

1.1 Optimization Goals

  • Faster job execution: reduce the total running time of a job
  • Lower resource consumption: reduce CPU, memory, disk, and network usage
  • Better resource utilization: make full use of cluster resources and avoid waste
  • Better scalability: support processing ever larger volumes of data

1.2 Levels of Optimization

Spark performance optimization happens at several levels:

  • Resource management: tune cluster resource configuration, such as the number of executors, memory size, and CPU cores
  • Job-level tuning: adjust job configuration parameters, such as parallelism, serialization, and caching strategy
  • Data processing: optimize how data is stored, partitioned, serialized, and transferred
  • Algorithms: optimize RDD transformations to reduce shuffles and network transfer

2. Resource Management Optimization

Resource management is the foundation of Spark performance optimization: a sensible resource configuration makes full use of the cluster and improves job execution efficiency.

2.1 Executor Configuration

Executors are the execution units of a Spark job. Each executor runs on one cluster node and is responsible for executing tasks and storing data.

# Memory per executor (default 1g)
--executor-memory 4g

# CPU cores per executor (default 1)
--executor-cores 4

# Number of executors
--num-executors 10
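
A hypothetical spark-submit invocation combining these flags (the master, class name, and jar file are placeholders):

spark-submit \
  --master yarn \
  --executor-memory 4g \
  --executor-cores 4 \
  --num-executors 10 \
  --class com.example.MyApp \
  my-app.jar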

2.2 Driver Configuration

The driver is the main process of a Spark job, responsible for scheduling and coordination.

# Driver memory (default 1g)
--driver-memory 2g

# Driver CPU cores (only effective in cluster mode)
--driver-cores 2

2.3 Memory Configuration

Under Spark's legacy memory manager, executor memory is divided into three regions:

  • Storage memory: caches RDD data and broadcast variables; 60% of executor memory by default
  • Execution memory: holds temporary data while tasks run (shuffles, sorts, aggregations); 20% by default
  • Other memory: Spark internal objects and user data structures; the remaining 20%

# Storage memory fraction (legacy, default 0.6)
--conf spark.storage.memoryFraction=0.5

# Execution (shuffle) memory fraction (legacy, default 0.2)
--conf spark.shuffle.memoryFraction=0.3
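
Note that the two fractions above belong to the legacy memory manager; since Spark 1.6 the unified memory manager governs both regions through a different pair of settings (defaults shown are for Spark 2.x):

# Fraction of heap shared by execution and storage (default 0.6)
--conf spark.memory.fraction=0.6

# Share of that fraction protected for storage (default 0.5)
--conf spark.memory.storageFraction=0.5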

3. Handling Data Skew

Data skew is a common performance bottleneck in Spark jobs: data is distributed unevenly, so some tasks run far longer than others and drag down the whole job.

3.1 Symptoms of Data Skew

  • The job takes too long, especially in shuffle stages
  • Most tasks finish quickly, but a small number run for a very long time
  • Executors run out of memory, particularly after shuffle operations

3.2 Causes of Data Skew

  • The data itself is unevenly distributed, e.g. some keys carry far more records than others
  • Inappropriate shuffle operations, such as groupByKey or reduceByKey over hot keys
  • Inconsistent data types, causing logically identical keys to be treated as distinct physical keys

3.3 Solutions to Data Skew

3.3.1 Increase Parallelism

Increasing the parallelism of shuffle operations spreads the skewed data across more tasks:

# Shuffle parallelism for Spark SQL (default 200)
--conf spark.sql.shuffle.partitions=400

// Or set the default parallelism for RDD operations in code:
val conf = new SparkConf().set("spark.default.parallelism", "400")

3.3.2 Use Better Shuffle Operators

Choose shuffle operators that combine data on the map side. For example, prefer reduceByKey over groupByKey: reduceByKey performs partial aggregation on the map side first, which reduces the amount of data shuffled:

// Not recommended: groupByKey ships every record to the reduce side
val grouped = rdd.groupByKey()

// Recommended: reduceByKey aggregates locally on the map side first
val reduced = rdd.reduceByKey(_ + _)

3.3.3 Data Preprocessing

Preprocess the data before it enters Spark to eliminate the skew:

  • Filter out anomalous records
  • Split or merge the skewed keys
  • Scatter skewed keys with random prefixes or suffixes (salting), as sketched below
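
A minimal salting sketch, assuming rdd is an RDD[(String, Int)] being summed by key; the salt width of 10 is an arbitrary illustration:

import scala.util.Random

// Stage 1: prepend a random salt in [0, 10) to each key, so records for a
// single hot key spread across up to 10 reduce tasks, then pre-aggregate.
val salted = rdd
  .map { case (k, v) => (s"${Random.nextInt(10)}_$k", v) }
  .reduceByKey(_ + _)

// Stage 2: strip the salt and aggregate again for the final per-key totals.
val result = salted
  .map { case (saltedKey, v) => (saltedKey.split("_", 2)(1), v) }
  .reduceByKey(_ + _)
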
3.3.4 Broadcast the Small Table

For joins where one side is small, broadcast it to all executors and avoid the shuffle entirely:

import org.apache.spark.sql.functions.broadcast

// Broadcast the small table to avoid a shuffle
val joined = largeDF.join(broadcast(smallDF), Seq("key"))

3.3.5 Custom Partitioning

Use a custom partitioner to route skewed keys differently from the rest:

import org.apache.spark.Partitioner

// Custom partitioner that isolates a known hot key
class CustomPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val k = key.toString
    // Give the known skewed key its own dedicated partition and hash all
    // other keys across the rest. getPartition must be deterministic, so
    // truly spreading one key over several partitions requires salting it
    // first (see 3.3.3).
    if (k == "skewed_key") {
      numParts - 1
    } else {
      (k.hashCode & Int.MaxValue) % (numParts - 1)
    }
  }
}

// Use the custom partitioner
val partitionedRDD = rdd.partitionBy(new CustomPartitioner(100))

4. Serialization Optimization

Serialization is unavoidable in Spark jobs, especially during shuffles and network transfer. Optimizing it shrinks the data, speeds up transfer, and lowers CPU consumption.

4.1 Choosing a Serializer

Spark supports two main serializers:

4.1.1 Java Serialization
  • The default serializer
  • Works with any serializable Java object
  • Relatively slow, and produces larger serialized output
4.1.2 Kryo Serialization
  • Faster than Java serialization, with much smaller output (often up to 10x smaller)
  • Classes should be registered for best performance
  • Does not support all Java objects

# Enable Kryo serialization
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

# Point Spark at a custom registrator for the classes to serialize
--conf spark.kryo.registrator=com.example.MyKryoRegistrator

# Maximum buffer size for Kryo serialization (default 64m)
--conf spark.kryoserializer.buffer.max=128m

4.2 A Custom Kryo Registrator

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Register application classes
    kryo.register(classOf[com.example.User])
    kryo.register(classOf[scala.collection.mutable.HashMap[String, Int]])
    // Register commonly used array types
    kryo.register(classOf[Array[Byte]])
    kryo.register(classOf[Array[String]])
  }
}
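
To wire this up in code rather than on the command line, the same properties can be set on the SparkConf (a sketch; MyKryoRegistrator is the class defined above):

import org.apache.spark.SparkConf

// Enable Kryo and point it at the custom registrator
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")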

5. Caching Strategy Optimization

Caching is one of the most important tools for Spark performance: keeping frequently used data in memory avoids recomputation and speeds up jobs.

5.1 Choosing a Storage Level

Spark provides several storage levels:

import org.apache.spark.storage.StorageLevel

// Cache in memory only (the default level)
rdd.cache()
rdd.persist(StorageLevel.MEMORY_ONLY)

// Cache in memory, spilling to disk when memory is insufficient
rdd.persist(StorageLevel.MEMORY_AND_DISK)

// Cache in memory in serialized form
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

// Cache in memory and on disk in serialized form
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Cache on disk only
rdd.persist(StorageLevel.DISK_ONLY)

5.2 Caching Best Practices

  • Cache only data that is reused often: avoid caching data read just once
  • Pick a storage level that matches the data size and access frequency
  • Release caches promptly: unpersist data you no longer need
  • Watch cache usage through the Spark UI's Storage tab

// Cache the data
val cachedRDD = rdd.persist(StorageLevel.MEMORY_AND_DISK)

// Reuse the cached data multiple times
val result1 = cachedRDD.map(...).reduce(...)
val result2 = cachedRDD.filter(...).count()

// Release the cache once it is no longer needed
cachedRDD.unpersist()

6. Job-Level Optimization

Job-level optimization means tuning a job's configuration parameters to improve its execution efficiency.

6.1 Setting Parallelism

Parallelism is the number of tasks running concurrently; a sensible setting makes full use of cluster resources.

# Default parallelism for RDD operations
--conf spark.default.parallelism=400

# Shuffle parallelism for Spark SQL
--conf spark.sql.shuffle.partitions=400
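
Parallelism can also be chosen per operation; a sketch, assuming pairs is a pair RDD and df a DataFrame:

// Pass a partition count directly to a shuffle operator
val counts = pairs.reduceByKey(_ + _, 400)

// Or explicitly repartition an existing DataFrame
val repartitioned = df.repartition(400)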

6.2 Shuffle Optimization

Shuffles are among the most expensive operations in a Spark job, so optimizing them can improve execution efficiency significantly.

# Compress shuffle map output and spill files (both default to true)
--conf spark.shuffle.compress=true
--conf spark.shuffle.spill.compress=true

# Fraction of executor memory used for shuffle aggregation (legacy, default 0.2)
--conf spark.shuffle.memoryFraction=0.3

# Max reduce-side partitions for the bypass-merge shuffle path (default 200)
--conf spark.shuffle.sort.bypassMergeThreshold=200

# Shuffle manager (Spark 1.x only; default sort, removed in Spark 2.0)
--conf spark.shuffle.manager=tungsten-sort

6.3 Fault Tolerance Tuning

Spark recovers lost partitions through lineage, and explicit checkpointing writes a large amount of data to reliable storage, which costs disk I/O. Tuning these mechanisms reduces that cost, while speculative execution mitigates straggler tasks.

# Enable speculative execution: launch a backup copy of slow-running tasks
--conf spark.speculation=true

// The checkpoint directory is set on the SparkContext, not via --conf
sc.setCheckpointDir("hdfs://path/to/checkpoint")

7. Data Processing Optimization

Data processing optimization means tuning RDD transformations to reduce shuffles and network transfer.

7.1 Avoid Unnecessary Transformations

Cut unnecessary RDD transformations and chain the remaining ones together rather than holding on to named intermediate RDDs.

// Not recommended: a separate variable for every intermediate RDD
val rdd1 = rdd.map(...)
val rdd2 = rdd1.filter(...)
val rdd3 = rdd2.map(...)

// Recommended: chain the transformations
val rdd3 = rdd.map(...).filter(...).map(...)

7.2 Prefer Narrow-Dependency Transformations

Narrow-dependency transformations (map, filter, union, etc.) trigger no shuffle and execute efficiently, while wide-dependency transformations (groupByKey, reduceByKey, join, etc.) trigger a shuffle and are considerably more expensive.
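
A small illustration of the difference, assuming words is an RDD[String]:

// Narrow dependencies: each output partition depends on a single input
// partition, so map and filter pipeline together with no shuffle.
val pairs = words.map(w => (w, w.length)).filter(_._2 > 3)

// Wide dependency: grouping redistributes records by key across the
// cluster, introducing a shuffle and a new stage boundary.
val grouped = pairs.groupByKey()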

7.3 Use Broadcast Variables Wisely

A broadcast variable ships a large object to every executor once, instead of sending it with each task, reducing network transfer.

// Create a broadcast variable
val broadcastVar = sc.broadcast(largeMap)

// Use the broadcast variable inside tasks
val result = rdd.map { x =>
  // Read from the executor-local broadcast copy instead of shipping
  // the map with every task
  val value = broadcastVar.value.getOrElse(x, 0)
  (x, value)
}

7.4 Avoid collect on Large Datasets

collect gathers every record of the RDD onto the driver and can exhaust driver memory. For large datasets, use operations such as take, first, or saveAsTextFile instead.

// Not recommended for large datasets: may exhaust driver memory
val allData = rdd.collect()

// Recommended: fetch only the first N records
val first10 = rdd.take(10)

// Recommended: write the data out to a file system instead
rdd.saveAsTextFile("hdfs://path/to/output")

8. Spark SQL Optimization

Spark SQL, Spark's module for structured data, has some optimization strategies of its own.

8.1 Choosing a Data Format

Choosing an appropriate data format improves read and write efficiency:

  • Parquet: columnar storage with compression; well suited to complex analytical queries
  • ORC: columnar storage with compression and indexes; well suited to large-scale data
  • Avro: row-oriented storage with schema evolution; well suited to data exchange

// Read a Parquet file
val df = spark.read.parquet("hdfs://path/to/parquet")

// Write Parquet with snappy compression enabled
df.write.format("parquet")
  .option("compression", "snappy")
  .save("hdfs://path/to/output")

8.2 Predicate Pushdown

Predicate pushdown moves filter conditions down to the data source, so less data is read in the first place.

// Predicate pushdown: the filter condition is pushed down to the data source
val filtered = df.filter($"age" > 30).select($"name", $"age")
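
To check whether a filter was actually pushed down, inspect the physical plan; with a Parquet source the scan node typically lists the pushed-down conditions:

// Print the physical plan; look for PushedFilters in the scan node
filtered.explain()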

8.3 Caching DataFrames

Cache DataFrames that are reused frequently to avoid recomputation.

// Cache the DataFrame
df.cache()

// Reuse the cached DataFrame
val result1 = df.filter(...)
val result2 = df.groupBy(...).agg(...)

9. Monitoring and Tuning Tools

Monitoring is an essential part of performance work: it shows how jobs actually execute and where the bottlenecks are.

9.1 Spark UI

The Spark UI is Spark's built-in monitoring tool for inspecting job execution, including:

  • The job's DAG
  • Task execution times
  • Executor resource usage
  • Shuffle data volumes
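
The live UI goes away when the driver exits; to review finished applications, event logging can be enabled so the Spark History Server can replay them (the HDFS path is a placeholder):

# Persist event logs so the Spark History Server can replay finished jobs
--conf spark.eventLog.enabled=true
--conf spark.eventLog.dir=hdfs://path/to/spark-events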

9.2 Log Analysis

Spark produces detailed logs; analyzing them reveals how a job ran and what errors occurred.

9.3 Third-Party Monitoring Tools

Besides the Spark UI, several third-party tools can monitor Spark:

  • Ganglia: a distributed monitoring system for tracking cluster resource usage
  • Prometheus: an open-source monitoring system with multi-dimensional queries
  • Grafana: a visualization tool for building monitoring dashboards

Hands-On Exercises

Exercise 1: Resource Configuration Tuning

Run a Spark job and observe how its execution time changes as you adjust the number of executors, executor memory, and CPU cores.

Exercise 2: Handling Data Skew

Create an RDD with skewed data and try the different skew-handling techniques, comparing execution time before and after.

Exercise 3: Serialization Optimization

Compare Java and Kryo serialization, observing the difference in serialization time and serialized data size.

Exercise 4: Caching Strategy Optimization

Run a job containing several RDD transformations and try different storage levels, comparing execution times.

Exercise 5: Shuffle Optimization

Run a job containing shuffle operations, adjust the shuffle parallelism and shuffle manager, and observe how execution time changes.

10. Summary

This tutorial covered Spark performance optimization strategies, including:

  • Resource management: tune executor and driver configuration and allocate memory sensibly
  • Data skew handling: increase parallelism, choose better shuffle operators, preprocess data, broadcast small tables, and apply custom partitioning
  • Serialization: use Kryo and register commonly used classes to cut serialization time and data size
  • Caching: choose appropriate storage levels and release caches promptly
  • Job-level tuning: adjust parallelism, shuffle parameters, and fault-tolerance settings
  • Data processing: avoid unnecessary transformations, prefer narrow dependencies, and use broadcast variables wisely
  • Spark SQL: choose suitable data formats, exploit predicate pushdown, and cache DataFrames
  • Monitoring and tuning tools: the Spark UI, log analysis, and third-party monitors

Spark performance optimization is an ongoing process: the right strategies depend on the characteristics of each job and its data. With continuous monitoring and tuning, you can significantly improve execution efficiency, reduce resource consumption, and improve system scalability.