SparkSQL专题

发布于:2025-02-10 ⋅ 阅读:(47) ⋅ 点赞:(0)

前置内容

Spark的一个模块,用于处理结构化数据

特点

  1. 整合SQL、Spark,支持java,scala
  2. 数据源、不同格式的文件的连接获取数据方式相同
  3. 兼容HIve,可以进行整合
  4. 支持JDBC、ODBC

SparkSQL和HIve区别

区别 Hive SparkSQL
计算模型 磁盘迭代 内存迭代计算
元数据管理 有元数据管理 无元数据管理(但Spark本身有Catalog用于管理元数据)
底层运行框架 MapReduce SparkRDD(现在主要使用DataFrame和Dataset API)
SQL支持 支持SQL开发 支持SQL开发
SQL混合代码开发 不支持 支持(PySpark, Scala等)
Yarn支持 可以运行在Yarn上 可以运行在Yarn上

数据抽象

特性/组件 SparkCore - RDD SparkSQL - DataFrame SparkSQL - Dataset
引入版本 Spark 1.0 Spark 1.3 Spark 1.6
数据结构 无标准数据结构,存储任意类型数据 二维表数据结构,每行类型为Row 自定义数据结构,每行类型可指定(如case class)
数据访问 直接访问存储的数据 每列值需通过解析Row获取 可直接访问每行各字段
与Spark MLlib关系 一般与Spark MLlib同时使用 - -
SparkSQL支持 不支持SparkSQL操作 支持SparkSQL操作 支持SparkSQL操作
备注 - DataFrame是Dataset[Row]的特例 提供更高层次的抽象和类型安全

DataFrame

创建DF createDataFrame(RDD,DF结构)

需要定义StructType对象,指定DF所有列名和各自的类型

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


object CreateDataFrame {
  def main(args: Array[String]): Unit = {
    //1.创建上下文配置文件对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("CreateDataFrame")
    //2.创建执行环境入口SparkSession对象
    /*
* 它的作用是检查当前SparkSession是否已经存在于SparkContext中(在Spark 2.x中,每个SparkContext最多只能有一个活跃的SparkSession)。
* 如果已经存在一个SparkSession,则 .getOrCreate() 方法会返回这个已存在的SparkSession实例。
* 如果不存在,它会根据之前通过 .config(conf) 方法设置的配置信息创建一个新的SparkSession实例,并返回这个新创建的实例。
* 这种设计允许Spark应用程序在需要时轻松地获取或创建一个SparkSession,而无需担心重复创建的问题。
    * */
    val sparkSession: SparkSession = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    //3.读取文件,映射创建RDD[Row]对象
    val rdd: RDD[Row] = sparkSession.sparkContext
      .textFile("data/sql/student.txt")
      .map(_.split(","))
      .map(array => Row(array(0).toInt, array(1).trim, array(2).toInt))
    //4.定义StructType对象,指定所有列名和各自的类型
    val schema: StructType = StructType(
      StructField("id", IntegerType, false) ::
        StructField("name", StringType, false) ::
        StructField("age", IntegerType, true) :: Nil)
    //5.基于rdd对象转为DataFrame
    val df: DataFrame = sparkSession.createDataFrame(rdd, schema)
    //6.打印df的表结构信息
    df.printSchema()
    //7.输出df中的数据
    df.show()
    //关闭
    sparkSession.stop()
  }
}

在这里插入图片描述

创建DF RDD.toDF(手动填入列名)

编写每个列的列名,将RDD中的数据依次填入

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object ToDFDemo1 {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]").setAppName("toDF")
    //2.创建SparkSession对象
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    //4.添加隐式转换
    import sparkSession.implicits._
    //5.读取本地文件,并映射创建RDD
    val rdd: RDD[(Int, String, Int)] = sparkSession.sparkContext
      .textFile("data/sql/student.txt")
      //RDD[String]"1,tom,22"->RDD[Array[String]]
      .map(_.split(","))
      //RDD[Array[String]]->RDD[(Int, String, Int)]
      .map(arr => (arr(0).toInt, arr(1).trim, arr(2).toInt))
    //6.通过rdd.toDF(colNames: String*)
    //val df: DataFrame = rdd.toDF()//了解
    val df: DataFrame = rdd.toDF("id", "name", "age")
    //7.输出结构信息
    df.printSchema()
    //8.输出df中的数据
    /**show(numRows: Int, truncate: Boolean)
     * numRows:表示输出数据的行数,默认是20行.
     * truncate:表示输出时是否对列的值进行截取
     *   false:表示不截取
     *   true:表示截取,保留20个字符
     */
    //df.show()
    //df.show(2,false)
    df.show(10,false)
    //3.关闭spark
    sparkSession.stop()
  }
}

