Spark-SQL(二)

发布于:2025-04-19 ⋅ 阅读:(38) ⋅ 点赞:(0)

本节课基于上节课,在idea中创建新的Spark-SQL文件进行运行

学习了自定义函数(UDF)、用户定义函数(UDAF)的使用,了解到强类型UDAF跟弱类型UDAF的区别。

了解到自定义函数侧重于对单条数据进行处理,但代码繁琐;而用户定义函数则是对更复杂的聚合计算逻辑进行处理,更高效、更贴合数据。

弱类型 UDAF 在 Spark 早期版本中使用较多,使用时需要关注数据类型的定义和转换,灵活性稍弱但能满足常规聚合需求;强类型 UDAF 在 Spark 3.0 及之后版本提供了更简洁、类型安全的聚合方式,开发效率和代码可读性在一些复杂场景下可能更优。

一、上节课的代码在idea中运行

1. 首先创建Spark-SQL文件,创建class文件

2. 在大的框架pom.xml中添加以下依赖

3. 在 Spark-SQL/src/main/scala 中创建Scala Class 文件,名为SparkSQLDemo ,添加以下代码

case class User(id:Int,name:String,age:Int)

object SparkSQLDemo {
  def main(args: Array[String]): Unit = {
    //创建上下文环境配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
    //创建SparkSession对象
    val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    //读取json文件
    val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")
    df.show()
    //SQL风格语法
    df.createOrReplaceTempView("user")
    spark.sql("select * from user").show
    spark.sql("select avg(age) from user").show

    //DSL风格语法
    df.select("username","age").show()

    //RDD=>DataFrame=>DataSet
    //RDD
    val rdd1 :RDD[(Int,String,Int)] = spark.sparkContext.makeRDD(
      List((1,"zhangsan",30),(2,"lisi",40))
    )
    //DataFrame
    val df1 :DataFrame = rdd1.toDF("id","name","age")
    df1.show()
    //DataSet
    val ds1 :Dataset[User] = df1.as[User]
    ds1.show()

    //DataSet=>DataFrame=>RDD
    val df2 =ds1.toDF()
    df2.show()

    val rdd2 :RDD[Row] = df2.rdd
    rdd2.foreach(a=>println(a.getString(1)))

    rdd1.map{
      case (id,name,age)=>User(id,name,age)
    }.toDS().show()

    val rdd3 = ds1.rdd
    rdd3.foreach(a=>println(a.age))
    rdd3.foreach(a=>println(a.id))
    rdd3.foreach(a=>println(a.name))

    spark.stop()
  }

}

4. 在Spark-SQL文件下创建input 文件,创建 user.json 并添加以下代码

[
  {"id": 1, "name": "zhangsan", "age": 30},
  {"id": 2, "name": "lisi", "age": 40}
]

5. 运行 SparkSQLDemo 代码

二、自定义函数

UDF:

实现项目:计算平均工资

(一)RDD

(二)弱类型UDAF

先定义

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// 先定义MyAverageUDAF类
class MyAverageUDAF extends UserDefinedAggregateFunction {
  def inputSchema: StructType =
    StructType(Array(StructField("salary", IntegerType)))
  // 聚合函数缓冲区中值的数据类型(salary,count)
  def bufferSchema: StructType = {
    StructType(Array(StructField("sum", LongType), StructField("count", LongType)))
  }
  // 函数返回值的数据类型
  def dataType: DataType = DoubleType
  // 稳定性:对于相同的输入是否一直返回相同的输出。
  def deterministic: Boolean = true
  // 函数缓冲区初始化
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    // 存薪资的总和
    buffer(0) = 0L
    // 存薪资的个数
    buffer(1) = 0L
  }
  // 更新缓冲区中的数据
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getInt(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // 合并缓冲区
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // 计算最终结果
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

后运行

(三)强类型UDAF

先添加

import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}
// 导入 RDD 类型所在的包
import org.apache.spark.rdd.RDD

// 重命名为 SalaryBuff
case class SalaryBuff(var sum: Long, var cnt: Long)
// 重命名为 NewAverageUDAF
class NewAverageUDAF extends Aggregator[Long, SalaryBuff, Double] {
  override def zero: SalaryBuff = SalaryBuff(0, 0)

  override def reduce(b: SalaryBuff, a: Long): SalaryBuff = {
    b.sum += a
    b.cnt += 1
    b
  }

  override def merge(b1: SalaryBuff, b2: SalaryBuff): SalaryBuff = {
    b1.sum += b2.sum
    b1.cnt += b2.cnt
    b1
  }

  override def finish(reduction: SalaryBuff): Double = {
    reduction.sum.toDouble / reduction.cnt
  }
  override def bufferEncoder: Encoder[SalaryBuff] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

后运行


网站公告

今日签到

点亮在社区的每一天
去签到