基本概念
RDD
RDD是Spark提供的核心抽象,是理解Spark最最核心的概念,是Resillient Distributed Dataset,即弹性分布式数据集。所以名字中就包含了RDD的所有的特征:弹性、分布式、数据集。
DataFrame
DataFrame的前身是SchemaRDD。 Spark1.3更名为DataFrame。不继承RDD,自己实现了RDD的大部分功能。与RDD类似,DataFrame也是一个分布式数据集:
DataFrame可以看做分布式Row对象的集合,提供了由列组成的详细Schema信息,使其可以得到优化。DataFrame 不仅有比RDD更多的算子,还可以进行执行计划的优化
DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema
DataFrame也支持嵌套数据类型(struct、 array和map)。DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。DataFrame的劣势在于在编译期缺少类型安全检查,导致运行时出错
DataSet
DataSet是在Spark1.6中添加的新的接口;与RDD相比,保存了更多的描述信息,概念上等同于关系型数据库中的二维表;与DataFrame相比,保存了类型信息,是强类型的,提供了编译时类型检查; 调用Dataset的方法先会生成逻辑计划,然后Spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行。 DataSet包含了DataFrame的功能,在Spark2.0中两者得到了统一: DataFrame表示为DataSet[Row],即DataSet的子集。
Row & Schema
在理解上,可以理解DataFrame = RDD[Row] + Schema;DataFrame 的前身是 SchemaRDD ,Row是一个泛化的无类型 JVM object,可以理解为一行具体的数据集合
def main(args: Array[String]): Unit = {
// 导入 Row 类
import org.apache.spark.sql.Row
val row = Row(1, "abc", 1.2)
// Row 的访问方法
println(row(0))
println(row(1))
println(row(2))
}
执行结果
而DataFrame是带有Schema信息的RDD,Spark通过Schema就能够读懂数据。
什么是schema?
DataFrame中提供了详细的数据结构信息,从而使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么,DataFrame中的数据结构信息,即为schema。
StructField("name", StringType, false) 字段名,字段类型,是否为空
def main(args: Array[String]): Unit = {
// 多种方式定义schema,其核心是StructType
import org.apache.spark.sql.types._
// 来自官方帮助文档
val schema1 = StructType(StructField("name", StringType, false) ::
StructField("age", IntegerType, false) ::
StructField("height", IntegerType,false) :: Nil)
val schema2 = StructType( Seq(StructField("name", StringType, false),
StructField("age", IntegerType, false),
StructField("height", IntegerType, false))
)
val schema3 = StructType( List(StructField("name", StringType, false),
StructField("age",IntegerType, false),
StructField("height",IntegerType, false))
)
val schema4 = new StructType()
.add(StructField("name", StringType, false))
.add(StructField("age", IntegerType, false))
.add(StructField("height", IntegerType, false))
val schema5 = new StructType()
.add("name", StringType, true, "comment1")
.add("age", IntegerType, false, "comment2")
.add("height", IntegerType, true, "comment3")
}
打印schema1-5 执行结果
RDD、DataFrame、Dataset的异同
三者的共性:
RDD、DataFrame、Dataset都是 Spark 平台下的分布式弹性数据集,为处理海量数据提供便利
三者都有许多相同的概念,如分区、持久化、容错等;有许多共同的函数,如map、filter,sortBy等
三者都有惰性机制,只有在遇到 Action 算子时,才会开始真正的计算
对DataFrame和Dataset进行操作许多操作都需要这个包进行支持,如:import spark.implicits._
三者的区别:
与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值
DataFrame与Dataset均支持 SparkSQL 的操作
Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同;
DataFrame 定义为 Dataset[Row]。每一行的类型是Row,然后在Row包含具体的字段信息
Dataset每一行的类型都是一个case class,在自定义了case class之后可以很自由的获得每一行的信息;
IDEA创建代码
由range生成ds
def range(start : scala.Long, end : scala.Long) : org.apache.spark.sql.Dataset[java.lang.Long] = { /* compiled code */ }
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("range to ds")
.master("local")
.getOrCreate()
// 隐式转换
import spark.implicits._
val numDS = spark.range(5, 100, 5)
// orderBy 转换操作;desc:function;show:Action
import org.apache.spark.sql.functions._
numDS.orderBy(desc("id")).show(5)
// 统计信息
numDS.describe().show
// 显示schema信息
numDS.printSchema
// 使用RDD执行同样的操作
numDS.rdd.map(_.toInt).stats
// 检查分区数
numDS.rdd.getNumPartitions
}
由集合生成df
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("list to df")
.master("local")
.getOrCreate()
// 隐式转换
import spark.implicits._
val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val df1 = spark.createDataFrame(lst)
// 改单个字段名时简便
.withColumnRenamed("_1", "name1")
.withColumnRenamed("_2", "age1")
.withColumnRenamed("_3", "height1")
df1.orderBy("age1").show(10)
// desc是函数,在IDEA中使用是需要导包
import org.apache.spark.sql.functions._
df1.orderBy(desc("age1")).show(10)
// 修改整个DF的列名
val df2 = spark.createDataFrame(lst).toDF("name", "age", "height")
}
执行结果
由集合生成ds
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("seq to ds")
.master("local")
.getOrCreate()
// 隐式转换
import spark.implicits._
// 注意 Seq 中元素的类型
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
val ds1 = spark.createDataset(seq1)
// 显示schema信息
ds1.printSchema
ds1.show
// 可以看出有无schema的区别
val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val ds2 = spark.createDataset(seq2)
// 显示schema信息
ds2.printSchema
ds2.show
}
case class Person(name: String, age: Int, height: Int)
执行结果(ds1)
执行结果(ds2)
由文件创建df
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local")
.appName("from file to df")
.getOrCreate()
// 隐式转换
import spark.implicits._
// 直接读取没有办法设置合适的字段名
val csv = spark.read.csv("src/main/resources/data/people.csv")
csv.printSchema()
csv.show()
// 注意不能使用分号,要使用空格
val schema = "name String,age Int,height Int"
// 显示的指定schema
val csv1 = spark.read
.schema(schema)
.csv("src/main/resources/data/people.csv")
csv1.printSchema()
csv1.show()
val csv2 = spark.read
.option("delimiter", ",")
.option("header", "true")
// 根据首行可以推断出schema信息
.option("inferschema","true")
// 显示的指定schema
// .schema(schema)
.csv("src/main/resources/data/people.csv")
csv2.printSchema()
csv2.show()
}
RDD, DF, DS转换
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local")
.appName("transformation")
.getOrCreate()
// 隐式转换
import spark.implicits._
// 直接通过SparkSession获取SparkContext
val sc = spark.sparkContext
val arr = Array(("Jack", 28, 184), ("Andy", 16, 177), ("Bob", 42, 165))
// rdd => df toDF
val df = spark.createDataFrame(arr).toDF("name", "age", "height")
// sc.makeRDD(arr).toDF("name","age","height")
df.show()
// df => rdd rdd
val rdd1: RDD[Row] = df.rdd
rdd1.foreach(println)
// rdd => ds toDS
val ds = sc.makeRDD(arr)
.map(f => Student(f._1, f._2, f._3))
.toDS()
ds.show()
// ds => rdd rdd
val rdd: RDD[Student] = ds.rdd
rdd.foreach(println)
// df => ds as
val value: Dataset[Row] = df.as("demo")
value.show()
// ds => df toDF
val frame: DataFrame = ds.toDF()
frame.show()
}
case class Student(name: String, age: Int, height: Int)