在这里插入图片描述

创建DF toDF(样例类)

创建一个scala的case class,作为DF的样例类
样例类:

package com.wunaiieq

//定义样例类
case class Student(id:Int,name:String,age:Int)

ToDF示例

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object ToDFDemo2 {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("toDF2")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.添加隐式转换
    import spark.implicits._
    //5.读取本地文件,并创建RDD[Student]
    val rdd: RDD[Student] = spark.sparkContext
      .textFile("data/sql/student.txt")
      .map(_.split(","))
      .map(arr => Student(arr(0).toInt, arr(1), arr(2).toInt))
    //6.通过rdd.toDF()转换为DataFrame
    val df: DataFrame = rdd.toDF()
    //7.输出df的结构信息和数据信息
    df.printSchema()
    df.show()
    //3.关闭spark
    spark.stop()
  }
}

DF转RDD DF.rdd

DataFrame本质是对RDD的封装
将一个DataFrame转换为RDD时,得到的RDD中的每个元素是一个row对象,表示df的一行,允许使用索引等方式获取这个row中的某个值。
逻辑上来看,可以将返回后的RDD看作一个二维数组

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object DataFrameToRdd {
  def main(args: Array[String]): Unit = {
    //1创建上下文环境配置对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DataFrameToRdd")
    //2.创建执行环境入口对象SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //3.读取本地文本文件,映射并创建RDD[Row]对象
    val rdd: RDD[Student] = spark.sparkContext
      .textFile("data/sql/student.txt")
      .map(_.split(","))
      .map(ele => Student(ele(0).toInt, ele(1).trim, ele(2).toInt))
    //4.添加spark隐式转换
    import spark.implicits._
    //5.将Rdd转换为DF
    val dataFrame: DataFrame = rdd.toDF()
    //B1.将dataFrame转换为rdd对象
    val rdd1: RDD[Row] = dataFrame.rdd
    //B2.通过collect()获取rdd1中的数据
    val array: Array[Row] = rdd1.collect()
    //B3.输出结果
    println(array(1))
    val name: Any = array(0)(1)
    println(name.toString)
    println(array(0).getAs[String]("name"))
    println(array(0).getString(1))
    spark.stop()
  }
}

DataSet

dataset作为一个强类型的数据集合,需要提供对应的类型信息

创建DataSet 对象(任意类型).toDS

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}


object CreateDataSet {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("CreateDataSet")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.添加隐式转换
    import spark.implicits._
    //5.使用基本类型序列对象创建Dataset对象
    val ds1: Dataset[Int] = Seq(3, 6, 9, 12).toDS()
    //6.输出ds1的结构和数据
    ds1.printSchema()
    ds1.show()
    //7.使用样例类序列创建Dataset
    val ds2: Dataset[Student] = Seq(Student(1, "tuhao", 20), Student(2, "diaosi", 21)).toDS()
    ds2.printSchema()
    ds2.show()
    //8.使用样例类List创建Dataset
    val ds3: Dataset[Student] = List(Student(1, "tuhao", 20), Student(2, "diaosi", 21)).toDS()
    ds3.printSchema()
    ds3.show()
    //3.关闭spark
    spark.stop()
  }
}

在这里插入图片描述
在这里插入图片描述

DataSet 和RDD转换

1.RDD->DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。

2.DataSet->RDD
DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD。

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
object DataSetToRDD {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DataSetToRDD")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.添加隐式转换
    import spark.implicits._
    //5.读取文件并使用样例类创建RDD对象
    val rdd: RDD[Student] = spark.sparkContext
      .textFile("data/sql/student.txt")
      //"1,tom,22"->Array("1","tom","22")
      .map(_.split(","))
      //Array(1,tom,22)->Student(1,"tom",22)
      .map(arr => Student(arr(0).toInt, arr(1), arr(2).toInt))
    //6.RDD->Dataset
    val ds: Dataset[Student] = rdd.toDS()
    ds.printSchema()
    ds.show()
    //7.DataSet->RDD
    val rdd1: RDD[Student] = ds.rdd
    //8.数组中的元素为Student类的对象,更加方便的获取数据
    val students: Array[Student] = rdd1.collect()
    println(students(0))
    println(students(0).id)
    println(students(0).name)
    println(students(0).age)
    //3.关闭spark
    spark.stop()
  }
}

