15.1 从RDD到DataFrame
内容讲解
虽然RDD提供了强大的灵活性,但它缺少对数据结构的感知。对于结构化数据(如来自数据库的表、JSON文件、Parquet文件),RDD将其视为普通的Java/Python对象集合,无法利用数据内在的模式(Schema)信息进行优化。为了解决这个问题,Spark引入了DataFrame。
DataFrame是Spark SQL提供的核心数据抽象,它是一个带有Schema信息的、分布式的、表格型的数据集。你可以将DataFrame看作是关系型数据库中的一张表,或者R/Python中的一个data frame。
DataFrame相比RDD的优势:
- 结构化数据:DataFrame带有Schema,即每列都有名称和类型。这使得代码更具可读性,也更容易操作特定列。
- 性能优化:Spark SQL的Catalyst优化器能够理解DataFrame上的操作。它会分析用户的查询计划,并进行一系列复杂的优化,如谓词下推(Predicate Pushdown)、列剪裁(Column Pruning)等,然后生成高效的物理执行计划。这些优化对于RDD是无法做到的。
- 统一的数据源API:Spark SQL提供了一个统一的
DataFrameReader
和DataFrameWriter
接口,可以轻松地从多种数据源(JSON, CSV, Parquet, JDBC等)读取数据创建DataFrame,并将DataFrame写入到这些数据源中。 - 多语言支持:DataFrame API在Scala, Java, Python, R中是统一的,降低了学习成本。
Dataset:
Dataset是DataFrame的扩展,它在DataFrame的基础上增加了类型安全(Type Safety)。Dataset API只在Scala和Java中可用。一个DataFrame
实际上是Dataset[Row]
的类型别名,其中Row
是一个通用的、无类型的JVM对象。而一个Dataset[T]
(T是用户定义的Case Class或Java Bean)则在编译时就具有类型信息,可以避免某些运行时错误。
关系:RDD -> DataFrame -> Dataset
- RDD:无结构,灵活但无优化。
- DataFrame (
Dataset[Row]
):有结构(Schema),性能高(Catalyst优化),但无编译时类型安全。 - Dataset (
Dataset[T]
):有结构,性能高,且有编译时类型安全(仅限JVM语言)。
在Python中,由于其动态类型特性,DataFrame是主要的数据抽象。
15.2 DataFrame API与Spark SQL
内容讲解
操作DataFrame主要有两种方式:DataFrame API和纯SQL语句。
DataFrame API
这是一种领域特定语言(DSL),允许用户以编程的方式构建查询。它提供了一系列类似于SQL操作的方法。
- 选择与过滤:
select()
,filter()
,where()
- 聚合:
groupBy()
,agg()
,count()
,sum()
,avg()
- 排序:
orderBy()
,sort()
- 连接:
join()
- 列操作:
withColumn()
,drop()
Spark SQL
Spark SQL允许用户直接编写SQL语句来查询DataFrame。要使用SQL,你需要先将一个DataFrame注册为一个临时视图(Temporary View)。
df.createOrReplaceTempView("view_name")
:创建一个只在当前SparkSession
内有效的临时视图。df.createOrReplaceGlobalTempView("view_name")
:创建一个在整个Spark应用内都有效的全局临时视图,需要用global_temp.view_name
来引用。
之后,就可以使用spark.sql("SELECT ... FROM view_name ...")
来执行SQL查询,其返回结果仍然是一个DataFrame。
代码示例
使用Python (PySpark) 进行DataFrame操作
from pyspark.sql import SparkSession