实验内容:
利用IDEA开发Spark-SQL。
实验步骤:
利用IDEA开发Spark-SQL
创建子模块 并且添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
在项目中创建了名为 Spark-SQL 的子模块,并添加了 org.apache.spark:spark-sql_2.12:3.0.0 依赖。
创建Spark-SQL的测试代码:
导入必要的包和定义数据结构:
case class User(id:Int,name:String,age:Int)
定义了一个名为 User 的样例类,用于表示用户数据的结构,包含 id(整数类型)、name(字符串类型)和 age(整数类型)三个字段。
主函数和上下文环境配置:
object SparkSQLDemo {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
在 main 函数中,首先创建了一个 SparkConf 对象,设置了运行模式为 local[*](表示在本地使用所有可用的 CPU 核心运行),并设置了应用程序的名称为 SQLDemo。然后通过 SparkSession.builder() 方法并传入配置对象来创建 SparkSession 对象,SparkSession 是 Spark SQL 中用于与 Spark 进行交互的入口点,它整合了 SparkContext、SQLContext 等重要组件,提供了统一的编程接口。
导入隐式转换并读取 JSON 文件:
import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")
df.show()
通过 import spark.implicits._ 导入了隐式转换,这使得在后续代码中可以方便地进行数据类型转换,例如将 RDD 转换为 DataFrame 等。接着使用 spark.read.json() 方法从指定路径 Spark-SQL/input/user.json 读取 JSON 格式的数据,并将其存储为 DataFrame 对象 df,然后调用 show() 方法展示数据的内容。
SQL 风格语法操作:
//SQL风格语法
df.createOrReplaceTempView("user")
spark.sql("select * from user").show
spark.sql("select avg(age) from user").show
通过 createOrReplaceTempView("user") 将 DataFrame 注册为一个临时视图 user,这样就可以使用 SQL 语句来查询和操作这个数据。然后分别执行了 select * from user 和 select avg(age) from user 两条 SQL 语句,前者展示了所有数据,后者计算并展示了用户年龄的平均值。
DSL 风格语法操作:
//DSL风格语法
df.select("username","age").show()
使用 DataFrame 的 DSL(领域特定语言)风格语法,通过 select 方法选择了 username 和 age 两列数据,并展示出来。这里可能存在一个小错误,原始数据结构中是 name 字段,而代码中写的是 username,如果实际数据中没有 username 列,会导致错误。
RDD、DataFrame 和 DataSet 之间的转换操作:
//RDD=>DataFrame=>DataSet
//RDD
val rdd1 :RDD[(Int,String,Int)] = spark.sparkContext.makeRDD(
List((1,"zhangsan",30),(2,"lisi",40))
)
//DataFrame
val df1 :DataFrame = rdd1.toDF("id","name","age")
df1.show()
//DataSet
val ds1 :Dataset[User] = df1.as[User]
ds1.show()
首先创建了一个 RDD,包含了用户的 id、name 和 age 信息。然后通过 toDF 方法将 RDD 转换为 DataFrame,并指定了列名。接着使用 as[User] 方法将 DataFrame 转换为 DataSet,并使用 show 方法展示了转换后的数据。
//DataSet=>DataFrame=>RDD
val df2 =ds1.toDF()
df2.show()
val rdd2 :RDD[Row] = df2.rdd
rdd2.foreach(a=>println(a.getString(1)))
将 DataSet 转换回 DataFrame,并展示数据。然后将 DataFrame 转换为 RDD,并通过 foreach 方法遍历 RDD,打印出每行数据中索引为 1 的字符串类型字段(即 name 字段)。
rdd1.map{
case (id,name,age)=>User(id,name,age)
}.toDS().show()
将 RDD 中的数据通过 map 操作转换为 User 对象,并再次转换为 DataSet 展示出来。
val rdd3 = ds1.rdd
rdd3.foreach(a=>println(a.age))
rdd3.foreach(a=>println(a.id))
rdd3.foreach(a=>println(a.name))
将 DataSet 转换为 RDD,并分别通过 foreach 方法打印出 User 对象中的 age、id 和 name 字段的值。
关闭 SparkSession:
spark.stop()
}
}
自定义函数:
UDF:
配置 Spark 环境并创建 SparkSession:
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
这段代码首先创建了一个 SparkConf 对象,设置了 Spark 应用程序的运行模式为 local[*],表示在本地使用所有可用的 CPU 核心来运行应用程序,并设置应用程序的名称为 SQLDemo。然后通过 SparkSession.builder() 方法,传入配置对象 sparkConf,创建了一个 SparkSession 对象 spark。SparkSession 是 Spark SQL 中用于与 Spark 进行交互的核心入口点,它整合了多个关键组件,如 SparkContext 和 SQLContext 等,为后续的操作提供了统一的接口。
导入隐式转换并读取 JSON 文件:
import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")
import spark.implicits._ 导入了 spark 相关的隐式转换,这使得在后续代码中能够方便地进行数据类型转换,例如将 RDD 转换为 DataFrame 等操作。接着使用 spark.read.json() 方法从指定路径 Spark-SQL/input/user.json 读取 JSON 格式的数据,并将其存储为 DataFrame 对象 df。DataFrame 是 Spark SQL 中用于处理结构化数据的一种数据结构,类似于关系型数据库中的表。
注册自定义函数(UDF):
spark.udf.register("addName",(x:String)=>"Name:"+x)
通过 spark.udf.register() 方法注册了一个名为 addName 的自定义函数。该函数接受一个 String 类型的参数 x,并返回一个新的字符串,格式为 "Name:" 加上传入的字符串参数 x。例如,如果传入的参数是 "Alice",则返回的字符串将是 "Name:Alice"。注册后的 UDF 可以在后续的 SQL 查询中使用。
创建临时视图并执行 SQL 查询:
df.createOrReplaceTempView("people")
spark.sql("select addName(username),age from people").show()
df.createOrReplaceTempView("people") 将之前读取的 DataFrame df 注册为一个临时视图 people。临时视图类似于数据库中的表,这样就可以使用 SQL 语句来查询和操作这个数据。然后通过 spark.sql() 方法执行了一条 SQL 查询语句 select addName(username),age from people,该语句调用了之前注册的 UDF addName 对 username 列的数据进行处理,并同时选择了 age 列。最后使用 show() 方法展示了查询结果。
关闭 SparkSession:
spark.stop()
UDAF(自定义聚合函数)
强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0之前我们使用的是UserDefinedAggregateFunction作为自定义聚合函数,从 Spark3.0 版本后可以统一采用强类型聚合函数 Aggregator
实验需求:计算平均工资
实现方式一:RDD
配置 Spark 环境并创建 SparkContext:
val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
首先创建了一个 SparkConf 对象,设置应用名称为 "app",运行模式为 "local[*]",即本地使用所有可用的 CPU 核心来运行应用。然后通过 SparkConf 对象创建了 SparkContext 对象 sc,SparkContext 是 Spark 中所有功能的入口点,用于与集群进行交互、创建 RDD 等操作。
创建 RDD 并进行转换操作:
val resRDD: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40))).map {
case (name, salary) => {
(salary, 1)
}
}
使用 sc.makeRDD() 方法创建了一个包含员工姓名和工资的 RDD,初始数据为 List(("zhangsan", 20), ("lisi", 30), ("wangwu",40))。然后通过 map 转换操作,将每个元素(即每个员工的姓名和工资的元组)映射为一个新的元组 (salary, 1),这里的 1 表示每个员工作为一个计数单位,后续用于计算员工数量。
使用 reduce 操作计算总和和数量:
.reduce {
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
}
对前面经过 map 转换后的 RDD 使用 reduce 操作。reduce 操作会对 RDD 中的元素进行两两合并,这里的合并逻辑是将两个元组 t1 和 t2 的第一个元素(工资)相加,第二个元素(计数)相加,最终得到一个包含工资总和和员工数量的元组。
计算并输出平均工资:
println(resRDD._1/resRDD._2)
关闭连接
sc.stop()
实现方式二:弱类型UDAF
定义自定义聚合函数类 MyAverageUDAF:
class MyAverageUDAF extends UserDefinedAggregateFunction{
def inputSchema: StructType =
StructType(Array(StructField("salary",IntegerType)))
该方法定义了聚合函数的输入模式。这里表示输入数据是一个包含名为 salary 的字段,其数据类型为 IntegerType 的结构。也就是说,这个聚合函数期望输入的数据行中包含一个名为 salary 的整数类型字段。
bufferSchema 方法:
// 聚合函数缓冲区中值的数据类型(salary,count)
def bufferSchema: StructType = {StructType(Array(StructField("sum",LongType),StructField("count",LongType)))
}
此方法定义了聚合函数缓冲区中值的数据类型。缓冲区用于在聚合过程中存储中间结果,这里定义了一个包含两个字段的结构:sum(用于存储工资总和,数据类型为 LongType)和 count(用于存储工资的个数,数据类型为 LongType)。
- dataType 方法:
// 函数返回值的数据类型
def dataType: DataType = DoubleType
该方法指定了聚合函数最终返回值的数据类型。由于要计算平均工资,结果是一个小数,所以返回值类型为 DoubleType。
- deterministic 方法:
// 稳定性:对于相同的输入是否一直返回相同的输出。
def deterministic: Boolean = true
deterministic 表示函数的确定性,即对于相同的输入是否总是返回相同的输出。这里设置为 true,说明这个聚合函数是确定性的,相同的输入数据会得到相同的计算结果。
initialize 方法:
// 函数缓冲区初始化
def initialize(buffer: MutableAggregationBuffer): Unit = {
// 存薪资的总和
buffer(0) = 0L
// 存薪资的个数
buffer(1) = 0L
}
initialize 方法用于初始化聚合函数的缓冲区。将缓冲区的第一个位置(存储工资总和)设置为 0L,第二个位置(存储工资个数)也设置为 0L。
- update 方法:
// 更新缓冲区中的数据
def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getInt(0)
buffer(1) = buffer.getLong(1) + 1
}
}
update 方法用于更新缓冲区中的数据。当输入的行数据中的 salary 字段不为空时,将当前行的 salary 值累加到缓冲区的 sum 字段中,并将 count 字段加 1,以记录处理的工资个数。
- merge 方法:
// 合并缓冲区
def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
merge 方法用于合并两个缓冲区。在分布式计算中,当不同分区的聚合结果需要合并时,会调用这个方法。将 buffer2 中的 sum 和 count 值分别累加到 buffer1 对应的字段中。
- evaluate 方法:
// 计算最终结果
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble /
buffer.getLong(1)
}
evaluate 方法用于计算最终的聚合结果。从缓冲区中取出 sum 和 count 值,将 sum 转换为 Double 类型后除以 count,得到平均工资并返回。
主程序部分:
配置 Spark 环境并创建 SparkSession:
val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark:SparkSession = SparkSession.builder().config(conf).getOrCreate()
创建 SparkConf 对象,设置应用名称为 "app",运行模式为 "local[*]",然后通过 SparkSession.builder() 方法创建 SparkSession 对象 spark,作为与 Spark 交互的入口。
创建 RDD 并转换为 DataFrame:
import spark.implicits._
val res :RDD[(String,Int)]= spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40)))
val df :DataFrame = res.toDF("name","salary")
使用 spark.sparkContext.makeRDD() 创建一个包含员工姓名和工资的 RDD,然后通过 toDF 方法将其转换为 DataFrame,并指定列名为 "name" 和 "salary"。
- 注册临时视图并注册 UDAF:
df.createOrReplaceTempView("user")
var myAverage = new MyAverageUDAF
//在 spark 中注册聚合函数
spark.udf.register("avgSalary",myAverage)
将 DataFrame 注册为临时视图 "user",方便后续使用 SQL 语句查询。创建 MyAverageUDAF 类的实例 myAverage,并通过 spark.udf.register() 方法将其注册为名为 "avgSalary" 的用户自定义函数。
执行 SQL 查询并展示结果:
spark.sql("select avgSalary(salary) from user").show()
关闭连接
spark.stop()
实现方式三:强类型UDAF
定义缓冲区数据结构 Buff:
case class Buff(var sum:Long,varcnt:Long)
定义了一个名为 Buff 的样例类,用于表示聚合过程中的缓冲区数据结构。它包含两个可变字段 sum(用于存储工资总和,类型为 Long)和 cnt(用于存储工资个数,类型为 Long)。
- 定义强类型 UDAF 类 MyAverageUDAF:
class MyAverageUDAF extends Aggregator[Long,Buff,Double]{
zero 方法:
override def zero: Buff = Buff(0,0)
zero 方法定义了聚合操作的初始值,即创建一个 Buff 实例,其 sum 和 cnt 字段都初始化为 0。
reduce 方法:
override def reduce(b: Buff, a: Long): Buff = {
b.sum += a
b.cnt += 1
b
}
reduce 方法用于在每个分区内对输入数据进行聚合操作。它接受一个缓冲区对象 b 和一个输入数据 a(工资数值),将输入的工资累加到缓冲区的 sum 字段,并将 cnt 字段加 1,然后返回更新后的缓冲区对象。
merge 方法:
override def merge(b1: Buff, b2: Buff): Buff = {
b1.sum += b2.sum
b1.cnt += b2.cnt
b1
}
merge 方法用于合并不同分区的聚合结果。它接受两个缓冲区对象 b1 和 b2,将 b2 的 sum 和 cnt 累加到 b1 对应的字段上,然后返回 b1。
finish 方法:
override def finish(reduction: Buff): Double = {
reduction.sum.toDouble/reduction.cnt
}
finish 方法用于计算最终的聚合结果。它接受一个已经聚合好的缓冲区对象 reduction,将 sum 转换为 Double 类型后除以 cnt,得到平均工资并返回。
bufferEncoder 方法:
override def bufferEncoder: Encoder[Buff] = Encoders.product
bufferEncoder 方法定义了如何对缓冲区数据进行编码,这里使用 Encoders.product 来编码 Buff 类型的数据,因为 Buff 是一个样例类,Encoders.product 可以自动处理其字段的编码。
outputEncoder 方法:
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
outputEncoder 方法定义了如何对最终输出结果进行编码,这里使用 Encoders.scalaDouble 来编码 Double 类型的数据。
主程序部分:
配置 Spark 环境并创建 SparkSession:
val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark:SparkSession = SparkSession.builder().config(conf).getOrCreate()
创建 SparkConf 对象,设置应用名称为 "app",运行模式为 "local[*]",然后通过 SparkSession.builder() 方法创建 SparkSession 对象 spark,作为与 Spark 交互的入口。
创建 RDD 并转换为 DataFrame:
import spark.implicits._
val res :RDD[(String,Int)]= spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40)))
val df :DataFrame = res.toDF("name","salary")
使用 spark.sparkContext.makeRDD() 创建一个包含员工姓名和工资的 RDD,然后通过 toDF 方法将其转换为 DataFrame,并指定列名为 "name" 和 "salary"。
注册临时视图并注册 UDAF:
df.createOrReplaceTempView("user")
var myAverage = new MyAverageUDAF
//在 spark 中注册聚合函数
spark.udf.register("avgSalary",functions.udaf(myAverage))
将 DataFrame 注册为临时视图 "user",方便后续使用 SQL 语句查询。创建 MyAverageUDAF 类的实例 myAverage,并通过 spark.udf.register() 方法将其注册为名为 "avgSalary" 的用户自定义函数。这里使用 functions.udaf(myAverage) 将强类型的 Aggregator 包装成可以注册的 UDF。
执行 SQL 查询并展示结果:
spark.sql("select avgSalary(salary) from user").show()
关闭连接
spark.stop()