在这里插入图片描述

DataSet 和DataFrame转换

DF是DS的特例,因此他们之间允许互相转换

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object DataFrameAndDataSet {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DataFrameAndDataSet")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.添加隐式转换
    import spark.implicits._
    //5.读取文件并使用样例类创建RDD对象
    val rdd: RDD[Student] = spark.sparkContext
      .textFile("data/sql/student.txt")
      //"1,tom,22"->Array("1","tom","22")
      .map(_.split(","))
      //Array(1,tom,22)->Student(1,"tom",22)
      .map(arr => Student(arr(0).toInt, arr(1), arr(2).toInt))
    //B1.RDD->DataFrame
    val df: DataFrame = rdd.toDF()
    //B2.DataFrame->DataSet
    val ds: Dataset[Student] = df.as[Student]
    println("ds:"+ds)
    //B3.DataSet->DataFrame
    val df1: DataFrame = ds.toDF()
    println("df1:"+df1)
    //B4.RDD->DataSet
    val ds1: Dataset[Student] = rdd.toDS()
    println("ds1:"+ds1)
    //B5.DataSet->RDD
    val rdd1: RDD[Student] = ds.rdd
    //B6.DataFrame->RDD
    val rdd2: RDD[Row] = df.rdd
    //3.关闭spark
    spark.stop()
  }
}

在这里插入图片描述

读写文件

parquet

Parquet 是一种二进制文件格式,用于高效地存储和处理大规模数据集
idea无法直接读取此文件,可以下载一个插件
在这里插入图片描述
如果缺少parquet的示例文件,可以参考博客
博客

package com.wunaiieq.file

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object SSRWParquet {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SSRWParquet")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.读取本地parquet文件,返回DataFrame对象
    //4.1方式一
    //format("parquet/csv/json/text/jdbc")指定读取文件的格式
    val df: DataFrame = spark.read.format("parquet")
     .load("data/sql/student.parquet")
    //4.2方式二
    //由于默认的读取的文件格式为parquet,所以还可以省略format("parquet")
    //如果spark.sql.sources.default被修改过,不能省略format("parquet")
    /*val df: DataFrame = spark.read
     .load("data/sql/student.parquet")*/
    //4.3方式三
//    val df: DataFrame = spark.read
//      .parquet("data/sql/student.parquet")
    df.printSchema()
    df.show()
    //5.写文件
    //   df.write.format("parquet")
    //    .save("data/sqlout/parquet")
    //由于默认的读取的文件格式为parquet,所以还可以省略format("parquet")
    //如果spark.sql.sources.default被修改过,不能省略format("parquet")
    //df.write.save("data/sqlout/parquet")
    //3.关闭spark
    spark.stop()
  }
}

在这里插入图片描述
在这里插入图片描述

text

text文件实际是txt文本文件,操作方法比较多,但最后基本都是调用format这个方法,记住一个就行

package com.wunaiieq.file

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object SSRText {
  def main(args: Array[String]): Unit = {
    //创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SSRWParquet")
    //创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //format("parquet/csv/json/text/jdbc")指定读取文件的格式
    val df: DataFrame = spark.read.format("text")
     .load("data/sql/student.txt")

    df.printSchema()
    df.show()
    //5.写文件
    //append 如果存在则追加
    //overwrite 存在则覆盖
    //error 存在则抛出异常
    //ignore 文件存在则忽略
    df.write.mode("append").format("text")
    .save("data/sqlout/txt")
    //3.关闭spark
    spark.stop()
  }
}

JSON

package com.itbaizhan.sql


import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}


