spark-数据集(RDD,DataFrame,DataSet)

发布于:2025-03-19 ⋅ 阅读:(16) ⋅ 点赞:(0)

基本概念

RDD

RDD是Spark提供的核心抽象,是理解Spark最最核心的概念,是Resillient Distributed Dataset,即弹性分布式数据集。所以名字中就包含了RDD的所有的特征:弹性、分布式、数据集。

DataFrame

DataFrame的前身是SchemaRDD。 Spark1.3更名为DataFrame。不继承RDD,自己实现了RDD的大部分功能。与RDD类似,DataFrame也是一个分布式数据集:

DataFrame可以看做分布式Row对象的集合,提供了由列组成的详细Schema信息,使其可以得到优化。DataFrame 不仅有比RDD更多的算子,还可以进行执行计划的优化

DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema

DataFrame也支持嵌套数据类型(struct、 array和map)。DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。DataFrame的劣势在于在编译期缺少类型安全检查,导致运行时出错

DataSet

DataSet是在Spark1.6中添加的新的接口;与RDD相比,保存了更多的描述信息,概念上等同于关系型数据库中的二维表;与DataFrame相比,保存了类型信息,是强类型的,提供了编译时类型检查;    调用Dataset的方法先会生成逻辑计划,然后Spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行。     DataSet包含了DataFrame的功能,在Spark2.0中两者得到了统一:  DataFrame表示为DataSet[Row],即DataSet的子集。

Row & Schema

在理解上,可以理解DataFrame = RDD[Row] + Schema;DataFrame 的前身是 SchemaRDD ,Row是一个泛化的无类型 JVM object,可以理解为一行具体的数据集合

  def main(args: Array[String]): Unit = {
    // 导入 Row 类
    import org.apache.spark.sql.Row
    val row = Row(1, "abc", 1.2)
    // Row 的访问方法
    println(row(0))
    println(row(1))
    println(row(2))
  }

执行结果

而DataFrame是带有Schema信息的RDD,Spark通过Schema就能够读懂数据。

什么是schema?

DataFrame中提供了详细的数据结构信息,从而使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么,DataFrame中的数据结构信息,即为schema。

StructField("name", StringType, false)  字段名,字段类型,是否为空

def main(args: Array[String]): Unit = {
  // 多种方式定义schema,其核心是StructType
  import org.apache.spark.sql.types._
  // 来自官方帮助文档
  val schema1 = StructType(StructField("name", StringType, false) ::
    StructField("age", IntegerType, false) ::
    StructField("height", IntegerType,false) :: Nil)
  
  val schema2 = StructType( Seq(StructField("name", StringType, false),
    StructField("age", IntegerType, false),
    StructField("height", IntegerType, false))
  )
  
  val schema3 = StructType( List(StructField("name", StringType, false),
    StructField("age",IntegerType, false),
    StructField("height",IntegerType, false))
  )

  val schema4 = new StructType()
    .add(StructField("name", StringType, false))
    .add(StructField("age", IntegerType, false))
    .add(StructField("height", IntegerType, false))
  
  val schema5 = new StructType()
    .add("name", StringType, true, "comment1")
    .add("age", IntegerType, false, "comment2")
    .add("height", IntegerType, true, "comment3")
}

打印schema1-5 执行结果

RDD、DataFrame、Dataset的异同

三者的共性:

RDD、DataFrame、Dataset都是 Spark 平台下的分布式弹性数据集,为处理海量数据提供便利

三者都有许多相同的概念,如分区、持久化、容错等;有许多共同的函数,如map、filter,sortBy等

三者都有惰性机制,只有在遇到 Action 算子时,才会开始真正的计算

对DataFrame和Dataset进行操作许多操作都需要这个包进行支持,如:import spark.implicits._ 

三者的区别:

与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值

DataFrame与Dataset均支持 SparkSQL 的操作

Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同;

DataFrame 定义为 Dataset[Row]。每一行的类型是Row,然后在Row包含具体的字段信息

Dataset每一行的类型都是一个case class,在自定义了case class之后可以很自由的获得每一行的信息;

