Spark SQL and DataFrame

Learn the basic concepts and architecture of Spark SQL, master the DataFrame and Dataset APIs, and understand Spark SQL query optimization techniques.


1. Spark SQL overview

Spark SQL is Spark's module for processing structured data. It provides two main programming abstractions, DataFrame and Dataset, along with a command-line interface for running SQL queries and a JDBC/ODBC server. The main features of Spark SQL include:

  • Unified data access: the same API can be used to access different data sources, such as HDFS, Hive, Avro, Parquet, and JSON.
  • Mixed execution: SQL queries can be mixed with Spark's other APIs (RDD, DataFrame, Dataset) within the same application.
  • Optimized execution: a built-in query optimizer (the Catalyst optimizer) automatically optimizes query plans.
  • Hive compatibility: supports HiveQL syntax and Hive UDFs, and can directly access existing Hive tables.
  • Standard connectivity: provides JDBC and ODBC interfaces for integration with traditional BI tools.

2. Spark SQL architecture

The Spark SQL architecture consists mainly of the following components:

  • Catalyst optimizer: the query optimizer, responsible for turning SQL queries into efficient execution plans using rule-based and cost-based optimizations.
  • Spark SQL core: the core components for processing SQL queries, including the parser, analyzer, optimizer, and execution engine.
  • DataFrame API: an object-oriented programming interface for working with structured data.
  • Dataset API: combines the strong typing of RDDs with the optimized execution of DataFrames.
  • Hive support: allows access to existing Hive tables and the use of HiveQL syntax.
  • JDBC/ODBC server: allows external tools to access Spark SQL through standard JDBC and ODBC interfaces.

Tip

The Catalyst optimizer is one of Spark SQL's core strengths. It automatically optimizes query plans using techniques such as predicate pushdown, column pruning, constant folding, and join reordering, thereby improving query performance.

3. DataFrame Introduction

A DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database. DataFrames have the following characteristics:

  • Structured: the data in a DataFrame has a predefined schema, i.e., column names and data types.
  • Immutable: once created, a DataFrame cannot be modified; new DataFrames can only be produced through transformations.
  • Distributed: DataFrame data is stored across multiple nodes in the cluster and can be processed in parallel.
  • Optimized execution: DataFrame operations are automatically optimized by the Catalyst optimizer, so they generally execute more efficiently than equivalent RDD code.
  • Many data sources: a DataFrame can be created from many data sources, such as files, databases, and RDDs.

4. Creating DataFrames

In Spark, a DataFrame can be created in the following ways:

4.1 Creating from files

A DataFrame can be created from various file formats, such as JSON, CSV, Parquet, and Avro.

// Scala example
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Spark SQL Example")
  .master("local[*]")
  .getOrCreate()

// Create a DataFrame from a JSON file
val df1 = spark.read.json("hdfs://path/to/people.json")

// Create a DataFrame from a CSV file
val df2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://path/to/people.csv")

// Create a DataFrame from a Parquet file
val df3 = spark.read.parquet("hdfs://path/to/people.parquet")

# Python example
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark SQL Example") \
    .master("local[*]") \
    .getOrCreate()

# Create a DataFrame from a JSON file
df1 = spark.read.json("hdfs://path/to/people.json")

# Create a DataFrame from a CSV file
df2 = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("hdfs://path/to/people.csv")

# Create a DataFrame from a Parquet file
df3 = spark.read.parquet("hdfs://path/to/people.parquet")

4.2 Creating from RDDs

A DataFrame can be created from an RDD in two ways:

  1. Inferring the schema via reflection: suitable when the data types are known in advance.
  2. Specifying the schema explicitly: suitable when the types are not known up front or when precise control over the schema is needed.
// Scala example
// Define a case class
case class Person(name: String, age: Int)

// Create a DataFrame from an RDD (schema inferred via reflection)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = spark.createDataFrame(rdd)

// Create a DataFrame from an RDD (schema specified explicitly)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val rdd = spark.sparkContext.parallelize(Seq(Row("Alice", 25), Row("Bob", 30)))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))
val df = spark.createDataFrame(rdd, schema)

# Python example
# Create a DataFrame from an RDD (schema inferred via reflection)
from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([Row(name="Alice", age=25), Row(name="Bob", age=30)])
df = spark.createDataFrame(rdd)

# Create a DataFrame from an RDD (schema specified explicitly)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

rdd = spark.sparkContext.parallelize([("Alice", 25), ("Bob", 30)])
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)

4.3 Creating from Hive tables

A DataFrame can be created directly from an existing Hive table (this requires a SparkSession built with enableHiveSupport()).

// Scala example
val df = spark.table("hive_table_name")

// Or use a SQL query
val df = spark.sql("SELECT * FROM hive_table_name")

# Python example
df = spark.table("hive_table_name")

# Or use a SQL query
df = spark.sql("SELECT * FROM hive_table_name")

5. DataFrame operations

DataFrames support two types of operations: transformations and actions.

5.1 Transformations

A transformation produces a new DataFrame from an existing one. Transformations are lazy: they are only actually executed when an action is triggered. Common transformations include:

  • select(): selects the specified columns.
  • filter(): keeps the rows that satisfy a condition.
  • where(): same as filter(), used to filter rows.
  • groupBy(): groups rows by the specified columns.
  • orderBy(): sorts rows by the specified columns.
  • join(): joins two DataFrames on the specified condition.
  • withColumn(): adds or replaces a column.
  • drop(): removes the specified columns.
  • distinct(): returns a DataFrame with duplicate rows removed.
  • limit(): returns the first n rows.
// Scala example
val df = spark.read.json("hdfs://path/to/people.json")

// Select the specified columns
val selectedDF = df.select("name", "age")

// Keep only people older than 25
val filteredDF = df.filter(df("age") > 25)

// Group by age and count the number of people in each group
val groupedDF = df.groupBy("age").count()

// Sort by age in descending order
val sortedDF = df.orderBy(df("age").desc)

// Add a new column
val newDF = df.withColumn("age_plus_10", df("age") + 10)

# Python example
df = spark.read.json("hdfs://path/to/people.json")

# Select the specified columns
selected_df = df.select("name", "age")

# Keep only people older than 25
filtered_df = df.filter(df["age"] > 25)

# Group by age and count the number of people in each group
grouped_df = df.groupBy("age").count()

# Sort by age in descending order
sorted_df = df.orderBy(df["age"].desc())

# Add a new column
new_df = df.withColumn("age_plus_10", df["age"] + 10)

5.2 Actions

An action triggers computation and either returns a result to the driver program or writes data to external storage. Common actions include:

  • show(): displays the contents of the DataFrame.
  • count(): returns the number of rows in the DataFrame.
  • first(): returns the first row of the DataFrame.
  • head(n): returns the first n rows of the DataFrame.
  • collect(): pulls all of the DataFrame's data to the driver program as an array.
  • write: writes the DataFrame to external storage.
// Scala example
val df = spark.read.json("hdfs://path/to/people.json")

// Display the contents of the DataFrame
df.show()

// Show the first 10 rows
df.show(10)

// Return the number of rows
val count = df.count()

// Return the first row
val firstRow = df.first()

// Return the first 5 rows
val headRows = df.head(5)

// Write the DataFrame to a Parquet file
df.write.parquet("hdfs://path/to/output.parquet")

# Python example
df = spark.read.json("hdfs://path/to/people.json")

# Display the contents of the DataFrame
df.show()

# Show the first 10 rows
df.show(10)

# Return the number of rows
count = df.count()

# Return the first row
first_row = df.first()

# Return the first 5 rows
head_rows = df.head(5)

# Write the DataFrame to a Parquet file
df.write.parquet("hdfs://path/to/output.parquet")

6. Dataset API

The Dataset is an abstraction introduced in Spark 1.6 that combines the strong typing of RDDs with the optimized execution of DataFrames. The typed Dataset API is available only in Scala and Java; it is not supported in Python or R.

6.1 Creating Datasets

A Dataset can be created in the following ways:

// Scala example
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Dataset Example")
  .master("local[*]")
  .getOrCreate()

// Import implicit conversions (needed for toDS and as[T])
import spark.implicits._

// Create a Dataset from a collection
val ds1 = Seq(("Alice", 25), ("Bob", 30)).toDS()

// Define a case class (age is declared as Long because Spark infers JSON numbers as bigint)
case class Person(name: String, age: Long)
val ds2 = Seq(Person("Alice", 25), Person("Bob", 30)).toDS()

// Convert a DataFrame to a Dataset
val df = spark.read.json("hdfs://path/to/people.json")
val ds3 = df.as[Person]

// Convert an RDD to a Dataset
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val ds4 = rdd.toDS()

6.2 Dataset operations

Datasets support operations similar to those of DataFrames, and additionally support RDD-style operations such as map, filter, and reduce.

// Scala example
import spark.implicits._

case class Person(name: String, age: Int)
val ds = Seq(Person("Alice", 25), Person("Bob", 30), Person("Charlie", 35)).toDS()

// DataFrame-style operations
val filteredDS1 = ds.filter(ds("age") > 25)
val selectedDS1 = ds.select("name")

// Dataset-style (typed) operations
val filteredDS2 = ds.filter(p => p.age > 25)
val selectedDS2 = ds.map(p => p.name)

// Show the results
filteredDS2.show()
selectedDS2.show()

7. Spark SQL queries

Spark SQL supports querying data with SQL syntax. SQL queries can be executed in the following ways:

7.1 Using the spark.sql() method

The spark.sql() method executes a SQL query and returns the result as a DataFrame.

// Scala example
val df = spark.read.json("hdfs://path/to/people.json")

// Register the DataFrame as a temporary view
df.createOrReplaceTempView("people")

// Execute the SQL query
val resultDF = spark.sql("SELECT name, age FROM people WHERE age > 25 ORDER BY age DESC")

// Show the result
resultDF.show()

# Python example
df = spark.read.json("hdfs://path/to/people.json")

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("people")

# Execute the SQL query
result_df = spark.sql("SELECT name, age FROM people WHERE age > 25 ORDER BY age DESC")

# Show the result
result_df.show()

7.2 Executing SQL from a file

A SQL query can be loaded from an external file and then executed with spark.sql().

// Scala example
import scala.io.Source

// Read the SQL query from a file
val sqlQuery = Source.fromFile("path/to/query.sql").mkString

// Execute the SQL query
val resultDF = spark.sql(sqlQuery)

// Show the result
resultDF.show()

# Python example
with open("path/to/query.sql", "r") as f:
    sql_query = f.read()

# Execute the SQL query
result_df = spark.sql(sql_query)

# Show the result
result_df.show()

7.3 Temporary views and global temporary views

Spark SQL supports two types of temporary views:

  • Temporary view (Temp View): visible only within the current SparkSession; it is dropped when that SparkSession is closed.
  • Global temporary view (Global Temp View): visible across all SparkSessions; it must be accessed with the global_temp. prefix and is dropped when the Spark application ends.
// Scala example
val df = spark.read.json("hdfs://path/to/people.json")

// Create a temporary view
df.createOrReplaceTempView("people")

// Create a global temporary view
df.createOrReplaceGlobalTempView("global_people")

// Query the temporary view
spark.sql("SELECT * FROM people").show()

// Query the global temporary view
spark.sql("SELECT * FROM global_temp.global_people").show()

# Python example
df = spark.read.json("hdfs://path/to/people.json")

# Create a temporary view
df.createOrReplaceTempView("people")

# Create a global temporary view
df.createOrReplaceGlobalTempView("global_people")

# Query the temporary view
spark.sql("SELECT * FROM people").show()

# Query the global temporary view
spark.sql("SELECT * FROM global_temp.global_people").show()

8. Spark SQL optimization

Spark SQL has a built-in query optimizer (the Catalyst optimizer) that applies rule-based and cost-based optimizations to query plans automatically. Common optimization techniques include the following (a short explain() sketch follows the list):

  • Predicate pushdown: pushes filter conditions down to the data source, reducing the amount of data read.
  • Column pruning: reads only the columns a query needs, reducing the amount of data read.
  • Constant folding: evaluates constant expressions at plan time, reducing runtime computation.
  • Join reordering: reorders joins based on table sizes to shrink intermediate results.
  • Broadcast joins: broadcasts small tables to all nodes, avoiding a shuffle.
  • Code generation: compiles the query plan into Java bytecode, improving execution efficiency.
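
To see these optimizations in a concrete plan, call explain() on a query. The sketch below is illustrative and reuses the hypothetical people.parquet path from the earlier examples; the exact plan text depends on the Spark version.

// Scala sketch: inspecting the optimized query plan
import org.apache.spark.sql.functions.col

val people = spark.read.parquet("hdfs://path/to/people.parquet")

// Prints the parsed, analyzed, and optimized logical plans plus the physical plan
people.filter(col("age") > 25).select("name").explain(true)
// Predicate pushdown typically appears as PushedFilters on the Parquet scan,
// and column pruning as a ReadSchema that lists only the required columns.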

8.1 Optimization configuration

The following configuration parameters can be used to tune Spark SQL performance:

# Add the following settings to spark-defaults.conf
spark.sql.autoBroadcastJoinThreshold 10485760  # Broadcast join threshold (default 10 MB)
spark.sql.shuffle.partitions 200  # Number of shuffle partitions (default 200)
spark.sql.sources.partitionColumnTypeInference.enabled true  # Automatically infer partition column types
spark.sql.caseSensitive false  # Whether identifiers are case-sensitive (default false)
spark.sql.parquet.compression.codec snappy  # Compression codec for Parquet files (default snappy)
spark.sql.execution.arrow.enabled true  # Arrow-based optimization (renamed to spark.sql.execution.arrow.pyspark.enabled in Spark 3.0+)
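
Most of these options can also be changed at runtime on an existing SparkSession, which is handy when experimenting with different values. A minimal sketch:

// Scala sketch: adjusting Spark SQL options at runtime
spark.conf.set("spark.sql.shuffle.partitions", "100")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)  // 50 MB
println(spark.conf.get("spark.sql.shuffle.partitions"))  // verify the new value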

9. Comparing DataFrames and RDDs

DataFrames and RDDs are Spark's two main programming abstractions, each with its own strengths and weaknesses. The table below compares them, and a short side-by-side example follows:

Feature | RDD | DataFrame
Typing | Strongly typed | Untyped rows (Row) with a schema
Optimization | No automatic optimization | Built-in Catalyst optimizer
API | Low-level, flexible | High-level, concise
Performance | Lower | Higher
Typical use cases | Unstructured data, complex computation | Structured data, SQL queries
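
As a rough side-by-side illustration, the sketch below runs the same filter with both abstractions. It reuses the hypothetical people.csv path from section 4.1 and assumes a header row with name and age columns.

// Scala sketch: the same filter expressed with an RDD and with a DataFrame
import org.apache.spark.sql.functions.col

// RDD: manual parsing, no schema, no Catalyst optimization (header handling omitted)
val adultsRDD = spark.sparkContext
  .textFile("hdfs://path/to/people.csv")
  .map(_.split(","))
  .filter(fields => fields(1).toInt > 25)

// DataFrame: declarative, schema-aware, optimized by Catalyst
val adultsDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://path/to/people.csv")
  .filter(col("age") > 25)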

10. Spark SQL best practices

When using Spark SQL, follow these best practices:

  • Prefer DataFrames/Datasets: for structured data, use the DataFrame or Dataset API first; they benefit from Catalyst's optimizations.
  • Set a sensible number of partitions: tune the number of shuffle partitions to the data volume and cluster size; a common guideline is 2-3 times the total number of CPU cores.
  • Use broadcast joins: for small tables, a broadcast join avoids shuffling data and improves performance (see the sketch after this list).
  • Avoid collect(): on large datasets, collect() can exhaust the driver program's memory.
  • Use Parquet or ORC: for structured data, these formats offer better compression and query performance.
  • Cache wisely: when a DataFrame is reused several times, persist it with cache() or persist().
  • Use partitioned tables: for large datasets, partitioned tables reduce the amount of data scanned and speed up queries.
  • Optimize join order: order joins according to table size, placing smaller tables first.
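
The broadcast-join and caching recommendations map directly onto the DataFrame API. A minimal sketch, assuming a hypothetical large ordersDF, a small countriesDF dimension table, and a country_code join key:

// Scala sketch: broadcast join plus caching of a reused result
import org.apache.spark.sql.functions.broadcast

// Hint Spark to broadcast the small table instead of shuffling both sides
val enriched = ordersDF.join(broadcast(countriesDF), Seq("country_code"))

// Persist a DataFrame that several later queries will reuse
enriched.cache()
enriched.count()  // an action is needed to actually materialize the cache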

Hands-on exercises

Exercise 1: Basic DataFrame operations

  1. Create a DataFrame from a JSON file.
  2. Display the DataFrame's schema and its first 10 rows.
  3. Select specific columns.
  4. Filter rows that satisfy a condition.
  5. Group and sort by specified columns.
  6. Add a new column.
  7. Write the result to a Parquet file.

Exercise 2: SQL queries

  1. Register a DataFrame as a temporary view.
  2. Use a SQL query to select specific columns with filter conditions.
  3. Use SQL to group, aggregate, and sort.
  4. Use SQL to perform a join.
  5. Use SQL to create a new table.

Exercise 3: Dataset operations

  1. Define a case class.
  2. Create a Dataset from a collection.
  3. Use the Dataset API to filter, map, and aggregate.
  4. Convert the Dataset to a DataFrame and to an RDD.

Exercise 4: Performance optimization

  1. Test how different numbers of partitions affect query performance.
  2. Compare the performance of broadcast joins and regular joins.
  3. Compare the performance of the Parquet and JSON formats.
  4. Use the EXPLAIN command to inspect query plans and analyze the effect of the optimizations.

11. Common issues and solutions

11.1 Schema mismatch

Problem: a schema mismatch error occurs when reading files.
Solutions:

  • Use the inferSchema option to infer the schema automatically.
  • Specify the schema explicitly and make sure it matches the data format.
  • Use the mode option to control how malformed records are handled: PERMISSIVE (the default; keeps the row and sets fields that cannot be parsed to null), DROPMALFORMED (drops malformed records), or FAILFAST (fails as soon as a malformed record is encountered). A usage sketch follows this list.
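
A minimal sketch of the mode option, reusing the hypothetical people.json path from the earlier examples:

// Scala sketch: dropping malformed records instead of failing the job
val cleaned = spark.read
  .option("mode", "DROPMALFORMED")  // alternatives: "PERMISSIVE" (default), "FAILFAST"
  .json("hdfs://path/to/people.json")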

11.2 Out-of-memory errors

Problem: an OutOfMemoryError occurs while executing a Spark SQL query.
Solutions:

  • Increase the memory allocated to the driver or the executors (see the example after this list).
  • Adjust the number of shuffle partitions (usually increasing it, so that each task processes less data).
  • Avoid calling collect() on large datasets.
  • Use broadcast joins for small tables.
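
Driver and executor memory are usually set when the application is submitted. A hypothetical spark-submit invocation (flag values and application name are illustrative):

# Increase driver and executor memory and raise the shuffle parallelism
spark-submit \
  --driver-memory 4g \
  --executor-memory 8g \
  --conf spark.sql.shuffle.partitions=400 \
  your_spark_sql_app.jar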

11.3 Slow query execution

Problem: Spark SQL queries execute slowly.
Solutions:

  • Check for data skew and mitigate it with key salting or a custom partitioner (a salting sketch follows this list).
  • Adjust the number of shuffle partitions.
  • Use broadcast joins for small tables.
  • Use the EXPLAIN command to inspect the query plan and find the expensive operators.
  • Store the data in Parquet or ORC format.
  • Use caching appropriately.
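
A minimal key-salting sketch for a skewed join; largeDF, smallDF, and the key column name are hypothetical, and the number of salt values would be tuned to the actual skew:

// Scala sketch: salting a skewed join key
import org.apache.spark.sql.functions._

val numSalts = 10
// Spread each hot key of the large table across numSalts sub-keys
val saltedLarge = largeDF.withColumn("salt", (rand() * numSalts).cast("int"))
// Replicate every row of the small table once per salt value
val saltedSmall = smallDF.withColumn("salt", explode(array((0 until numSalts).map(lit): _*)))
// Join on the original key plus the salt, then drop the helper column
val result = saltedLarge.join(saltedSmall, Seq("key", "salt")).drop("salt")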

12. Summary

This tutorial covered the basic concepts and architecture of Spark SQL, including the DataFrame and Dataset APIs and Spark SQL's query optimization techniques. Spark SQL is Spark's core module for processing structured data; it provides a unified way to access data and an optimized execution engine capable of handling a wide range of structured data sources.

After completing this tutorial, you should be able to:

  • Understand the basic concepts and architecture of Spark SQL.
  • Create and manipulate DataFrames.
  • Create and manipulate Datasets.
  • Run SQL queries with Spark SQL.
  • Understand Spark SQL's optimization techniques.
  • Follow Spark SQL best practices.

In subsequent tutorials, we will dive deeper into advanced components such as Spark Streaming, MLlib, and GraphX, and explore Spark's various application scenarios and best practices.