object SSRWJson {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SSRWJson")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.读取本地json文件,返回DataFrame对象
    val df: DataFrame = spark.read.format("json")
     //设置被读取文件的字符集编码
     .option("encoding", "utf-8")
     .load("data/sql/student.json")
    //5.输出相关信息
    df.printSchema()
    df.show()
    //6.创建临时视图
    df.createTempView("student")
    //7.使用临时视图进行查询
    val dataFrame: DataFrame = spark.sql(
      """
        |select id,name,age
        |from student
        |where age between 23 and 50
        |""".stripMargin)
    dataFrame.show()
//    //8.写json文件(需要配置Hadoop)
//    df.write.mode("overwrite")
//      .format("json")
//      .save("data/sqlout/json")
//    //或者如下方式
//    df.write.mode("overwrite").json("data/sqlout/json")
    //3.关闭spark
    spark.stop()
  }
}

报错提示(已整理至专题)

Exception in thread “main” org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default). 。。。。。
JSON文件必须紧凑排列,不要使用idea格式化
下面是几种JSON文件的格式,可以参考下,建议使用第4种,但此JSON文件idea会有格式错误的提示
JSON文件格式1
在这里插入图片描述
输出效果
在这里插入图片描述
JSON文件格式2
在这里插入图片描述
在这里插入图片描述
JSON文件格式3
在这里插入图片描述
在这里插入图片描述
JSON文件格式4
在这里插入图片描述
在这里插入图片描述

CSV

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}


object SSRWCsv {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SSRWCsv")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.读取本地csv文件,返回DataFrame对象
    val df: DataFrame = spark.read.format("csv")
      //第一行为列名
      .option("header", true)
      //设置字段之间的分隔符,默认是“,”
      .option("delimiter", ";")
      //未设置前各个字段都是String类型,设置后匹配对应的类型
      .option("inferSchema","true")
      .option("encoding", "utf-8")
      //.load("data/sql/student.csv")
      .load("data/sql/student.csv")
    //val df: DataFrame = spark.read.csv("data/sql/student2.csv")
    df.printSchema()
    df.show()
    //5写文件
    df.write.mode(SaveMode.Overwrite)
      .option("header","true")
      .csv("data/sqlout/csv")
    //3.关闭spark
    spark.stop()
  }
}

在这里插入图片描述

JDBC(Mysql)

读取数据

package com.wunaiieq.file

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.util.Properties
object SSJdbcRead {
  def main(args: Array[String]): Unit = {
    //1创建上下文环境配置对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("jdbcRead")
    //2.创建执行环境入口对象SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    // 3.方式一:通过jdbc读取mysql数据库tmp_sql中的student
    //3.1.option("url", "jdbc连接的url字符串")
    //dbc:mysql://192.168.20.101:3306/tmp_sql? => jdbc:mysql://ip:port/dbname?
    //useSSL=false 是否使用SSL安全协议进行连接  false表示不使用,true表示使用
    //&useUnicode=true&characterEncoding=utf8传输数据使用字符集
    //编码,确保数据在传输过程中不出现乱码问题
    //   3.2.option("dbtable", "student")设置操作的表名称
    //   3.3.option("user", "root") 连接mysql数据库的用户名
    //   3.4.option("password", "password") 连接mysql数据库的密码
    //   3.5.load() 无参数  不需要指定具体的路径
    val df: DataFrame = spark.read.format("jdbc")
     .option("url", "jdbc:mysql://192.168.16.100:3306/tmp_sql?useSSL=false&useUnicode=true&characterEncoding=utf8")
     .option("driver", "com.mysql.jdbc.Driver")
     .option("user", "root")
     .option("password", "password")
     .option("dbtable", "student")
     .load()
    //4.方式二
//    val prop: Properties = new Properties()
//    prop.put("user", "root")
//    prop.put("password", "123456")
//    prop.put("driver", "com.mysql.jdbc.Driver")
//    val df: DataFrame = spark.read.jdbc("jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8",
//      "my_score", prop)
    df.printSchema()
    df.show()
    spark.stop()
  }
}

在这里插入图片描述
在这里插入图片描述

写入数据
创建一个样例类,作为写入模板
样例类

package com.wunaiieq

//定义成绩的样例类
case class Student(id:Int,name:String,age:Int)

写入类

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


