Spark Streaming Real-Time Processing
1. Spark Streaming Overview
Spark Streaming is Spark's module for processing real-time data streams. It provides high-throughput, fault-tolerant stream processing, can consume data from a variety of sources such as Kafka, Flume, Twitter, ZeroMQ, and TCP sockets, and can write the results to file systems, databases, or dashboards.
The main characteristics of Spark Streaming include:
- High throughput: can process millions of events per second.
- Fault tolerance: automatically recovers from node failures through the RDD lineage mechanism.
- Scalability: scales out to hundreds of nodes.
- Integration with the Spark ecosystem: works seamlessly with other Spark modules (such as Spark SQL, MLlib, and GraphX), so stream processing can be combined with batch processing and machine learning.
- Multi-language support: the DStream API is available in Scala, Java, and Python.
2. Spark Streaming Architecture
The Spark Streaming architecture consists of the following components:
- Input sources: the origins of the stream data, such as Kafka, Flume, or TCP sockets.
- Receivers: receive data from the input sources and store it in Spark's memory.
- DStream: the Discretized Stream, the core abstraction of Spark Streaming, representing a continuous series of RDDs.
- Processing engine: uses the Spark core engine to process DStreams, executing transformations and actions.
- Output sinks: write the processing results to external systems such as file systems, databases, or dashboards.
Spark Streaming works by splitting the continuous data stream into small batches and then processing those batches with the Spark batch engine. This approach is called micro-batch processing, and its latency is typically between a few hundred milliseconds and a few seconds.
Tip
Starting with Spark 2.0, Spark introduced Structured Streaming, a higher-level stream processing API built on the DataFrame and Dataset APIs. It offers lower latency (down to milliseconds) and a simpler programming model. For new streaming applications, Structured Streaming is recommended.
3. DStream API
A DStream (Discretized Stream) is the core abstraction of Spark Streaming. It represents a continuous series of RDDs, where each RDD contains the data for one time interval. DStreams support two kinds of operations: transformations and output operations.
3.1 Creating DStreams
A DStream can be created in several ways:
// Scala example
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
// Create a StreamingContext with a 1-second batch interval
val sparkConf = new SparkConf().setAppName("DStream Example").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a DStream from a TCP socket
val lines = ssc.socketTextStream("localhost", 9999)
// Create a DStream from Kafka (spark-streaming-kafka-0-10 connector)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topic1", "topic2")
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
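Each element of kafkaStream is a Kafka ConsumerRecord rather than a plain string, so the message payload has to be extracted before further processing. A minimal sketch (the variable names are illustrative):
// Scala example (sketch)
// Pull the message value out of each ConsumerRecord for downstream processing
val messages = kafkaStream.map(record => record.value)
val kafkaWords = messages.flatMap(_.split(" "))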
# Python example
from pyspark.streaming import StreamingContext
# Create a StreamingContext with a 1-second batch interval
# (sparkContext is an existing SparkContext)
ssc = StreamingContext(sparkContext, 1)
# Create a DStream from a TCP socket
lines = ssc.socketTextStream("localhost", 9999)
3.2 DStream Transformations
DStream transformations are similar to RDD transformations and include map, filter, flatMap, reduceByKey, and so on. Transformations fall into two categories, stateless and stateful:
- Stateless transformations: processing of each batch does not depend on previous batches, e.g. map, filter, flatMap.
- Stateful transformations: processing of each batch depends on data from previous batches, e.g. updateStateByKey, reduceByKeyAndWindow.
// Scala example
val lines = ssc.socketTextStream("localhost", 9999)
// Stateless transformation: split each line into words
val words = lines.flatMap(_.split(" "))
// Stateless transformation: map each word to (word, 1)
val pairs = words.map(word => (word, 1))
// Stateless transformation: count occurrences of each word within the batch
val wordCounts = pairs.reduceByKey(_ + _)
// Stateful transformations require a checkpoint directory
ssc.checkpoint("hdfs://path/to/checkpoint")
// Stateful transformation: maintain a running count per word with updateStateByKey
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
val runCounts = pairs.updateStateByKey(updateFunc)
// Stateful transformation: count words over a 10-second window sliding every 5 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(5))
# Python example
lines = ssc.socketTextStream("localhost", 9999)
# Stateless transformation: split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Stateless transformation: map each word to (word, 1)
pairs = words.map(lambda word: (word, 1))
# Stateless transformation: count occurrences of each word within the batch
word_counts = pairs.reduceByKey(lambda a, b: a + b)
# Stateful transformations require a checkpoint directory
ssc.checkpoint("hdfs://path/to/checkpoint")
# Stateful transformation: maintain a running count per word with updateStateByKey
def update_func(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)
run_counts = pairs.updateStateByKey(update_func)
# Stateful transformation: count words over a 10-second window sliding every 5 seconds
windowed_word_counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b, 10, 5)
3.3 DStream Output Operations
Output operations write the results of DStream processing to external systems; examples include print, saveAsTextFiles, and foreachRDD. Output operations play the role of actions: they trigger the computation, and a DStream's transformations are only actually executed once an output operation has been applied.
// Scala example
val wordCounts = pairs.reduceByKey(_ + _)
// Print to the console
wordCounts.print()
// Save as text files
wordCounts.saveAsTextFiles("hdfs://path/to/output/prefix", "suffix")
// Write to a database or another external system
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Open one connection per partition (createConnection is user-supplied)
    val connection = createConnection()
    partitionOfRecords.foreach { record =>
      // Execute the database operation (use prepared statements in production code)
      val sql = s"INSERT INTO word_counts (word, count) VALUES ('${record._1}', ${record._2})"
      connection.createStatement().execute(sql)
    }
    // Close the connection
    connection.close()
  }
}
# Python example
word_counts = pairs.reduceByKey(lambda a, b: a + b)
# Print to the console
word_counts.pprint()
# Save as text files
word_counts.saveAsTextFiles("hdfs://path/to/output/prefix", "suffix")
# Write to a database or another external system
# (save_to_database is a user-defined function that writes one partition of records)
word_counts.foreachRDD(lambda rdd: rdd.foreachPartition(save_to_database))
3.4 Starting and Stopping the StreamingContext
After the DStreams and output operations have been defined, the StreamingContext must be started to begin receiving and processing data, and stopped once processing is finished.
// Scala example
// Start the StreamingContext
ssc.start()
// Wait for the processing to terminate
ssc.awaitTermination()
// Stop the StreamingContext
ssc.stop()
# Python example
# Start the StreamingContext
ssc.start()
# Wait for the processing to terminate
ssc.awaitTermination()
# Stop the StreamingContext
ssc.stop()
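In practice you usually want the shutdown to finish processing the data that has already been received. A minimal sketch of a graceful stop using the standard StreamingContext.stop flags:
// Scala example (sketch)
// Finish processing received data before shutting down, and also stop the underlying SparkContext
ssc.stop(stopSparkContext = true, stopGracefully = true)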
4. Structured Streaming
Structured Streaming is a higher-level stream processing API introduced in Spark 2.0. It is built on the DataFrame and Dataset APIs and provides lower latency (down to milliseconds) and a simpler programming model. Structured Streaming treats a data stream as a continuously appended table, so streaming data is queried in the same way as static data.
The main characteristics of Structured Streaming include:
- Declarative API: streaming queries are written with SQL or the DataFrame/Dataset API, without worrying about low-level stream processing details.
- Low latency: supports millisecond-level latency.
- Exactly-once semantics: each event is processed exactly once, even in the presence of node failures.
- Compatibility with static data: the same code can process both static and streaming data.
- Automatic optimization: the built-in Catalyst query optimizer automatically optimizes query plans.
4.1 Creating a Streaming DataFrame
A streaming DataFrame can be created in several ways:
// Scala example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
// Create a SparkSession
val spark = SparkSession.builder()
  .appName("Structured Streaming Example")
  .master("local[*]")
  .getOrCreate()
// Create a streaming DataFrame from a TCP socket
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
// Create a streaming DataFrame from Kafka
// (the key and value columns are binary; cast them to strings before use)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Python example
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Structured Streaming Example") \
    .master("local[*]") \
    .getOrCreate()
# Create a streaming DataFrame from a TCP socket
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
4.2 Operations on Streaming DataFrames
Streaming DataFrames support the same operations as static DataFrames, including selecting columns, filtering, grouping, and aggregation (sorting is only supported after an aggregation in complete output mode).
// Scala example
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
// Split each line into words
val words = lines.select(explode(split(col("value"), " ")).alias("word"))
// Count occurrences of each word
val wordCounts = words.groupBy("word").count()
# Python example
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
# Split each line into words
words = lines.select(explode(split(col("value"), " ")).alias("word"))
# Count occurrences of each word
word_counts = words.groupBy("word").count()
4.3 Writing Out Streaming Results
Streaming results can be written to the following sinks:
- Console: for debugging and testing.
- Memory: stores the results in an in-memory table for interactive queries (see the sketch after the examples below).
- File system: writes the results to a file system such as HDFS or S3.
- Kafka: writes the results to a Kafka topic.
- ForeachWriter: custom output logic for writing results to databases or other external systems (see the sketch after the examples below).
// Scala example
// Write to the console
val consoleQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
// Write to the file system
// (the file sink only supports append mode; an aggregation like wordCounts needs a watermark for append mode)
val fileQuery = wordCounts.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "hdfs://path/to/output")
  .option("checkpointLocation", "hdfs://path/to/checkpoint")
  .start()
// Write to Kafka (the Kafka sink expects a string or binary "value" column)
val kafkaQuery = wordCounts
  .selectExpr("to_json(struct(*)) AS value")
  .writeStream
  .outputMode("update")
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "hdfs://path/to/checkpoint")
  .start()
# Python example
# Write to the console
console_query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
# Write to the file system
# (the file sink only supports append mode; an aggregation like word_counts needs a watermark for append mode)
file_query = word_counts.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "hdfs://path/to/output") \
    .option("checkpointLocation", "hdfs://path/to/checkpoint") \
    .start()
# Write to Kafka (the Kafka sink expects a string or binary "value" column)
kafka_query = word_counts \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output_topic") \
    .option("checkpointLocation", "hdfs://path/to/checkpoint") \
    .start()
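The memory and ForeachWriter sinks listed above are not demonstrated in the examples. A minimal sketch of both follows, using the memory sink for an interactive query and foreachBatch (a simpler alternative to ForeachWriter available since Spark 2.4) for custom writes; the table name and JDBC settings are illustrative placeholders:
// Scala example (sketch)
// Memory sink: keep the running result in an in-memory table for interactive SQL queries
val memoryQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("word_counts_table")
  .start()
spark.sql("SELECT * FROM word_counts_table ORDER BY count DESC").show()
// foreachBatch: apply arbitrary batch logic (e.g. a JDBC write) to each micro-batch
val foreachQuery = wordCounts.writeStream
  .outputMode("update")
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batchDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost/metrics") // placeholder connection string
      .option("dbtable", "word_counts")
      .mode("append")
      .save()
  }
  .option("checkpointLocation", "hdfs://path/to/checkpoint-foreach")
  .start()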
4.4 Output Modes
Structured Streaming supports three output modes:
- Complete mode: the entire result table is written to the external system after every trigger. Suitable for queries that contain aggregations.
- Append mode: only the rows newly appended to the result table are written out. Suitable for queries without aggregations, or for event-time aggregations with a watermark (rows are emitted once the watermark guarantees they will not change).
- Update mode: only the rows of the result table that changed since the last trigger are written out. Suitable for queries that contain aggregations.
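Append mode over an aggregation therefore requires a watermark, which also bounds the state kept for late data (this comes up again in the best practices below). A minimal sketch, assuming eventsWithTimestamp is a streaming DataFrame with an event-time column named timestamp and a word column; the column names and thresholds are placeholders:
// Scala example (sketch)
import org.apache.spark.sql.functions.{window, col}
// Tolerate events up to 10 minutes late, then count words per 5-minute event-time window
val windowedCounts = eventsWithTimestamp
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "5 minutes"), col("word"))
  .count()
// With a watermark, append mode can emit each window once it is finalized
val appendQuery = windowedCounts.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "hdfs://path/to/windowed-output")
  .option("checkpointLocation", "hdfs://path/to/checkpoint-windowed")
  .start()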
4.5 Trigger Intervals
A Trigger controls how often the streaming query is executed:
// Scala example
// Fixed-interval micro-batch trigger: run once every 5 seconds
val intervalQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
// Continuous trigger: experimental low-latency mode with a 1-second checkpoint interval
// (continuous processing supports only map-like operations and selected sources/sinks,
// so an aggregation such as wordCounts would need to be replaced by a simple projection)
val continuousQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
# Python example
# Fixed-interval micro-batch trigger: run once every 5 seconds
interval_query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .start()
# Continuous trigger: experimental low-latency mode with a 1-second checkpoint interval
# (continuous processing supports only map-like operations and selected sources/sinks)
continuous_query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()
5. Spark Streaming vs. Structured Streaming
Spark Streaming and Structured Streaming are Spark's two stream processing APIs, each with its own strengths and weaknesses:
| Feature | Spark Streaming | Structured Streaming |
|---|---|---|
| Abstraction | DStream | DataFrame/Dataset |
| Processing model | Micro-batch (latency of hundreds of milliseconds to seconds) | Micro-batch or continuous processing (latency down to milliseconds) |
| Programming model | Imperative API | Declarative API |
| Fault-tolerance semantics | At-least-once or exactly-once | Exactly-once |
| Compatibility with static data | Not compatible; separate code required | Compatible; the same code works for static and streaming data |
| Optimization | RDD-based | Catalyst optimizer |
| Recommended use | Maintaining legacy applications only | New streaming applications |
6. Spark Streaming Best Practices
When using Spark Streaming, follow these best practices:
- Choose a sensible batch interval: tune it to the data volume and processing capacity, typically between 500 milliseconds and a few seconds.
- Choose a sensible number of partitions: match it to the number of CPU cores in the cluster, typically 2-3 times the core count.
- Use checkpointing: stateful streaming applications should set a checkpoint directory so that state can be recovered after a node failure (see the sketch after this list).
- Use exactly-once semantics: applications that require accurate results should use exactly-once semantics so each event is processed exactly once.
- Tune receivers: for receiver-based applications, set the number and placement of receivers carefully so they do not become a performance bottleneck.
- Use the direct Kafka API: for Kafka sources, prefer the direct Kafka API, which offers better performance and exactly-once semantics.
- Avoid global window operations: a global window sends all data to a single node for processing and can become a performance bottleneck.
- Size memory appropriately: streaming applications need enough memory to hold stream data and state, so set driver and executor memory accordingly.
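A minimal sketch of the checkpointing practice for a stateful DStream application; the directory path is a placeholder, and createStreamingContext is assumed to be a user-defined function that builds and configures a fresh StreamingContext:
// Scala example (sketch)
// Enable checkpointing before any stateful transformation is used
ssc.checkpoint("hdfs://path/to/streaming-checkpoint")
// On restart, recreate the context from the checkpoint; the first run builds it fresh
val context = StreamingContext.getOrCreate(
  "hdfs://path/to/streaming-checkpoint",
  () => createStreamingContext() // user-defined factory for a new StreamingContext
)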
7. Structured Streaming Best Practices
When using Structured Streaming, follow these best practices:
- Prefer append output mode: it is the most efficient output mode and should be used whenever possible.
- Choose a sensible trigger interval: tune it to the data volume and processing capacity; for low-latency applications, the continuous trigger mode can be used.
- Use checkpointing: a checkpoint location must be set so that state can be recovered after a node failure.
- Tune the number of shuffle partitions: set it according to data volume and cluster size (see the configuration sketch after this list).
- Use broadcast joins: for small tables, a broadcast join avoids shuffling data and improves performance (see the sketch after this list).
- Avoid expensive operations: operations such as orderBy and distinct can become performance bottlenecks.
- Use watermarks to handle late data: when data can arrive late, a watermark handles it automatically and keeps state from growing without bound (see the watermark sketch in section 4.4).
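A minimal sketch covering the shuffle-partition and broadcast-join practices; the partition count and the staticDimensions table are illustrative placeholders:
// Scala example (sketch)
import org.apache.spark.sql.functions.broadcast
// Streaming aggregations shuffle by key; the default of 200 partitions is often too high for small streams
spark.conf.set("spark.sql.shuffle.partitions", "50")
// Enrich the stream with a small static dimension table without shuffling the stream side
val staticDimensions = spark.read.parquet("hdfs://path/to/dimensions")
val enriched = words.join(broadcast(staticDimensions), Seq("word"))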
Hands-On Exercises
Exercise 1: Word Count with DStreams
- Create a Spark Streaming application that receives text data from a TCP socket.
- Use the DStream API to count words.
- Use updateStateByKey to maintain a cumulative count for each word.
- Use window operations to compute word counts over a sliding window.
- Write the results to the console and to the file system.
Exercise 2: Real-Time Data Analysis with Structured Streaming
- Create a Structured Streaming application that receives JSON-formatted event data from Kafka.
- Use the DataFrame/Dataset API to parse the JSON and extract fields.
- Perform real-time analysis, such as computing the number of events and the average event size per category.
- Use a watermark to handle late data.
- Write the results to the console and to Kafka.
Exercise 3: Integrating Spark SQL with Stream Processing
- Create a Structured Streaming application that receives sales data from Kafka.
- Use Spark SQL to analyze the sales data, e.g. revenue and units sold per product.
- Write the analysis results to an in-memory table.
- Query the in-memory table with Spark SQL to build a real-time dashboard.
8. Common Issues and Solutions
8.1 Data Skew
Problem: some tasks in the streaming application take much longer than others, increasing overall latency.
Solutions:
- Check for data skew, e.g. one key carrying far more data than the others.
- Use salting: split the skewed key into multiple sub-keys so the data spreads across partitions (see the sketch below).
- Adjust the number of partitions to increase parallelism.
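A minimal salting sketch based on the DStream word-count example above; the salt range of 10 is an illustrative choice. Each hot key is split into key#0 .. key#9, aggregated in parallel, and then merged back:
// Scala example (sketch)
import scala.util.Random
// Phase 1: append a random salt to each key so a hot key spreads over up to 10 partitions
val salted = pairs.map { case (word, count) => (s"$word#${Random.nextInt(10)}", count) }
val partialCounts = salted.reduceByKey(_ + _)
// Phase 2: strip the salt and combine the partial counts into the final per-word totals
val finalCounts = partialCounts
  .map { case (saltedWord, count) => (saltedWord.split("#")(0), count) }
  .reduceByKey(_ + _)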
8.2 Out of Memory
Problem: the streaming application throws OutOfMemoryError.
Solutions:
- Increase the driver or executor memory settings.
- Reduce the batch interval to lower the amount of data per batch.
- Optimize state management, e.g. keep less state or clean it up periodically.
- Use a more efficient serializer such as Kryo (see the sketch below).
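A minimal sketch of the memory- and serialization-related settings; the values are illustrative starting points, not recommendations for every workload (driver memory must be set before the driver JVM starts, e.g. via spark-submit):
// Scala example (sketch)
import org.apache.spark.SparkConf
val tunedConf = new SparkConf()
  .setAppName("Streaming Memory Tuning")
  .set("spark.executor.memory", "4g")   // executor heap size
  .set("spark.driver.memory", "2g")     // driver heap size (effective only at driver startup)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // faster, more compact serialization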
8.3 Increasing Latency
Problem: the latency of the streaming application grows over time.
Solutions:
- Check for a backlog, i.e. whether the input rate exceeds the processing rate.
- Add cluster resources, such as CPU cores or memory.
- Optimize the processing logic to shorten the processing time of each batch.
- Adjust the batch interval or trigger interval.
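One common way to keep a backlog from forming is to cap the ingestion rate. A minimal sketch, using backpressure for the DStream API and maxOffsetsPerTrigger for a Structured Streaming Kafka source; the limits shown are placeholders:
// Scala example (sketch)
import org.apache.spark.SparkConf
// DStream API: let Spark adapt the receive rate to the current processing rate
val rateConf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // upper bound per partition per second
// Structured Streaming: cap how many Kafka offsets each micro-batch reads
val limitedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("maxOffsetsPerTrigger", "10000")
  .load()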
8.4 Checkpoint Directory Issues
Problem: the application cannot recover from the checkpoint after a restart.
Solutions:
- Make sure the checkpoint directory exists and is writable.
- Make sure the application code and its dependencies have not changed; otherwise the checkpoint may be incompatible.
- Periodically clean up old checkpoint files to avoid excessive storage use.
9. Summary
This tutorial covered the basic concepts, architecture, and usage of Spark Streaming and Structured Streaming. Spark Streaming is a stream processing API based on DStreams and is suited to maintaining traditional streaming applications; Structured Streaming is a higher-level API based on DataFrames/Datasets that offers lower latency and a simpler programming model, and is the recommended choice for new streaming applications.
After working through this tutorial, you should be able to:
- Understand the basic concepts and architecture of Spark Streaming and Structured Streaming.
- Use the DStream API.
- Use Structured Streaming.
- Understand the differences between the two streaming APIs and when to use each.
- Follow stream processing best practices.
In the following tutorials, we will dive into Spark's machine learning library MLlib and its graph processing library GraphX to cover more of Spark's application scenarios and best practices.