RDD programmingmodel
1. RDD overview
RDD (Resilient Distributed Dataset, 弹性distributeddata集) is Spark coreabstraction, 代表一个不可变, 可partition, 元素可parallel计算 collection. RDD具 has 以 under 特点:
- 弹性: RDD可以自动 from nodefailureinrestore, through血统 (Lineage) mechanism重 new 计算loss datapartition.
- distributed: RDD datastore in cluster many 个node on , 可以parallelprocessing.
- 不可变: RDD一旦creation, 就不能被modify, 只能through转换operationcreation new RDD.
- partition: RDD data被划分 for many 个partition (Partition) , 每个partition可以 in 不同 node on parallelprocessing.
提示
RDD is Spark最底层 abstraction, providing了 low 级别 operationinterface. for 于 big many 数application程序, 建议using更 high 层级 API (such asDataFrame and Dataset) , 它们providing了更 good performanceoptimization and 更简洁 programminginterface.
2. RDD creation
in Sparkin, 可以through以 under 几种方式creationRDD:
2.1 from collectioncreation
可以 from Driver程序in collection (such aslist, array) creationRDD, 适用于test and small 规模dataprocessing.
// Scalaexample
val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.makeRDD(List("a", "b", "c"))
// Pythonexample
rdd1 = sc.parallelize(range(1, 11))
rdd2 = sc.parallelize(["a", "b", "c"])
2.2 from out 部storecreation
可以 from HDFS, 本地filesystem, S3etc. out 部storesystemcreationRDD, 这 is produceenvironmentin最常用 方式.
// Scalaexample
val rdd = sc.textFile("hdfs://path/to/file.txt")
val rdd2 = sc.textFile("file:///local/path/to/file.txt", minPartitions=4)
// Pythonexample
rdd = sc.textFile("hdfs://path/to/file.txt")
rdd2 = sc.textFile("file:///local/path/to/file.txt", minPartitions=4)
2.3 from otherRDD转换而来
可以through for 已 has RDD执行转换operation, creation new RDD.
// Scalaexample val rdd1 = sc.parallelize(1 to 10) val rdd2 = rdd1.map(x => x * 2) // Pythonexample rdd1 = sc.parallelize(range(1, 11)) rdd2 = rdd1.map(lambda x: x * 2)
3. RDD operationclass型
RDDsupport两种class型 operation: 转换operation (Transformation) and 行动operation (Action) .
3.1 转换operation
转换operation is 指 from 一个RDD生成一个 new RDD operation, 转换operation is 惰性 , 只 has 当行动operation被执行时, 转换operation才会practical执行. common 转换operationincluding:
- map(func): for RDDin 每个元素applicationfuncfunction, 返回 new RDD.
- filter(func): filter出RDDin满足funcfunction条件 元素, 返回 new RDD.
- flatMap(func): for RDDin 每个元素applicationfuncfunction, 将结果扁平化 for new RDD.
- union(otherRDD): 返回两个RDD 并集.
- intersection(otherRDD): 返回两个RDD 交集.
- distinct(): 返回去重 after RDD.
- groupBy(func): 按照funcfunction return value for RDD元素forgroup.
- reduceByKey(func): for 键值 for RDD按照键forgroup, 然 after for 每个组applicationfuncfunction.
- sortByKey(ascending=true): for 键值 for RDD按照键forsort.
- join(otherRDD): for 两个键值 for RDDfor in 连接.
// Scalaexample
val rdd = sc.parallelize(1 to 10)
val mappedRDD = rdd.map(x => x * 2) // 转换operation: 每个元素乘以2
val filteredRDD = rdd.filter(x => x % 2 == 0) // 转换operation: filter出偶数
val flatMappedRDD = sc.parallelize(List("hello world", "hi spark")).flatMap(_.split(" ")) // 转换operation: 扁平化 for 单词
// Pythonexample
rdd = sc.parallelize(range(1, 11))
mapped_rdd = rdd.map(lambda x: x * 2)
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
flat_mapped_rdd = sc.parallelize(["hello world", "hi spark"]).flatMap(lambda x: x.split(" "))
3.2 行动operation
行动operation is 指触发计算并返回结果 to Driver程序 or 写入 out 部store operation. common 行动operationincluding:
- count(): 返回RDDin元素 数量.
- collect(): 将RDDin 所 has 元素返回给Driver程序, serving as一个array.
- first(): 返回RDDin 第一个元素.
- take(n): 返回RDDin before n个元素.
- reduce(func): usingfuncfunction for RDDin 元素foraggregate.
- fold(zeroValue)(func): class似于reduce, 但providing了一个初始值.
- foreach(func): for RDDin 每个元素applicationfuncfunction, 但不返回结果.
- saveAsTextFile(path): 将RDDin 元素保存 for 文本file.
- saveAsSequenceFile(path): 将RDDin 元素保存 for SequenceFile.
// Scalaexample
val rdd = sc.parallelize(1 to 10)
val count = rdd.count() // 行动operation: 返回元素数量
val first = rdd.first() // 行动operation: 返回第一个元素
val sum = rdd.reduce(_ + _) // 行动operation: 计算元素总 and
rdd.saveAsTextFile("hdfs://path/to/output") // 行动operation: 保存 to HDFS
// Pythonexample
rdd = sc.parallelize(range(1, 11))
count = rdd.count()
first = rdd.first()
sum_val = rdd.reduce(lambda x, y: x + y)
rdd.saveAsTextFile("hdfs://path/to/output")
注意
collect()operation会将RDDin 所 has datapull to Driver程序in, for 于large-scaledata集, 可能会导致Driver程序memory溢出. in produceenvironmentin, 应避免usingcollect()operationprocessinglarge-scaledata.
4. RDD 持久化
当RDD被 many 次using时, 可以将其持久化 to memory or diskin, 避免重复计算, improvingperformance.
4.1 持久化method
可以usingcache() or persist()method将RDD持久化:
// Scalaexample
val rdd = sc.textFile("hdfs://path/to/large/file.txt")
rdd.cache() // etc.同于persist(StorageLevel.MEMORY_ONLY)
// using不同 store级别
import org.apache.spark.store.StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK) // memory不足时溢出 to disk
// Pythonexample
rdd = sc.textFile("hdfs://path/to/large/file.txt")
rdd.cache()
rdd.persist(StorageLevel.MEMORY_AND_DISK)
4.2 store级别
Sparkproviding了 many 种store级别, 用于控制RDD 持久化方式:
- MEMORY_ONLY: 将RDDstore in memoryin, 不序列化, performance最 high , 但memory占用 big .
- MEMORY_ONLY_SER: 将RDDstore in memoryin, 序列化, memory占用 small , 但需要反序列化开销.
- MEMORY_AND_DISK: 优先store in memoryin, memory不足时溢出 to disk.
- MEMORY_AND_DISK_SER: 优先store in memoryin, memory不足时溢出 to disk, using序列化.
- DISK_ONLY: 仅store in diskin.
- OFF_HEAP: store in 堆 out memoryin, 需要configurationTachyon.
4.3 取消持久化
可以usingunpersist()method取消RDD 持久化, 释放resource:
// Scalaexample rdd.unpersist() // Pythonexample rdd.unpersist()
5. RDD partition
partition is RDDparallelprocessing basic单位, 每个partition for 应一个Task. 合理 partition策略可以improvingparallel度 and reducingdatamove.
5.1 partition数量
Spark默认 partition数量由以 under 因素决定:
- from collectioncreationRDD: 默认partition数量 for cluster CPUcore数.
- from filecreationRDD: 默认partition数量 for file 块数 ( for 于HDFS, 默认块 big small for 128MB) .
- 转换operation: big 部分转换operation会保持原RDD partition数量, 除非显式指定.
5.2 partitionoperation
可以using以 under method查看 and operationRDD partition:
// Scalaexample val rdd = sc.parallelize(1 to 100, 5) // creation5个partition RDD val numPartitions = rdd.getNumPartitions // 获取partition数量 val partitioner = rdd.partitioner // 获取partition器 // 重partition val repartitionedRDD = rdd.repartition(10) // 增加partition数量, 会导致shuffle val coalescedRDD = rdd.coalesce(3) // reducingpartition数量, 避免shuffle // Pythonexample rdd = sc.parallelize(range(1, 101), 5) num_partitions = rdd.getNumPartitions repartitioned_rdd = rdd.repartition(10) coalesced_rdd = rdd.coalesce(3)
5.3 自定义partition器
for 于键值 for RDD, 可以using自定义partition器, 根据业务requirements for dataforpartition:
// Scalaexample
import org.apache.spark.Partitioner
// 自定义partition器
class CustomPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
// 根据键 features返回partitionID
key.hashCode % numParts
}
}
// using自定义partition器
val pairsRDD = sc.parallelize(List((1, "a"), (2, "b"), (3, "c"), (4, "d")))
val customPartitionedRDD = pairsRDD.partitionBy(new CustomPartitioner(2))
6. RDD 依赖relationships
RDD之间存 in 依赖relationships, 分 for 窄依赖 and 宽依赖:
6.1 窄依赖
窄依赖 is 指父RDD 每个partition只被子RDD 一个partitionusing. 例such as: map, filter, union, coalesceetc.operation.
6.2 宽依赖
宽依赖 is 指父RDD 每个partition被子RDD many 个partitionusing, 会导致data洗牌 (Shuffle) . 例such as: groupByKey, reduceByKey, sortByKey, joinetc.operation.
提示
宽依赖会导致data洗牌, is Sparkjobperformance 主要瓶颈之一. in designSparkapplication程序时, 应尽量reducing宽依赖operation 数量, or through合理 partition策略optimization洗牌过程.
7. RDD 血统mechanism
血统 (Lineage) is 指RDD之间 依赖relationshipsgraph, 用于 in nodefailure时restoreloss datapartition. 当某个partitionloss时, Spark可以根据血统graph, 重 new 计算该partition data, 而不需要重 new 计算整个RDD.
血统mechanism is RDD弹性 core, 它使得Spark可以 in 不copydata circumstances under , implementation high 效 fault toleranceprocessing.
8. RDD programmingbest practices
in usingRDDprogramming时, 应遵循以 under best practices:
- 优先usingadvancedAPI: for 于 big many 数application程序, 建议usingDataFrame and Dataset API, 它们providing了更 good performanceoptimization and 更简洁 programminginterface.
- 合理设置partition数量: partition数量应 and cluster CPUcore数相匹配, 一般建议partition数量 for CPUcore数 2-3倍.
- 避免usingcollect(): for 于large-scaledata集, 应避免usingcollect()operation, 以免导致Driver程序memory溢出.
- 及时持久化RDD: 当RDD被 many 次using时, 应及时将其持久化 to memory or diskin, 避免重复计算.
- using广播variable: for 于 small data集, 可以using广播variable (Broadcast Variable) 将data广播 to 所 has node, reducingnetwork传输.
- using累加器: for 于需要 in distributedenvironmentinfor累加 variable, 应using累加器 (Accumulator) , 而不 is 普通variable.
- 避免 small fileissues: processing big 量 small file时, 应usingcoalesce() or repartition()reducingpartition数量, or usingHadoop CombineFileInputFormatmerge small file.
- optimization洗牌operation: for 于宽依赖operation, 可以through合理 partition策略, 调整shuffle缓冲区 big small etc.方式optimization洗牌过程.
实践练习
练习1: basicRDDoperation
- from collectioncreation一个package含1 to 100 RDD.
- usingmapoperation将每个元素乘以2.
- usingfilteroperationfilter出偶数.
- usingreduceoperation计算所 has 元素 总 and .
- usingtakeoperation获取 before 10个元素.
练习2: 文本fileprocessing
- from HDFS or 本地filesystem读取一个文本file.
- usingflatMapoperation将文本拆分 for 单词.
- usingmap and reduceByKeyoperation计算每个单词 出现次数.
- usingsortByKeyoperation按照单词出现次数降序sort.
- 将结果保存 to HDFS or 本地filesystem.
练习3: RDD持久化
- creation一个package含1 to 1000000 RDD.
- for RDD执行 many 次转换 and 行动operation, 记录执行时间.
- usingcache()method将RDD持久化 to memoryin.
- 再次执行相同 转换 and 行动operation, 记录执行时间.
- 比较两次执行时间, analysis持久化 for performance 影响.
练习4: RDDpartitionoperation
- creation一个package含1 to 1000 RDD, 指定partition数量 for 4.
- usingrepartition()method将partition数量增加 to 8.
- usingcoalesce()method将partition数量reducing to 2.
- 查看每个partition in 容, understandingrepartition and coalesce 区别.
练习5: 自定义partition器
- creation一个键值 for RDD, package含100个键值 for .
- implementation一个自定义partition器, 将键 for 偶数 键值 for 分 to 一个partition, 键 for 奇数 键值 for 分 to 另一个partition.
- using自定义partition器 for RDDforpartition.
- verificationpartition结果 is 否符合预期.
9. commonissues及solution
9.1 memory溢出
issues: 执行RDDoperation时出现OutOfMemoryError.
solution:
- 增加Driver or Executor memoryconfiguration.
- reducingRDD partition数量.
- using持久化级别MEMORY_AND_DISK, 将data溢出 to disk.
- 避免usingcollect()operationprocessinglarge-scaledata.
9.2 执行速度 slow
issues: RDDoperation执行速度 slow .
solution:
- check is 否 has 重复计算, 及时持久化RDD.
- optimization洗牌operation, 调整partition数量 and shuffle缓冲区 big small .
- checkdata倾斜issues, using加盐, 自定义partition器etc.方式解决.
- 考虑using更 high 层级 API (such asDataFrame and Dataset) .
9.3 data倾斜
issues: 某个partition data量远 big 于otherpartition, 导致task执行时间不均衡.
solution:
- for 倾斜 键for加盐processing, 将一个 big partition拆分 for many 个 small partition.
- using自定义partition器, 将data均匀分布 to 各个partition.
- using随机 before 缀, 将倾斜 键分散 to 不同 partition.
- for 于aggregateoperation, 可以usingreduceByKeyLocallyetc.operation先 in 本地aggregate, 再全局aggregate.
10. summarized
本tutorial详细介绍了Spark RDD programmingmodel, includingRDD concepts, creation方式, operationclass型, 持久化, partition, 依赖relationships and 血统mechanismetc.. RDD is Spark最底层 abstraction, providing了 low 级别 operationinterface, for 于understandingSpark working principles非常 important .
虽然RDDproviding了强 big functions, 但 for 于 big many 数application程序, 建议using更 high 层级 API (such asDataFrame and Dataset) , 它们providing了更 good performanceoptimization and 更简洁 programminginterface. in after 续tutorialin, 我们将深入LearningSpark SQL, Spark Streaming, MLlibetc.advancedcomponent, MasterSpark 各种application场景 and best practices.