import java.util.Properties
object SSJdbcWrite {
  def main(args: Array[String]): Unit = {
    //1创建上下文环境配置对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("jdbcWrite")
    //2.创建执行环境入口对象SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //3.添加隐式转换
    import spark.implicits._
    //4.使用样例类序列创建DataSet
    val ds: Dataset[Student] = Seq(Student(13,"A",15),
      Student(12,"B",12),Student(9,"C",83)).toDS()
    //5.将DataSet对象ds中数据写入到mysql的test实例的my_score表中
    //5.1方式一
    //A.mode("append") append表示追加写入,overwrite:表示覆盖写
    //B.format("jdbc") 使用jdbc协议和mysql进行通信
    //C.save() 无参数
    ds.write.mode("append").format("jdbc")
      .option("url", "jdbc:mysql://192.168.16.100:3306/tmp_sql?useSSL=false&useUnicode=true&characterEncoding=utf8")
      .option("user", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("password", "password")
      .option("dbtable", "student")
    .save()
//    //5.2方式二
//    val prop: Properties = new Properties()
//    prop.put("user", "root")
//    prop.put("password", "123456")
//    prop.put("driver", "com.mysql.jdbc.Driver")
//    ds.write.mode("append")
//      .jdbc("jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8",
//        "my_score",prop)
    spark.stop()
  }
}

数据清洗

数据去重dropDuplicates

有参数:保留的是每个 id 第一次出现的记录。如果有多个记录具有相同的 id 但不同的 name,则具体保留哪个 name 取决于哪个记录首先被处理,这通常与数据的分区和排序方式有关。

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object SSDropDuplicates {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SSDropDuplicates")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.读取本地csv文件,返回DataFrame对象
    val df: DataFrame = spark.read.format("csv")
      //第一行为列名
      .option("header", true)
      //设置字段之间的分隔符,默认是“,”
      .option("delimiter", ",")
      //未设置前各个字段都是String类型,设置后匹配对应的类型
      .option("inferSchema","true")
      .option("encoding", "utf-8")
      .load("data/sql/student.csv")
    df.printSchema()
    df.show()
    println("---------无参数去重---------")
    //5,无参数去重,将所有列联合起来进行比较,只保留一条(第一条)
    df.dropDuplicates().show()
    //6.有参数去重,指定字段进行去重,保留的是每个 id 第一次出现的记录。如果有多个记录具有相同的 id 但不同的 name,则具体保留哪个 name 取决于哪个记录首先被处理,这通常与数据的分区和排序方式有关。
    println("---------指定字段进行去重---------")
    df.dropDuplicates("id").show()


    //3.关闭spark
    spark.stop()
  }
}

计算函数 functions

此处以explode为例
其他函数说明点此

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._


object SSFuntions {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SSFuntions")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    import spark.implicits._
    //4.读取本地文件
    val df1: DataFrame = spark.read.text("data/sql/words.txt")
    //5."hello tom"=> "hello","tom"
    val colSplit: Column = split(df1("value"), " ")
    //6."hello","tom"=> "hello"
    //          "tom"
    val explodeColumn: Column = explode(colSplit)
    //7.对已经存在的列进行操作,返回一个新的列
    val df2: DataFrame = df1.withColumn("value", explodeColumn)
    //8.分组统计单词出现的次数,并降序排列
    df2.groupBy("value")
      .count()
      //为列名重命名
      .withColumnRenamed("value","word")
      .withColumnRenamed("count","cnt")
      //排序,按照单词出现的数量的倒叙排序
      .sort($"cnt".desc)
      .show()
    //3.关闭spark
    spark.stop()
  }
}

DSL(Domain Specific Language领域特定语言)

DSL:允许用户以类似 SQL 的方式在 Spark 中表达数据处理逻辑
以下为一个简单示例

package com.wunaiieq

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}


object SSSQlApi {
  def main(args: Array[String]): Unit = {
    //1.创建配置文件对象
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SSSQlApi")
    //2.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()
    //4.读取本地csv文件,返回DataFrame对象
    val df: DataFrame = spark.read.format("csv")
      //第一行为列名
      .option("header", true)
      //设置字段之间的分隔符,默认是“,”
      .option("delimiter", ",")
      //未设置前各个字段都是String类型,设置后匹配对应的类型
      .option("inferSchema","true")
      .option("encoding", "utf-8")
      .load("/input/student.csv")
    //5.将df注册为一个临时视图(表),只能在当前的SparkSession对象中使用
    df.createTempView("tb_student")
    //再次注册同名的视图,抛出异常TempTableAlreadyExistsException
    //Temporary view 'tb_score' already exists
    //df.createTempView("tb_score")
    //7.注册或替换临时视图 不存在则注册,存在则替换
    df.createOrReplaceTempView("tb_student")
    //8.注册全局的临时视图
    df.createOrReplaceGlobalTempView("tb_student")
    //6.执行查询操作
    spark.sql("select id,name,age from tb_student").show()

    //3.关闭spark
    spark.stop()
  }
}

