Spark SQL and DataFrame
1. Spark SQL overview
Spark SQL is Spark's module for processing structured data. It provides two main programming abstractions, DataFrame and Dataset, along with a command-line interface for running SQL queries and a JDBC/ODBC server. The main features of Spark SQL include:
- Unified data access: the same API can read from many data sources, such as HDFS, Hive, Avro, Parquet, and JSON.
- Mixed execution: SQL queries can be freely mixed with Spark's other APIs (RDD, DataFrame, Dataset) within the same application (see the sketch after this list).
- Optimized execution: a built-in cost-based query optimizer (the Catalyst optimizer) automatically optimizes query plans.
- Hive compatibility: supports HiveQL syntax and Hive UDFs, and can access existing Hive tables directly.
- Standard connectivity: provides JDBC and ODBC interfaces for integration with traditional BI tools.
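As a quick illustration of the mixed-execution feature, the sketch below registers a DataFrame as a temporary view, queries it with SQL, and then keeps working with the DataFrame API on the result. The people.json path and its name/age fields are placeholder assumptions (the same sample data is used throughout this tutorial).
// Scala example (a minimal sketch)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("Mixed Execution Example")
  .master("local[*]")
  .getOrCreate()
// Read structured data into a DataFrame
val people = spark.read.json("hdfs://path/to/people.json")
// Query the same data with SQL ...
people.createOrReplaceTempView("people")
val adults = spark.sql("SELECT name, age FROM people WHERE age >= 18")
// ... and continue with the DataFrame API on the SQL result
adults.groupBy("age").count().orderBy("age").show()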
2. Spark SQL architecture
The Spark SQL architecture consists of the following main components:
- Catalyst optimizer: the cost-based query optimizer, responsible for turning SQL queries into efficient execution plans.
- Spark SQL core: the core components that process SQL queries, including the parser, analyzer, optimizer, and execution engine.
- DataFrame API: an object-oriented programming interface for working with structured data.
- Dataset API: combines the strong typing of RDDs with the optimized execution of DataFrames.
- Hive support: allows access to existing Hive tables and the use of HiveQL syntax.
- JDBC/ODBC server: lets external tools access Spark SQL through standard JDBC and ODBC interfaces.
Tip
The Catalyst optimizer is one of Spark SQL's core strengths. It automatically optimizes query plans using techniques such as predicate pushdown, column pruning, constant folding, and join reordering, which improves query performance.
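To see Catalyst at work you can call explain() on a query. The sketch below assumes an active SparkSession named spark and a hypothetical Parquet file; in the printed physical plan the filter typically shows up as PushedFilters on the file scan, and only the selected column is read.
// Scala example (a minimal sketch)
val events = spark.read.parquet("hdfs://path/to/events.parquet")
// Only the "name" column is needed, and the filter can be pushed down to the Parquet reader
val query = events.filter(events("age") > 25).select("name")
// Print the parsed, analyzed, optimized, and physical plans
query.explain(true)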
3. DataFrame Introduction
A DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database. A DataFrame has the following characteristics (illustrated in the sketch after this list):
- Structured: the data in a DataFrame has a predefined schema, i.e. column names and data types.
- Immutable: once created, a DataFrame cannot be modified; transformations always produce a new DataFrame.
- Distributed: the data of a DataFrame is stored across multiple nodes of the cluster and can be processed in parallel.
- Optimized execution: DataFrame operations are automatically optimized by the Catalyst optimizer, so they usually execute more efficiently than equivalent RDD code.
- Multiple data sources: a DataFrame can be created from many sources, such as files, databases, and RDDs.
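A minimal sketch of the first two points, again using the hypothetical people.json sample: the schema is known up front, and transformations leave the original DataFrame untouched.
// Scala example (a minimal sketch)
val people = spark.read.json("hdfs://path/to/people.json")
// The schema (column names and data types) is available up front
people.printSchema()
// Transformations do not modify `people`; they return a new DataFrame
val adults = people.filter(people("age") > 18)
println(people.count())   // the original DataFrame is unchanged
println(adults.count())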
4. Creating DataFrames
In Spark, a DataFrame can be created in the following ways:
4.1 Creating from files
A DataFrame can be created from a variety of file formats, such as JSON, CSV, Parquet, and Avro.
// Scala example
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("Spark SQL Example")
  .master("local[*]")
  .getOrCreate()
// Create a DataFrame from a JSON file
val df1 = spark.read.json("hdfs://path/to/people.json")
// Create a DataFrame from a CSV file
val df2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://path/to/people.csv")
// Create a DataFrame from a Parquet file
val df3 = spark.read.parquet("hdfs://path/to/people.parquet")
# Python example
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Spark SQL Example") \
    .master("local[*]") \
    .getOrCreate()
# Create a DataFrame from a JSON file
df1 = spark.read.json("hdfs://path/to/people.json")
# Create a DataFrame from a CSV file
df2 = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("hdfs://path/to/people.csv")
# Create a DataFrame from a Parquet file
df3 = spark.read.parquet("hdfs://path/to/people.parquet")
4.2 Creating from an RDD
A DataFrame can be created from an RDD in two ways:
- Inferring the schema via reflection: suitable when the data types are known (for example, when the RDD contains case classes).
- Specifying the schema explicitly: suitable when the data types are not known in advance or when precise control over the schema is needed.
// Scala example
// Define a case class
case class Person(name: String, age: Int)
// Create a DataFrame from an RDD (schema inferred via reflection)
val personRDD = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df1 = spark.createDataFrame(personRDD)
// Create a DataFrame from an RDD (schema specified explicitly)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val rowRDD = spark.sparkContext.parallelize(Seq(Row("Alice", 25), Row("Bob", 30)))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val df2 = spark.createDataFrame(rowRDD, schema)
# Python example
# Create a DataFrame from an RDD (schema inferred via reflection)
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([Row(name="Alice", age=25), Row(name="Bob", age=30)])
df = spark.createDataFrame(rdd)
# Create a DataFrame from an RDD (schema specified explicitly)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
rdd = spark.sparkContext.parallelize([("Alice", 25), ("Bob", 30)])
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)
4.3 Creating from a Hive table
A DataFrame can be created directly from an existing Hive table (this requires Hive support, e.g. calling enableHiveSupport() when building the SparkSession).
// Scala example
val df = spark.table("hive_table_name")
// or use a SQL query
val df = spark.sql("SELECT * FROM hive_table_name")
# Python example
df = spark.table("hive_table_name")
# or use a SQL query
df = spark.sql("SELECT * FROM hive_table_name")
5. DataFrame operations
DataFrames support two kinds of operations: transformations and actions.
5.1 Transformations
A transformation produces a new DataFrame from an existing one. Transformations are lazy: they are only actually executed when an action is triggered. Common transformations include:
- select(): select the specified columns.
- filter(): keep only the rows that satisfy a condition.
- where(): identical to filter(), used to filter rows.
- groupBy(): group by the specified columns.
- orderBy(): sort by the specified columns.
- join(): join two DataFrames on the specified condition.
- withColumn(): add or replace a column.
- drop(): drop the specified columns.
- distinct(): return a DataFrame with duplicate rows removed.
- limit(): return the first n rows.
// Scala example
val df = spark.read.json("hdfs://path/to/people.json")
// Select specific columns
val selectedDF = df.select("name", "age")
// Keep only people older than 25
val filteredDF = df.filter(df("age") > 25)
// Group by age and count the rows in each group
val groupedDF = df.groupBy("age").count()
// Sort by age in descending order
val sortedDF = df.orderBy(df("age").desc)
// Add a new column
val newDF = df.withColumn("age_plus_10", df("age") + 10)
# Python example
df = spark.read.json("hdfs://path/to/people.json")
# Select specific columns
selected_df = df.select("name", "age")
# Keep only people older than 25
filtered_df = df.filter(df["age"] > 25)
# Group by age and count the rows in each group
grouped_df = df.groupBy("age").count()
# Sort by age in descending order
sorted_df = df.orderBy(df["age"].desc())
# Add a new column
new_df = df.withColumn("age_plus_10", df["age"] + 10)
5.2 Actions
An action triggers the computation and either returns a result to the driver program or writes it to external storage. Common actions include:
- show(): display the contents of the DataFrame.
- count(): return the number of rows in the DataFrame.
- first(): return the first row of the DataFrame.
- head(n): return the first n rows of the DataFrame.
- collect(): pull all of the DataFrame's data back to the driver program as an array.
- write: write the DataFrame to external storage (via the DataFrameWriter, e.g. df.write.parquet(...)).
// Scala example
val df = spark.read.json("hdfs://path/to/people.json")
// Show the contents of the DataFrame
df.show()
// Show the first 10 rows
df.show(10)
// Return the number of rows
val count = df.count()
// Return the first row
val firstRow = df.first()
// Return the first 5 rows
val headRows = df.head(5)
// Write the DataFrame to a Parquet file
df.write.parquet("hdfs://path/to/output.parquet")
# Python example
df = spark.read.json("hdfs://path/to/people.json")
# Show the contents of the DataFrame
df.show()
# Show the first 10 rows
df.show(10)
# Return the number of rows
count = df.count()
# Return the first row
first_row = df.first()
# Return the first 5 rows
head_rows = df.head(5)
# Write the DataFrame to a Parquet file
df.write.parquet("hdfs://path/to/output.parquet")
6. Dataset API
A Dataset is an abstraction introduced in Spark 1.6 that combines the strong typing of RDDs with the optimized execution of DataFrames. The typed Dataset API is only available in Scala and Java; it is not supported in Python or R.
6.1 Creating a Dataset
A Dataset can be created in the following ways:
// Scala example
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("Dataset Example")
  .master("local[*]")
  .getOrCreate()
// Import the implicit conversions (required for toDS and as[T])
import spark.implicits._
// Create a Dataset from a collection
val ds1 = Seq(("Alice", 25), ("Bob", 30)).toDS()
// Define a case class (age is Long because Spark infers JSON numbers as LongType)
case class Person(name: String, age: Long)
val ds2 = Seq(Person("Alice", 25), Person("Bob", 30)).toDS()
// Convert a DataFrame to a Dataset
val df = spark.read.json("hdfs://path/to/people.json")
val ds3 = df.as[Person]
// Convert an RDD to a Dataset
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val ds4 = rdd.toDS()
6.2 Dataset operations
Datasets support operations similar to those of DataFrames, and additionally support RDD-style functional operations such as map, filter, and reduce.
// Scala example
import spark.implicits._
case class Person(name: String, age: Int)
val ds = Seq(Person("Alice", 25), Person("Bob", 30), Person("Charlie", 35)).toDS()
// DataFrame-style (untyped) operations
val filteredDS1 = ds.filter(ds("age") > 25)
val selectedDS1 = ds.select("name")
// Dataset-style (typed) operations
val filteredDS2 = ds.filter(p => p.age > 25)
val selectedDS2 = ds.map(p => p.name)
// Show the results
filteredDS2.show()
selectedDS2.show()
7. Spark SQL queries
Spark SQL supports querying data with SQL syntax. SQL queries can be executed in the following ways:
7.1 Using spark.sql()
The spark.sql() method executes a SQL query and returns the result as a DataFrame.
// Scala example
val df = spark.read.json("hdfs://path/to/people.json")
// Register the DataFrame as a temporary view
df.createOrReplaceTempView("people")
// Execute a SQL query
val resultDF = spark.sql("SELECT name, age FROM people WHERE age > 25 ORDER BY age DESC")
// Show the result
resultDF.show()
# Python example
df = spark.read.json("hdfs://path/to/people.json")
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("people")
# Execute a SQL query
result_df = spark.sql("SELECT name, age FROM people WHERE age > 25 ORDER BY age DESC")
# Show the result
result_df.show()
7.2 Using SQL files
A SQL query can be loaded from an external file and then executed.
// Scala example
import scala.io.Source
// Read the SQL query from a file
val sqlQuery = Source.fromFile("path/to/query.sql").mkString
// Execute the SQL query
val resultDF = spark.sql(sqlQuery)
// Show the result
resultDF.show()
# Python example
# Read the SQL query from a file
with open("path/to/query.sql", "r") as f:
    sql_query = f.read()
# Execute the SQL query
result_df = spark.sql(sql_query)
# Show the result
result_df.show()
7.3 Temporary views and global temporary views
Spark SQL supports two kinds of temporary views:
- Temporary view (Temp View): visible only in the current SparkSession; it is dropped when that SparkSession is closed.
- Global temporary view (Global Temp View): visible across all SparkSessions and accessed with the global_temp. prefix; it is dropped when the Spark application ends.
// Scala example
val df = spark.read.json("hdfs://path/to/people.json")
// Create a temporary view
df.createOrReplaceTempView("people")
// Create a global temporary view
df.createOrReplaceGlobalTempView("global_people")
// Query the temporary view
spark.sql("SELECT * FROM people").show()
// Query the global temporary view
spark.sql("SELECT * FROM global_temp.global_people").show()
# Python example
df = spark.read.json("hdfs://path/to/people.json")
# Create a temporary view
df.createOrReplaceTempView("people")
# Create a global temporary view
df.createOrReplaceGlobalTempView("global_people")
# Query the temporary view
spark.sql("SELECT * FROM people").show()
# Query the global temporary view
spark.sql("SELECT * FROM global_temp.global_people").show()
8. Spark SQL optimization
Spark SQL has a built-in cost-based query optimizer (the Catalyst optimizer) that automatically optimizes query plans. Common optimization techniques include:
- Predicate pushdown: push filter conditions down to the data source to reduce the amount of data read.
- Column pruning: read only the columns required by the query.
- Constant folding: evaluate constant expressions at compile time to reduce runtime work.
- Join reordering: reorder joins based on table sizes to shrink intermediate results.
- Broadcast join: broadcast small tables to every node to avoid shuffling data (see the sketch after this list).
- Code generation: compile the query plan into Java bytecode to improve execution efficiency.
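A minimal sketch of the broadcast-join technique, assuming hypothetical orders and countries DataFrames that share a country_code column: the broadcast() hint asks Spark to ship the small table to every node, and explain() should show a BroadcastHashJoin instead of a shuffle-based join.
// Scala example (a minimal sketch)
import org.apache.spark.sql.functions.broadcast
val orders = spark.read.parquet("hdfs://path/to/orders.parquet")
val countries = spark.read.parquet("hdfs://path/to/countries.parquet") // small dimension table
// Ask Spark to broadcast the small table instead of shuffling both sides
val joined = orders.join(broadcast(countries), Seq("country_code"))
// The physical plan should contain a BroadcastHashJoin
joined.explain()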
8.1 Optimization configuration
Spark SQL performance can be tuned with configuration parameters such as the following:
# Add the following to spark-defaults.conf

# Broadcast join threshold (default 10 MB)
spark.sql.autoBroadcastJoinThreshold 10485760

# Number of shuffle partitions (default 200)
spark.sql.shuffle.partitions 200

# Automatically infer the data types of partition columns (default true)
spark.sql.sources.partitionColumnTypeInference.enabled true

# Whether identifiers are case sensitive (default false)
spark.sql.caseSensitive false

# Compression codec for Parquet files (default snappy)
spark.sql.parquet.compression.codec snappy

# Enable Arrow-based data transfer for PySpark (default false; the key was spark.sql.execution.arrow.enabled before Spark 3.0)
spark.sql.execution.arrow.pyspark.enabled true
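The same settings can also be changed at runtime on an existing SparkSession; the values below are illustrative.
// Scala example (a minimal sketch)
// Values set here override spark-defaults.conf for the current session
spark.conf.set("spark.sql.shuffle.partitions", "64")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString) // 20 MB
// Read a setting back
println(spark.conf.get("spark.sql.shuffle.partitions"))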
9. Comparing DataFrames and RDDs
DataFrames and RDDs are Spark's two main programming abstractions, each with its own pros and cons:
| Feature | RDD | DataFrame |
|---|---|---|
| Typing | strongly typed | untyped rows with a schema |
| Optimization | no automatic optimization | built-in Catalyst optimizer |
| API | low-level, flexible | high-level, concise |
| Performance | lower | higher |
| Typical use cases | unstructured data, complex computations | structured data, SQL queries |
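To make the contrast concrete, the sketch below expresses the same computation (count people older than 25) with both APIs, assuming the hypothetical people.json sample used earlier.
// Scala example (a minimal sketch)
val df = spark.read.json("hdfs://path/to/people.json")
// DataFrame version: declarative, optimized by Catalyst
val dfCount = df.filter(df("age") > 25).count()
// RDD version: arbitrary functions, but no automatic optimization
val rddCount = df.rdd.filter(row => row.getAs[Long]("age") > 25).count()
println(s"DataFrame: $dfCount, RDD: $rddCount")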
10. Spark SQL best practices
When using Spark SQL, follow these best practices:
- Prefer DataFrame/Dataset: for structured data, prefer the DataFrame or Dataset API, which benefit from Catalyst's optimizations.
- Set a sensible number of partitions: choose the number of shuffle partitions based on data volume and cluster size; a common rule of thumb is 2-3 times the number of CPU cores.
- Use broadcast joins: for small tables, a broadcast join avoids shuffling data and improves performance.
- Avoid collect(): for large datasets, avoid collect(), which can cause the driver program to run out of memory.
- Use Parquet or ORC: for structured data, Parquet or ORC offer better compression and query performance.
- Cache wisely: when a DataFrame is reused several times, persist it with cache() or persist() (see the sketch after this list).
- Use partitioned tables: for large datasets, partitioned tables reduce the amount of data scanned and speed up queries.
- Optimize join order: order joins according to table sizes, putting the small tables first.
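A small sketch of the caching practice, assuming a hypothetical logs DataFrame that is reused by two aggregations:
// Scala example (a minimal sketch)
val logs = spark.read.parquet("hdfs://path/to/logs.parquet")
// Persist the DataFrame because it is used by more than one query
logs.cache()
val byStatus = logs.groupBy("status").count()
val byHost   = logs.groupBy("host").count()
byStatus.show()   // the first action populates the cache
byHost.show()     // this query reads logs from the cache
// Release the memory when the DataFrame is no longer needed
logs.unpersist()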
Hands-on exercises
Exercise 1: Basic DataFrame operations
- Create a DataFrame from a JSON file.
- Show the DataFrame's schema and its first 10 rows.
- Select specific columns.
- Filter the rows that satisfy a condition.
- Group and sort by specified columns.
- Add a new column.
- Write the result to a Parquet file.
Exercise 2: SQL queries
- Register a DataFrame as a temporary view.
- Use SQL to select specific columns with filter conditions.
- Use SQL to group, aggregate, and sort.
- Use SQL to perform a join.
- Use SQL to create a new table.
Exercise 3: Dataset operations
- Define a case class.
- Create a Dataset from a collection.
- Use the Dataset API to filter, map, and aggregate.
- Convert a Dataset to a DataFrame and to an RDD.
Exercise 4: Performance tuning
- Test how different numbers of partitions affect query performance.
- Compare the performance of broadcast joins and regular joins.
- Compare the performance of the Parquet and JSON formats.
- Use the EXPLAIN command to inspect query plans and analyze the effect of optimizations.
11. Common problems and solutions
11.1 Schema mismatch
Problem: a schema mismatch error occurs when reading a file.
Solutions (see the sketch after this list):
- Use the inferSchema option to infer the schema automatically.
- Specify the schema explicitly and make sure it matches the data format.
- Use the mode option to handle malformed records: PERMISSIVE (the default; keeps malformed records, setting fields that cannot be parsed to null), DROPMALFORMED (drops malformed records), or FAILFAST (fails immediately when a malformed record is encountered).
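A minimal sketch combining an explicit schema with the mode option, assuming a hypothetical people.csv:
// Scala example (a minimal sketch)
import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
// With DROPMALFORMED, rows that do not match the schema are dropped instead of failing the job
val df = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .schema(schema)
  .csv("hdfs://path/to/people.csv")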
11.2 Out of memory
Problem: an OutOfMemoryError occurs while executing a Spark SQL query.
Solutions (a small sketch follows this list):
- Increase the driver or executor memory.
- Adjust the number of shuffle partitions (more, smaller partitions reduce the memory needed per task).
- Avoid using collect() on large datasets.
- Use broadcast joins for small tables.
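A small sketch of these measures; the memory sizes, partition count, and paths are illustrative:
// Scala example (a minimal sketch)
// Give the driver and executors more memory at submit time (illustrative values):
//   spark-submit --driver-memory 4g --executor-memory 8g ...
// More, smaller shuffle partitions reduce per-task memory pressure
spark.conf.set("spark.sql.shuffle.partitions", "400")
val big = spark.read.parquet("hdfs://path/to/big.parquet")
// Instead of collect(), inspect a small sample or write the result to storage
big.limit(20).show()
big.write.mode("overwrite").parquet("hdfs://path/to/output")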
11.3 Slow queries
Problem: a Spark SQL query executes slowly.
Solutions:
- Check for data skew and mitigate it with techniques such as salting the join keys or a custom partitioner (see the salting sketch after this list).
- Adjust the number of shuffle partitions.
- Use broadcast joins for small tables.
- Use the EXPLAIN command to inspect the query plan and find the expensive steps.
- Store the data in Parquet or ORC format.
- Cache DataFrames that are reused.
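Salting spreads a skewed join key over several artificial sub-keys. The sketch below is illustrative only: facts, dims, the join column key, and numSalts are all assumed names and values.
// Scala example (a minimal sketch)
import org.apache.spark.sql.functions._
val numSalts = 8 // illustrative; tune to the degree of skew
val facts = spark.read.parquet("hdfs://path/to/facts.parquet")
val dims  = spark.read.parquet("hdfs://path/to/dims.parquet")
// Add a random salt to the skewed (large) side ...
val saltedFacts = facts.withColumn("salt", (rand() * numSalts).cast("long"))
// ... and replicate the small side once per salt value
val saltedDims = dims.crossJoin(spark.range(numSalts).withColumnRenamed("id", "salt"))
// Join on the original key plus the salt, then drop the helper column
val joined = saltedFacts.join(saltedDims, Seq("key", "salt")).drop("salt")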
12. Summary
This tutorial covered the basic concepts and architecture of Spark SQL, including the DataFrame and Dataset APIs as well as Spark SQL's query optimization techniques. Spark SQL is Spark's core module for processing structured data; it provides a unified way to access data and an optimized execution engine that can handle a wide variety of structured data sources.
After working through this tutorial, you should be able to:
- Understand the basic concepts and architecture of Spark SQL.
- Create and operate on DataFrames.
- Create and operate on Datasets.
- Execute SQL queries with Spark SQL.
- Understand Spark SQL's optimization techniques.
- Follow Spark SQL best practices.
In subsequent tutorials we will dive into advanced components such as Spark Streaming, MLlib, and GraphX, and cover Spark's various application scenarios and best practices.