Spark Performance Optimization and Tuning
1. Performance Optimization Overview
Spark performance optimization is a key part of improving job execution efficiency. In large-scale data processing scenarios, a well-chosen optimization strategy can significantly speed up job execution, reduce resource consumption, and improve the scalability of the system.
1.1 Optimization Goals
- Faster job execution: reduce the total job run time
- Lower resource consumption: reduce CPU, memory, disk, and network usage
- Higher resource utilization: make full use of cluster resources and avoid waste
- Better scalability: support processing larger data volumes
1.2 Optimization Levels
Spark performance optimization can be approached at several levels:
- Resource management optimization: tune cluster resource configuration, such as the number of executors, memory size, and CPU cores
- Job-level optimization: tune job configuration parameters, such as parallelism, serialization, and caching strategy
- Data processing optimization: optimize data storage, partitioning, serialization, and transfer
- Algorithm optimization: optimize RDD transformations to reduce shuffles and network transfer
2. Resource Management Optimization
Resource management is the foundation of Spark performance optimization; a sensible resource configuration makes full use of the cluster and improves job execution efficiency.
2.1 Executor Configuration
An Executor is the execution unit of a Spark job. Each Executor runs on one node of the cluster and is responsible for executing tasks and storing data.
# Set the memory per Executor (default 1g)
--executor-memory 4g
# Set the number of CPU cores per Executor (default 1)
--executor-cores 4
# Set the number of Executors
--num-executors 10
2.2 Driver Configuration
The Driver is the main process of a Spark job, responsible for scheduling and coordinating the job.
# Set the Driver memory (default 1g)
--driver-memory 2g
# Set the number of Driver CPU cores (effective in cluster mode only)
--driver-cores 2
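Putting these flags together, a representative spark-submit invocation might look like the following sketch; the master URL, main class, and JAR name are placeholders:
spark-submit \
  --master yarn \
  --num-executors 10 \
  --executor-memory 4g \
  --executor-cores 4 \
  --driver-memory 2g \
  --class com.example.MyApp \
  my-app.jar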
2.3 Memory Configuration
Executor memory in Spark is divided into three regions:
- Storage memory (Storage Memory): caches RDD data and broadcast variables; by default 60% of Executor memory
- Execution memory (Execution Memory): holds temporary data while tasks execute; by default 20% of Executor memory
- Other memory (Other Memory): holds Spark internal objects and user data structures; by default 20% of Executor memory
# Set the storage memory fraction (default 0.6)
--conf spark.storage.memoryFraction=0.5
# Set the shuffle/execution memory fraction (default 0.2)
--conf spark.shuffle.memoryFraction=0.3
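Note that these two fraction settings belong to Spark's legacy (pre-1.6) memory model. On Spark 1.6 and later, unified memory management replaces them with the following knobs (documented defaults shown):
# Fraction of heap shared by execution and storage (default 0.6)
--conf spark.memory.fraction=0.6
# Portion of that region protected for cached blocks (default 0.5)
--conf spark.memory.storageFraction=0.5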
3. Handling Data Skew
Data skew is a common performance bottleneck in Spark jobs: the data is unevenly distributed, so a few tasks take far longer than the rest and drag down the whole job.
3.1 Symptoms of Data Skew
- The job runs unusually long, especially in shuffle stages
- Most tasks finish quickly, but a small number take very long
- Executors run out of memory, particularly after shuffle operations
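Before picking a remedy, it helps to confirm the skew. A quick check, sketched here under the assumption of a pair RDD of (key, value) records, is to count records per key and inspect the heaviest keys:
// Count records per key and show the 10 heaviest keys
val keyCounts = rdd.map { case (k, _) => (k, 1L) }.reduceByKey(_ + _)
keyCounts.sortBy(-_._2).take(10).foreach(println)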
3.2 Causes of Data Skew
- The data itself is unevenly distributed, e.g. some keys carry far more records than others
- Poorly chosen shuffle operations, such as groupByKey or reduceByKey
- Inconsistent data types, causing logically identical keys to be treated as different physical keys
3.3 Remedies for Data Skew
3.3.1 Increase Parallelism
Increasing the parallelism of shuffle operations spreads skewed data across more tasks:
# Set the shuffle parallelism for Spark SQL (default 200)
--conf spark.sql.shuffle.partitions=400
# Or set the RDD default parallelism on the SparkConf before the SparkContext is created
val conf = new SparkConf().set("spark.default.parallelism", "400")
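Most shuffle operators also take an explicit partition count, so parallelism can be raised for a single hot operation without touching global settings (a sketch, assuming a pair RDD):
// Pass the partition count directly to the shuffle operator
val counts = rdd.reduceByKey(_ + _, 400)
// Or repartition explicitly before a heavy stage
val repartitioned = rdd.repartition(400)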
3.3.2 Use Better Shuffle Operators
Choose more suitable shuffle operators, e.g. use reduceByKey instead of groupByKey: reduceByKey performs partial aggregation on the map side, reducing the amount of data shuffled:
// Not recommended: groupByKey sends every record to the reduce side
val grouped = rdd.groupByKey()
// Recommended: reduceByKey aggregates locally on the map side first
val reduced = rdd.reduceByKey(_ + _)
3.3.3 Data Preprocessing
Preprocess the data before it enters Spark to eliminate skew:
- Filter out anomalous records
- Split or merge skewed keys
- Use random prefixes or suffixes to spread skewed keys, as in the salting sketch below
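The random-prefix idea from the last bullet, often called salting, can be sketched as a two-step aggregation on a pair RDD: the salt spreads one hot key over several tasks, and a second pass merges the partial results. The salt width of 10 is an arbitrary illustrative choice:
import scala.util.Random
// Step 1: salt each key with a random prefix, then aggregate partially
val salted = rdd.map { case (k, v) => (s"${Random.nextInt(10)}_$k", v) }
val partial = salted.reduceByKey(_ + _)
// Step 2: strip the salt and aggregate again for the final result
val result = partial
  .map { case (saltedKey, v) => (saltedKey.split("_", 2)(1), v) }
  .reduceByKey(_ + _)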
3.3.4 Broadcast the Small Table
For join operations, if one of the tables is small, broadcast it to all Executors to avoid a shuffle:
import org.apache.spark.sql.functions.broadcast
// Broadcast the small table to avoid a shuffle
val joined = largeDF.join(broadcast(smallDF), Seq("key"))
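Spark SQL can also broadcast automatically: a table whose estimated size falls below spark.sql.autoBroadcastJoinThreshold (10 MB by default) is broadcast without an explicit hint. The threshold is runtime-settable:
// Raise the automatic broadcast threshold to 100 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600L)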
3.3.5 Custom Partitioning
A custom partitioner gives explicit control over which partition each key lands in. Note that a partitioner must map a given key deterministically, so it cannot split a single hot key by itself (use salting for that); it can, however, isolate the hot key so other keys stay off its partition:
import org.apache.spark.Partitioner

// Custom partitioner that isolates a known hot key
class CustomPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val k = key.toString
    if (k == "skewed_key") {
      // Reserve partition 0 for the hot key
      0
    } else {
      // Spread all other keys over the remaining partitions
      1 + (k.hashCode & Int.MaxValue) % (numParts - 1)
    }
  }
}

// Apply the custom partitioner (requires a pair RDD)
val partitionedRDD = rdd.partitionBy(new CustomPartitioner(100))
4. Serialization Optimization
Serialization is unavoidable in Spark jobs, especially during shuffles and network transfer. Optimizing it reduces data size, speeds up transfer, and lowers CPU consumption.
4.1 Choosing a Serializer
Spark supports two main serializers:
4.1.1 Java Serialization
- The default serializer
- Works with any object that implements java.io.Serializable
- Relatively slow, and the serialized output is large
4.1.2 Kryo Serialization
- Faster than Java serialization, with much smaller output (often as little as a tenth of the size)
- Classes to be serialized should be registered
- Does not support every Java object
# Enable Kryo serialization
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
# Supply a registrator for the classes to serialize
--conf spark.kryo.registrator=com.example.MyKryoRegistrator
# Set the maximum Kryo serialization buffer size
--conf spark.kryoserializer.buffer.max=128m
4.2 A Custom Kryo Registrator
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
// Register custom classes
kryo.register(classOf[com.example.User])
kryo.register(classOf[scala.collection.mutable.HashMap[String, Int]])
// Register commonly used array types
kryo.register(classOf[Array[Byte]])
kryo.register(classOf[Array[String]])
}
}
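A registrator is optional: for simple cases the classes can be registered directly on the SparkConf, as in this minimal sketch (com.example.User stands in for your own class, as above):
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Register classes directly instead of supplying a registrator
  .registerKryoClasses(Array(classOf[com.example.User], classOf[Array[String]]))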
5. Caching Strategy Optimization
Caching is an important lever in Spark performance optimization: keeping frequently used data in memory avoids recomputation and improves job execution efficiency.
5.1 Choosing a Storage Level
Spark provides several storage levels:
import org.apache.spark.storage.StorageLevel

// Cache in memory only (the default level)
rdd.cache()
rdd.persist(StorageLevel.MEMORY_ONLY)
// Cache in memory, spilling to disk when memory is insufficient
rdd.persist(StorageLevel.MEMORY_AND_DISK)
// Cache in memory in serialized form
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
// Cache in memory and on disk in serialized form
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Cache on disk only
rdd.persist(StorageLevel.DISK_ONLY)
5.2 Caching Best Practices
- Cache only frequently reused data; avoid caching data that is used just once
- Choose a storage level appropriate to the data size and access frequency
- Release caches promptly: unpersist data that is no longer needed
- Monitor cache usage through the Spark UI
// Cache the data
val cachedRDD = rdd.persist(StorageLevel.MEMORY_AND_DISK)
// Reuse the cached data several times (illustrative operations, assuming an RDD of numbers)
val result1 = cachedRDD.map(_ * 2).reduce(_ + _)
val result2 = cachedRDD.filter(_ > 0).count()
// Release the cache once it is no longer needed
cachedRDD.unpersist()
6. Job-Level Optimization
Job-level optimization means tuning job configuration parameters to improve execution efficiency.
6.1 Setting Parallelism
Parallelism is the number of tasks executing concurrently; a sensible setting makes full use of cluster resources.
# Set the default parallelism for RDD operations
--conf spark.default.parallelism=400
# Set the shuffle parallelism for Spark SQL
--conf spark.sql.shuffle.partitions=400
6.2 Shuffle Optimization
Shuffle is one of the most expensive operations in a Spark job; optimizing it can significantly improve execution efficiency.
# Enable shuffle data compression (enabled by default)
--conf spark.shuffle.compress=true
--conf spark.shuffle.spill.compress=true
# Fraction of heap used for shuffle aggregation before spilling (legacy setting, default 0.2)
--conf spark.shuffle.memoryFraction=0.3
# Maximum number of reduce partitions for which the sort shuffle takes the bypass-merge path (default 200)
--conf spark.shuffle.sort.bypassMergeThreshold=200
# Set the shuffle manager (Spark 1.x only; default sort)
--conf spark.shuffle.manager=tungsten-sort
6.3 Fault Tolerance Optimization
Spark's fault tolerance mechanisms can generate large amounts of checkpoint data; tuning them reduces disk I/O.
# Enable speculative execution: launch a backup copy of tasks that are running slowly
--conf spark.speculation=true
# Set the checkpoint directory
--conf spark.checkpoint.dir=hdfs://path/to/checkpoint
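In application code the checkpoint directory is usually set through the SparkContext API; a minimal sketch:
// Point checkpoints at reliable storage such as HDFS
sc.setCheckpointDir("hdfs://path/to/checkpoint")
// Mark an RDD for checkpointing; it is written out at the next action
rdd.checkpoint()
rdd.count()  // triggers the checkpoint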
7. Data Processing Optimization
Data processing optimization means optimizing RDD transformations to reduce shuffles and network transfer.
7.1 Avoid Unnecessary Transformations
Cut out unnecessary RDD transformations, and chain the remaining ones into a single pipeline instead of naming each intermediate result:
// Not recommended: several separately named transformations
val rdd1 = rdd.map(...)
val rdd2 = rdd1.filter(...)
val rdd3 = rdd2.map(...)
// Recommended: chain the transformations into one pipeline
val rdd3 = rdd.map(...).filter(...).map(...)
7.2 Prefer Narrow-Dependency Transformations
Narrow-dependency transformations (such as map, filter, and union) do not trigger a shuffle and execute efficiently, while wide-dependency transformations (such as groupByKey, reduceByKey, and join) trigger a shuffle and are far more expensive.
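One practical consequence: when the partition count only needs to shrink, coalesce (a narrow dependency) avoids the full shuffle that repartition (a wide dependency) triggers:
// Narrow: merges existing partitions locally, no shuffle
val fewer = rdd.coalesce(10)
// Wide: redistributes every record, full shuffle
val reshuffled = rdd.repartition(10)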
7.3 Use Broadcast Variables Appropriately
A broadcast variable ships a large read-only object to every Executor once, rather than with every task, reducing network transfer.
// Create the broadcast variable
val broadcastVar = sc.broadcast(largeMap)
// Use the broadcast variable inside tasks
val result = rdd.map { x =>
// Read from the broadcast variable locally, avoiding repeated network transfer
val value = broadcastVar.value.getOrElse(x, 0)
(x, value)
}
7.4 Avoid collect on Large Datasets
collect gathers every record of an RDD onto the Driver, which can cause a Driver out-of-memory error. For large datasets, use operations such as take, first, or saveAsTextFile instead.
// Not recommended for large datasets: may cause a Driver out-of-memory error
val allData = rdd.collect()
// Recommended: fetch only the first N records
val first10 = rdd.take(10)
// Recommended: save the data to a file system
rdd.saveAsTextFile("hdfs://path/to/output")
8. Spark SQL Optimization
Spark SQL is Spark's module for processing structured data, and it has some optimization strategies of its own.
8.1 Choosing a Data Format
Choosing a suitable data format improves read and write efficiency:
- Parquet: columnar storage with compression support, well suited to complex queries
- ORC: columnar storage with compression and indexes, well suited to large-scale data
- Avro: row-oriented storage with schema evolution support, well suited to data exchange
// Read a Parquet file
val df = spark.read.parquet("hdfs://path/to/parquet")
// Write a Parquet file with snappy compression
df.write.format("parquet")
  .option("compression", "snappy")
  .save("hdfs://path/to/output")
8.2 Predicate Pushdown
Predicate pushdown pushes filter conditions down to the data source so that less data is read in the first place.
// Predicate pushdown: the filter condition is pushed down to the data source
val filtered = df.filter($"age" > 30).select($"name", $"age")
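Whether the filter actually reached the source can be checked in the physical plan; for Parquet scans the pushed-down conditions show up in the scan node:
// Inspect the physical plan and look for PushedFilters in the file scan
filtered.explain()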
8.3 Caching DataFrames
A frequently reused DataFrame can be cached to avoid recomputation.
// Cache the DataFrame
df.cache()
// Reuse the cached DataFrame
val result1 = df.filter(...)
val result2 = df.groupBy(...).agg(...)
9. Monitoring and Tuning Tools
Monitoring is an essential part of performance tuning: it shows how a job executes and helps locate bottlenecks.
9.1 Spark UI
The Spark UI is Spark's built-in monitoring tool. It shows how a job executes, including:
- The job's DAG visualization
- Task execution times
- Executor resource usage
- Shuffle data volumes
9.2 Log Analysis
Spark produces detailed logs; analyzing them reveals how the job executed and any error information.
9.3 Third-Party Monitoring Tools
Beyond the Spark UI, several third-party tools can monitor Spark:
- Ganglia: a distributed monitoring system for tracking cluster resource usage
- Prometheus: an open-source monitoring system supporting multi-dimensional queries
- Grafana: a data visualization tool for building monitoring dashboards
Hands-On Exercises
Exercise 1: Resource Configuration Tuning
Run a Spark job and observe how the execution time changes as you vary the number of Executors, the memory size, and the number of CPU cores.
Exercise 2: Handling Data Skew
Create an RDD with skewed data, try the different skew-handling techniques, and compare job execution times before and after.
Exercise 3: Serialization Optimization
Compare the performance of Java and Kryo serialization, observing both the serialization time and the size of the serialized data.
Exercise 4: Caching Strategy Optimization
Run a job containing several RDD transformations, try different storage levels, and compare how the execution time changes.
Exercise 5: Shuffle Optimization
Run a job containing shuffle operations and observe how the execution time changes as you tune the shuffle parallelism and the shuffle manager.
10. Summary
This tutorial covered a range of Spark performance optimization strategies, including:
- Resource management optimization: tune Executor and Driver configuration and allocate memory sensibly
- Data skew handling: increase parallelism, choose better shuffle operators, preprocess the data, broadcast small tables, and use custom partitioning
- Serialization optimization: use Kryo serialization and register commonly used classes to cut serialization time and data size
- Caching strategy optimization: choose appropriate storage levels and release caches promptly
- Job-level optimization: tune parallelism, shuffle parameters, and fault tolerance settings
- Data processing optimization: avoid unnecessary transformations, prefer narrow dependencies, and use broadcast variables appropriately
- Spark SQL optimization: choose suitable data formats, rely on predicate pushdown, and cache DataFrames
- Monitoring and tuning tools: use the Spark UI, log analysis, and third-party monitoring tools
Spark performance optimization is an ongoing process: the right strategy depends on the characteristics of the specific job and data. Through continuous monitoring and tuning, you can significantly improve job execution efficiency, reduce resource consumption, and improve system scalability.