自定义函数

UDF(User-Defined-Function)

概述

特性
一进:
输入:UDF 接受一个或多个输入参数。这些参数通常是 DataFrame 中的列。
含义:在“一进”的情况下,UDF 接收一个输入参数,例如一个列的值。这个输入参数可以是任何类型,具体取决于 UDF 的定义和用途。
一出:
输出:UDF 返回一个输出值。这个输出值通常是基于输入参数计算得到的。
含义:在“一出”的情况下,UDF 输出一个单一的结果,这个结果可以是任何类型,如整数、浮点数、字符串、数组等,具体取决于 UDF 的实现和预期用途。

参数1:UDF名称,可被用于SparkSQL的sql语句中
参数2:被注册成UDF的方法
参数3:声明UDF的返回值类型
sparkSession.udf.register(参数1,参数2,参数3)

package com.wunaiieq
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction

object UserDefUDF {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("udf1")
      .getOrCreate()
    //3.案例1:无参数数的自定义函数
    val randomUDFObj: UserDefinedFunction = udf(() => Math.random())
    spark.udf.register("random",randomUDFObj)
    println("无参数UDF:random")
    spark.sql("select random()").show()
    //4.案例2.一个参数的自定义函数
    val plusOneUDFObj: UserDefinedFunction = udf((x: Int) => x + 1)
    spark.udf.register("plus_one",plusOneUDFObj)
    println("1个参数UDF:加一")
    spark.sql("select plus_one(5)").show()
    //5.案例3.两个参数的自定义函数
    spark.udf.register("str_len2",(str:String,num:Int)=>str.length+num)
    println("2个参数UDF:字符串长度+给定值")
    spark.sql("select str_len2('test',1)").show()
    //6.案例4.用在where语句中的UDF
    spark.udf.register("arg_filter",(n:Int)=>n>5)
    //设置一个给定的test表
    spark.range(1,10).createOrReplaceTempView("test")
    spark.sql("select * from test").show()
    //在给定表test种进行UDF查询
    println("where语句中的UDF:在给定表种查询")
    spark.sql("select * from test where arg_filter(id)").show()
    //7.案例5.其他SQL语句中
    val nameList: List[String] = List[String](
      "zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi")
    //隐式转换
    import spark.implicits._
    //转换DF对象
    val nameDF: DataFrame = nameList.toDF("name")
    //将df注册为临时视图
    nameDF.createOrReplaceTempView("students")
    spark.udf.register("str_len",(name:String)=>name.length)
    //使用udf函数 strLen
    spark.sql("""select name, str_len(name) as length from students order by length desc""").show()
    //2.关闭
    spark.close()
  }
}

在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述

特殊返回值(以数组为例)
package com.wunaiieq

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{ArrayType, StringType}


object UserDefUDF2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("udf2")
      .getOrCreate()
    //2.初始化一个List集合
    val lineList: List[String] = List[String](
      "python java scala", "spark core sql streaming")
    import spark.implicits._
    //3.转换为DF对象
    val lineDF: DataFrame = lineList.toDF("line")
    //4.注册临时视图
    lineDF.createOrReplaceTempView("words")
    //5.自定义函数
    spark.udf.register("split_space",
      (line:String)=>line.split(" "),ArrayType(StringType))
    //spark.udf.register("split_space",(line:String)=>line.split(" "))
    //6.使用自定义函数
    spark.sql("""select line,split_space(line) as arr from words""").show()
    //2.关闭
    spark.close()
  }
}

在这里插入图片描述

UDAF(User-Defined Aggregation Function)

以下为计算流程图(新标签页打开)
具体的操作方法类似于hadoop中的mapreduce
在这里插入图片描述

主函数

package com.wunaiieq

import org.apache.spark.sql.{DataFrame, SparkSession, functions}

