本节课基于上节课,在idea中创建新的Spark-SQL文件进行运行
学习了自定义函数(UDF)、用户定义函数(UDAF)的使用,了解到强类型UDAF跟弱类型UDAF的区别。
了解到自定义函数侧重于对单条数据进行处理,但代码繁琐;而用户定义函数则是对更复杂的聚合计算逻辑进行处理,更高效、更贴合数据。
弱类型 UDAF 在 Spark 早期版本中使用较多,使用时需要关注数据类型的定义和转换,灵活性稍弱但能满足常规聚合需求;强类型 UDAF 在 Spark 3.0 及之后版本提供了更简洁、类型安全的聚合方式,开发效率和代码可读性在一些复杂场景下可能更优。
一、上节课的代码在idea中运行
1. 首先创建Spark-SQL文件,创建class文件
2. 在大的框架pom.xml中添加以下依赖
3. 在 Spark-SQL/src/main/scala 中创建Scala Class 文件,名为SparkSQLDemo ,添加以下代码
case class User(id:Int,name:String,age:Int)
object SparkSQLDemo {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")
df.show()
//SQL风格语法
df.createOrReplaceTempView("user")
spark.sql("select * from user").show
spark.sql("select avg(age) from user").show
//DSL风格语法
df.select("username","age").show()
//RDD=>DataFrame=>DataSet
//RDD
val rdd1 :RDD[(Int,String,Int)] = spark.sparkContext.makeRDD(
List((1,"zhangsan",30),(2,"lisi",40))
)
//DataFrame
val df1 :DataFrame = rdd1.toDF("id","name","age")
df1.show()
//DataSet
val ds1 :Dataset[User] = df1.as[User]
ds1.show()
//DataSet=>DataFrame=>RDD
val df2 =ds1.toDF()
df2.show()
val rdd2 :RDD[Row] = df2.rdd
rdd2.foreach(a=>println(a.getString(1)))
rdd1.map{
case (id,name,age)=>User(id,name,age)
}.toDS().show()
val rdd3 = ds1.rdd
rdd3.foreach(a=>println(a.age))
rdd3.foreach(a=>println(a.id))
rdd3.foreach(a=>println(a.name))
spark.stop()
}
}
4. 在Spark-SQL文件下创建input 文件,创建 user.json 并添加以下代码
[
{"id": 1, "name": "zhangsan", "age": 30},
{"id": 2, "name": "lisi", "age": 40}
]
5. 运行 SparkSQLDemo 代码
二、自定义函数
UDF:
实现项目:计算平均工资
(一)RDD
(二)弱类型UDAF
先定义
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// 先定义MyAverageUDAF类
class MyAverageUDAF extends UserDefinedAggregateFunction {
def inputSchema: StructType =
StructType(Array(StructField("salary", IntegerType)))
// 聚合函数缓冲区中值的数据类型(salary,count)
def bufferSchema: StructType = {
StructType(Array(StructField("sum", LongType), StructField("count", LongType)))
}
// 函数返回值的数据类型
def dataType: DataType = DoubleType
// 稳定性:对于相同的输入是否一直返回相同的输出。
def deterministic: Boolean = true
// 函数缓冲区初始化
def initialize(buffer: MutableAggregationBuffer): Unit = {
// 存薪资的总和
buffer(0) = 0L
// 存薪资的个数
buffer(1) = 0L
}
// 更新缓冲区中的数据
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getInt(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// 合并缓冲区
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 计算最终结果
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
后运行
(三)强类型UDAF
先添加
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}
// 导入 RDD 类型所在的包
import org.apache.spark.rdd.RDD
// 重命名为 SalaryBuff
case class SalaryBuff(var sum: Long, var cnt: Long)
// 重命名为 NewAverageUDAF
class NewAverageUDAF extends Aggregator[Long, SalaryBuff, Double] {
override def zero: SalaryBuff = SalaryBuff(0, 0)
override def reduce(b: SalaryBuff, a: Long): SalaryBuff = {
b.sum += a
b.cnt += 1
b
}
override def merge(b1: SalaryBuff, b2: SalaryBuff): SalaryBuff = {
b1.sum += b2.sum
b1.cnt += b2.cnt
b1
}
override def finish(reduction: SalaryBuff): Double = {
reduction.sum.toDouble / reduction.cnt
}
override def bufferEncoder: Encoder[SalaryBuff] = Encoders.product
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
后运行