IDEA创建代码

由range生成ds

  def range(start : scala.Long, end : scala.Long) : org.apache.spark.sql.Dataset[java.lang.Long] = { /* compiled code */ }

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder()
      .appName("range to ds")
      .master("local")
      .getOrCreate()
    // 隐式转换
    import spark.implicits._

    val numDS = spark.range(5, 100, 5)
    // orderBy 转换操作;desc:function;show:Action
    import org.apache.spark.sql.functions._
    numDS.orderBy(desc("id")).show(5)
    // 统计信息
    numDS.describe().show
    // 显示schema信息
    numDS.printSchema
    // 使用RDD执行同样的操作
    numDS.rdd.map(_.toInt).stats
    // 检查分区数
    numDS.rdd.getNumPartitions
  }
由集合生成df
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder()
      .appName("list to df")
      .master("local")
      .getOrCreate()
    // 隐式转换
    import spark.implicits._


    val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
    val df1 = spark.createDataFrame(lst)
    // 改单个字段名时简便
    .withColumnRenamed("_1", "name1")
      .withColumnRenamed("_2", "age1")
      .withColumnRenamed("_3", "height1")
    df1.orderBy("age1").show(10)
    // desc是函数,在IDEA中使用是需要导包

    import org.apache.spark.sql.functions._

    df1.orderBy(desc("age1")).show(10)
    // 修改整个DF的列名
    val df2 = spark.createDataFrame(lst).toDF("name", "age", "height")

  }

执行结果

由集合生成ds
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder()
      .appName("seq to ds")
      .master("local")
      .getOrCreate()
    // 隐式转换
    import spark.implicits._


    // 注意 Seq 中元素的类型
    val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
    val ds1 = spark.createDataset(seq1)
    // 显示schema信息
    ds1.printSchema
    ds1.show
    // 可以看出有无schema的区别
    val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
    val ds2 = spark.createDataset(seq2)
    // 显示schema信息
    ds2.printSchema
    ds2.show


  }
  case class Person(name: String, age: Int, height: Int)

执行结果(ds1)

执行结果(ds2)

由文件创建df
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("from file to df")
      .getOrCreate()

    // 隐式转换
    import spark.implicits._

    // 直接读取没有办法设置合适的字段名
    val csv = spark.read.csv("src/main/resources/data/people.csv")
    csv.printSchema()
    csv.show()

    // 注意不能使用分号,要使用空格
    val schema = "name String,age Int,height Int"
    // 显示的指定schema
    val csv1 = spark.read
      .schema(schema)
      .csv("src/main/resources/data/people.csv")
    csv1.printSchema()
    csv1.show()


    val csv2 = spark.read
      .option("delimiter", ",")
      .option("header", "true")
      // 根据首行可以推断出schema信息
      .option("inferschema","true")
      // 显示的指定schema
//      .schema(schema)
      .csv("src/main/resources/data/people.csv")

    csv2.printSchema()
    csv2.show()
  }
RDD, DF, DS转换
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("transformation")
      .getOrCreate()

    // 隐式转换
    import spark.implicits._
    // 直接通过SparkSession获取SparkContext
    val sc = spark.sparkContext

    val arr = Array(("Jack", 28, 184), ("Andy", 16, 177), ("Bob", 42, 165))
   
    // rdd => df     toDF
    val df = spark.createDataFrame(arr).toDF("name", "age", "height")
    //    sc.makeRDD(arr).toDF("name","age","height")
    df.show()

    // df => rdd      rdd
    val rdd1: RDD[Row] = df.rdd
    rdd1.foreach(println)

    // rdd => ds      toDS
    val ds = sc.makeRDD(arr)
      .map(f => Student(f._1, f._2, f._3))
      .toDS()
    ds.show()

    // ds => rdd      rdd
    val rdd: RDD[Student] = ds.rdd
    rdd.foreach(println)

    // df => ds     as
    val value: Dataset[Row] = df.as("demo")
    value.show()

    // ds => df     toDF
    val frame: DataFrame = ds.toDF()
    frame.show()
  }

  case class Student(name: String, age: Int, height: Int)