object UserDefUDAFNew {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("udafNew")
      .getOrCreate()
    //3.读取json文件
    val df: DataFrame = spark.read.json("data/sql/student.json")
    //4.注册临时视图
    df.createOrReplaceTempView("tb_student")
    //5.注册自定义UDAF函数
    spark.udf.register("my_avg",functions.udaf(new MyUDAF()))
    //6.调用自定义的udaf函数查询每个科目,以及该科目的平均分
    spark.sql("""select class,my_avg(age) from tb_student group by class""").show()
    //2.关闭
    spark.close()
  }
}

样例类

package com.wunaiieq

//自定义样例类,sum聚合,cnt数量
case class MyBuf(var sum:Int,var cnt:Int)

**MyUDAF类 **

package com.wunaiieq

import com.wunaiieq.MyBuf
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.immutable.Range

//自定义类MyUDAF
/**@param Int 进来的参数类型
 * @param Double 输出的参数类型
 * @param MyBuf
 * */
class MyUDAF extends Aggregator[Int,MyBuf,Double]{
  //赋初始化的值 0,0
  override def zero: MyBuf = MyBuf(0,0)
  /**Map节点中的reduce操作将同一个分区下的数据进行分组、聚合
   * @param K 表示班级class,用于分组,
   * @param V 表示传入值,年龄
   * @return MyBuf对象K
   * */
  override def reduce(K: MyBuf, V: Int): MyBuf = {
    K.sum += V
    K.cnt += 1
    K
  }
  /**@param Final_K 表示最终汇总的对象
   * @param k 表示每次输入的小对象,需要将小对象的值一一聚合到K中
   * @return Final_K最终返回的对象,内部已经聚合了所有小对象的值
   * */
  //reduce端将同一个分组的数据进行聚合
  override def merge(Final_K: MyBuf, k: MyBuf): MyBuf = {
    Final_K.sum += k.sum
    Final_K.cnt += k.cnt
    Final_K
  }
  /**逻辑计算: 此处的逻辑计算为 Final_K的sum值 / Final_K的cnt值
   * @param Final_K 最终的对象,已经聚合所有的计算结果,只差最后的逻辑计算
   * @return 返回逻辑计算的结果
   *
   * */
  //聚合后每组数据得到一个MyBuf对象,然后[再做最后的计算]并返回结果
  override def finish(Final_K: MyBuf): Double
  = Final_K.sum.toDouble/Final_K.cnt
  //在中间计算时,每次返回的都是样例类型的对象,所以类型写入MyBuf
  //中间结果的序列化,元组或样例类调用Encoders.product进行序列化
  override def bufferEncoder: Encoder[MyBuf] = Encoders.product
  //最终结果的序列化,内部类型为最终的结果类型
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

开窗函数

描述
既显示聚集前的数据又显示聚集后的数据

package com.wunaiieq

import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession, functions}


object OpenWindowFunction {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("OpenWindowFunction")
      .getOrCreate()
    //3.读取json文件
    val df: DataFrame = spark.read.json("data/sql/student.json")
    //4.注册视图
    df.createOrReplaceTempView("tb_student")
    //5.聚合开窗函数
    //5.1.在每行信息后面显示所有人的平均年龄
    println("在每行信息后面显示所有人的平均年龄")
    spark.sql("""select id,name,A,B,avg(age) over() as avg_age from tb_student""").show()
    //5.2.在每行信息后面显示各个班级的平均年龄
    println("在每行信息后面显示各个班级的平均年龄")
    spark.sql("""select id,name,A,B,avg(age) over(partition by class) as avg_age from tb_student""").show()
    //6.排序开窗函数
    //6.1row_number排序开窗函数:值同名次不同,序号不间断
    println("------row_number-----")
    spark.sql("""select id,name,A,B,row_number() over(order by id desc) as rn from tb_student""").show()
    //6.2 dense_rank:值同名次同,序号不间断
    println("------dense_rank-----")
    spark.sql("""select id,name,A,B,dense_rank() over(order by id desc) as dr from tb_student""").show()
//    //6.3. rank: 值同名次同,序号间断
    println("------rank-----")
    spark.sql("""select id,name,A,B,rank() over(order by id desc) as rk from tb_student""").show()
    //2.关闭
    spark.close()
  }
}

最后一列的avg_age为计算结果,加入到表格中一起显示,这就是窗口函数的作用
在这里插入图片描述在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到