spark-sql

发布于:2025-04-16 ⋅ 阅读:(20) ⋅ 点赞:(0)

实验内容:

利用IDEA开发Spark-SQL。

实验步骤:

利用IDEA开发Spark-SQL

创建子模块 并且添加依赖

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.12</artifactId>
   <version>3.0.0</version>
</dependency>

在项目中创建了名为 Spark-SQL 的子模块,并添加了 org.apache.spark:spark-sql_2.12:3.0.0 依赖。

创建Spark-SQL的测试代码:

导入必要的包和定义数据结构:

case class User(id:Int,name:String,age:Int)

定义了一个名为 User 的样例类,用于表示用户数据的结构,包含 id(整数类型)、name(字符串类型)和 age(整数类型)三个字段。


主函数和上下文环境配置:

object SparkSQLDemo {
 def main(args: Array[String]): Unit = {
   //创建上下文环境配置对象
   val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
   //创建SparkSession对象
   val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

在 main 函数中,首先创建了一个 SparkConf 对象,设置了运行模式为 local[*](表示在本地使用所有可用的 CPU 核心运行),并设置了应用程序的名称为 SQLDemo。然后通过 SparkSession.builder() 方法并传入配置对象来创建 SparkSession 对象,SparkSession 是 Spark SQL 中用于与 Spark 进行交互的入口点,它整合了 SparkContext、SQLContext 等重要组件,提供了统一的编程接口。


导入隐式转换并读取 JSON 文件:

import spark.implicits._
   //读取json文件
   val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")
   df.show()

通过 import spark.implicits._ 导入了隐式转换,这使得在后续代码中可以方便地进行数据类型转换,例如将 RDD 转换为 DataFrame 等。接着使用 spark.read.json() 方法从指定路径 Spark-SQL/input/user.json 读取 JSON 格式的数据,并将其存储为 DataFrame 对象 df,然后调用 show() 方法展示数据的内容。

SQL 风格语法操作:


   //SQL风格语法
   df.createOrReplaceTempView("user")
   spark.sql("select * from user").show
   spark.sql("select avg(age) from user").show
          通过 createOrReplaceTempView("user") 将 DataFrame 注册为一个临时视图 user,这样就可以使用 SQL 语句来查询和操作这个数据。然后分别执行了 select * from user 和 select avg(age) from user 两条 SQL 语句,前者展示了所有数据,后者计算并展示了用户年龄的平均值。

DSL 风格语法操作:


   //DSL风格语法
   df.select("username","age").show()
       使用 DataFrame 的 DSL(领域特定语言)风格语法,通过 select 方法选择了 username 和 age 两列数据,并展示出来。这里可能存在一个小错误,原始数据结构中是 name 字段,而代码中写的是 username,如果实际数据中没有 username 列,会导致错误。

RDD、DataFrame 和 DataSet 之间的转换操作:


   //RDD=>DataFrame=>DataSet
   //RDD
   val rdd1 :RDD[(Int,String,Int)] = spark.sparkContext.makeRDD(
     List((1,"zhangsan",30),(2,"lisi",40))
   )
   //DataFrame
   val df1 :DataFrame = rdd1.toDF("id","name","age")
   df1.show()
   //DataSet
   val ds1 :Dataset[User] = df1.as[User]
   ds1.show()
       首先创建了一个 RDD,包含了用户的 id、name 和 age 信息。然后通过 toDF 方法将 RDD 转换为 DataFrame,并指定了列名。接着使用 as[User] 方法将 DataFrame 转换为 DataSet,并使用 show 方法展示了转换后的数据。


   //DataSet=>DataFrame=>RDD
   val df2 =ds1.toDF()
   df2.show()
   val rdd2 :RDD[Row] = df2.rdd
   rdd2.foreach(a=>println(a.getString(1)))

         将 DataSet 转换回 DataFrame,并展示数据。然后将 DataFrame 转换为 RDD,并通过 foreach 方法遍历 RDD,打印出每行数据中索引为 1 的字符串类型字段(即 name 字段)。

   rdd1.map{
     case (id,name,age)=>User(id,name,age)
   }.toDS().show()
        将 RDD 中的数据通过 map 操作转换为 User 对象,并再次转换为 DataSet 展示出来。


   val rdd3 = ds1.rdd
   rdd3.foreach(a=>println(a.age))
   rdd3.foreach(a=>println(a.id))
   rdd3.foreach(a=>println(a.name))

将 DataSet 转换为 RDD,并分别通过 foreach 方法打印出 User 对象中的 age、id 和 name 字段的值。

关闭 SparkSession:

   spark.stop()
 }

}

自定义函数:

UDF:

配置 Spark 环境并创建 SparkSession:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
         这段代码首先创建了一个 SparkConf 对象,设置了 Spark 应用程序的运行模式为 local[*],表示在本地使用所有可用的 CPU 核心来运行应用程序,并设置应用程序的名称为 SQLDemo。然后通过 SparkSession.builder() 方法,传入配置对象 sparkConf,创建了一个 SparkSession 对象 spark。SparkSession 是 Spark SQL 中用于与 Spark 进行交互的核心入口点,它整合了多个关键组件,如 SparkContext 和 SQLContext 等,为后续的操作提供了统一的接口。

导入隐式转换并读取 JSON 文件:

import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")
        import spark.implicits._ 导入了 spark 相关的隐式转换,这使得在后续代码中能够方便地进行数据类型转换,例如将 RDD 转换为 DataFrame 等操作。接着使用 spark.read.json() 方法从指定路径 Spark-SQL/input/user.json 读取 JSON 格式的数据,并将其存储为 DataFrame 对象 df。DataFrame 是 Spark SQL 中用于处理结构化数据的一种数据结构,类似于关系型数据库中的表。

注册自定义函数(UDF):


spark.udf.register("addName",(x:String)=>"Name:"+x)

         通过 spark.udf.register() 方法注册了一个名为 addName 的自定义函数。该函数接受一个 String 类型的参数 x,并返回一个新的字符串,格式为 "Name:" 加上传入的字符串参数 x。例如,如果传入的参数是 "Alice",则返回的字符串将是 "Name:Alice"。注册后的 UDF 可以在后续的 SQL 查询中使用。

创建临时视图并执行 SQL 查询:


 df.createOrReplaceTempView("people")
spark.sql("select addName(username),age from people").show()

       df.createOrReplaceTempView("people") 将之前读取的 DataFrame df 注册为一个临时视图 people。临时视图类似于数据库中的表,这样就可以使用 SQL 语句来查询和操作这个数据。然后通过 spark.sql() 方法执行了一条 SQL 查询语句 select addName(username),age from people,该语句调用了之前注册的 UDF addName 对 username 列的数据进行处理,并同时选择了 age 列。最后使用 show() 方法展示了查询结果。

关闭 SparkSession:

spark.stop()

UDAF(自定义聚合函数)

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0之前我们使用的是UserDefinedAggregateFunction作为自定义聚合函数,从 Spark3.0 版本后可以统一采用强类型聚合函数 Aggregator

实验需求:计算平均工资

实现方式一:RDD

配置 Spark 环境并创建 SparkContext:

val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)

首先创建了一个 SparkConf 对象,设置应用名称为 "app",运行模式为 "local[*]",即本地使用所有可用的 CPU 核心来运行应用。然后通过 SparkConf 对象创建了 SparkContext 对象 sc,SparkContext 是 Spark 中所有功能的入口点,用于与集群进行交互、创建 RDD 等操作。

创建 RDD 并进行转换操作:

val resRDD: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40))).map {
 case (name, salary) => {
   (salary, 1)
 }
}

使用 sc.makeRDD() 方法创建了一个包含员工姓名和工资的 RDD,初始数据为 List(("zhangsan", 20), ("lisi", 30), ("wangwu",40))。然后通过 map 转换操作,将每个元素(即每个员工的姓名和工资的元组)映射为一个新的元组 (salary, 1),这里的 1 表示每个员工作为一个计数单位,后续用于计算员工数量。

使用 reduce 操作计算总和和数量:

.reduce {
 (t1, t2) => {
   (t1._1 + t2._1, t1._2 + t2._2)
 }
}

对前面经过 map 转换后的 RDD 使用 reduce 操作。reduce 操作会对 RDD 中的元素进行两两合并,这里的合并逻辑是将两个元组 t1 和 t2 的第一个元素(工资)相加,第二个元素(计数)相加,最终得到一个包含工资总和和员工数量的元组。

 计算并输出平均工资:

println(resRDD._1/resRDD._2)

关闭连接

sc.stop()

实现方式二:弱类型UDAF

定义自定义聚合函数类 MyAverageUDAF:

class MyAverageUDAF extends UserDefinedAggregateFunction{
 def inputSchema: StructType =
 StructType(Array(StructField("salary",IntegerType)))

该方法定义了聚合函数的输入模式。这里表示输入数据是一个包含名为 salary 的字段,其数据类型为 IntegerType 的结构。也就是说,这个聚合函数期望输入的数据行中包含一个名为 salary 的整数类型字段。


bufferSchema 方法:
 

// 聚合函数缓冲区中值的数据类型(salary,count)
 def bufferSchema: StructType = {StructType(Array(StructField("sum",LongType),StructField("count",LongType)))
 }

此方法定义了聚合函数缓冲区中值的数据类型。缓冲区用于在聚合过程中存储中间结果,这里定义了一个包含两个字段的结构:sum(用于存储工资总和,数据类型为 LongType)和 count(用于存储工资的个数,数据类型为 LongType)。

- dataType 方法:

 // 函数返回值的数据类型
 def dataType: DataType = DoubleType

该方法指定了聚合函数最终返回值的数据类型。由于要计算平均工资,结果是一个小数,所以返回值类型为 DoubleType。

- deterministic 方法:

 // 稳定性:对于相同的输入是否一直返回相同的输出。
 def deterministic: Boolean = true

deterministic 表示函数的确定性,即对于相同的输入是否总是返回相同的输出。这里设置为 true,说明这个聚合函数是确定性的,相同的输入数据会得到相同的计算结果。

initialize 方法:

 // 函数缓冲区初始化
 def initialize(buffer: MutableAggregationBuffer): Unit = {
   // 存薪资的总和
   buffer(0) = 0L
   // 存薪资的个数
   buffer(1) = 0L
 }

initialize 方法用于初始化聚合函数的缓冲区。将缓冲区的第一个位置(存储工资总和)设置为 0L,第二个位置(存储工资个数)也设置为 0L。

- update 方法:

 // 更新缓冲区中的数据
 def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
   if (!input.isNullAt(0)) {
     buffer(0) = buffer.getLong(0) + input.getInt(0)
     buffer(1) = buffer.getLong(1) + 1
   }
 }

update 方法用于更新缓冲区中的数据。当输入的行数据中的 salary 字段不为空时,将当前行的 salary 值累加到缓冲区的 sum 字段中,并将 count 字段加 1,以记录处理的工资个数。

- merge 方法:

// 合并缓冲区
 def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = {
   buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
   buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
 }

merge 方法用于合并两个缓冲区。在分布式计算中,当不同分区的聚合结果需要合并时,会调用这个方法。将 buffer2 中的 sum 和 count 值分别累加到 buffer1 对应的字段中。

- evaluate 方法:

 // 计算最终结果
 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble /
   buffer.getLong(1)
}
 evaluate 方法用于计算最终的聚合结果。从缓冲区中取出 sum 和 count 值,将 sum 转换为 Double 类型后除以 count,得到平均工资并返回。

主程序部分:

配置 Spark 环境并创建 SparkSession:

 val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark:SparkSession = SparkSession.builder().config(conf).getOrCreate()

创建 SparkConf 对象,设置应用名称为 "app",运行模式为 "local[*]",然后通过 SparkSession.builder() 方法创建 SparkSession 对象 spark,作为与 Spark 交互的入口。

创建 RDD 并转换为 DataFrame:

import spark.implicits._
val res :RDD[(String,Int)]= spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40)))

val df :DataFrame = res.toDF("name","salary")

使用 spark.sparkContext.makeRDD() 创建一个包含员工姓名和工资的 RDD,然后通过 toDF 方法将其转换为 DataFrame,并指定列名为 "name" 和 "salary"。

- 注册临时视图并注册 UDAF:

df.createOrReplaceTempView("user")
var myAverage = new MyAverageUDAF
//在 spark 中注册聚合函数
spark.udf.register("avgSalary",myAverage)

将 DataFrame 注册为临时视图 "user",方便后续使用 SQL 语句查询。创建 MyAverageUDAF 类的实例 myAverage,并通过 spark.udf.register() 方法将其注册为名为 "avgSalary" 的用户自定义函数。

执行 SQL 查询并展示结果:

spark.sql("select avgSalary(salary) from user").show()

关闭连接

spark.stop()

实现方式三:强类型UDAF

定义缓冲区数据结构 Buff:

case class Buff(var sum:Long,varcnt:Long)

定义了一个名为 Buff 的样例类,用于表示聚合过程中的缓冲区数据结构。它包含两个可变字段 sum(用于存储工资总和,类型为 Long)和 cnt(用于存储工资个数,类型为 Long)。

- 定义强类型 UDAF 类 MyAverageUDAF:

class MyAverageUDAF extends Aggregator[Long,Buff,Double]{

zero 方法:

override def zero: Buff = Buff(0,0)

zero 方法定义了聚合操作的初始值,即创建一个 Buff 实例,其 sum 和 cnt 字段都初始化为 0。

reduce 方法:

override def reduce(b: Buff, a: Long): Buff = {
   b.sum += a
   b.cnt += 1
   b
 }

reduce 方法用于在每个分区内对输入数据进行聚合操作。它接受一个缓冲区对象 b 和一个输入数据 a(工资数值),将输入的工资累加到缓冲区的 sum 字段,并将 cnt 字段加 1,然后返回更新后的缓冲区对象。

merge 方法:

override def merge(b1: Buff, b2: Buff): Buff = {
   b1.sum += b2.sum
   b1.cnt += b2.cnt
   b1
 }

merge 方法用于合并不同分区的聚合结果。它接受两个缓冲区对象 b1 和 b2,将 b2 的 sum 和 cnt 累加到 b1 对应的字段上,然后返回 b1。

finish 方法:

override def finish(reduction: Buff): Double = {
   reduction.sum.toDouble/reduction.cnt
 }

finish 方法用于计算最终的聚合结果。它接受一个已经聚合好的缓冲区对象 reduction,将 sum 转换为 Double 类型后除以 cnt,得到平均工资并返回。

bufferEncoder 方法:

override def bufferEncoder: Encoder[Buff] = Encoders.product

bufferEncoder 方法定义了如何对缓冲区数据进行编码,这里使用 Encoders.product 来编码 Buff 类型的数据,因为 Buff 是一个样例类,Encoders.product 可以自动处理其字段的编码。

outputEncoder 方法:

override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

outputEncoder 方法定义了如何对最终输出结果进行编码,这里使用 Encoders.scalaDouble 来编码 Double 类型的数据。

主程序部分:

配置 Spark 环境并创建 SparkSession:

val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark:SparkSession = SparkSession.builder().config(conf).getOrCreate()

创建 SparkConf 对象,设置应用名称为 "app",运行模式为 "local[*]",然后通过 SparkSession.builder() 方法创建 SparkSession 对象 spark,作为与 Spark 交互的入口。

创建 RDD 并转换为 DataFrame:

import spark.implicits._
val res :RDD[(String,Int)]= spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40)))
val df :DataFrame = res.toDF("name","salary")

使用 spark.sparkContext.makeRDD() 创建一个包含员工姓名和工资的 RDD,然后通过 toDF 方法将其转换为 DataFrame,并指定列名为 "name" 和 "salary"。

注册临时视图并注册 UDAF:

df.createOrReplaceTempView("user")
var myAverage = new MyAverageUDAF
//在 spark 中注册聚合函数
spark.udf.register("avgSalary",functions.udaf(myAverage))

将 DataFrame 注册为临时视图 "user",方便后续使用 SQL 语句查询。创建 MyAverageUDAF 类的实例 myAverage,并通过 spark.udf.register() 方法将其注册为名为 "avgSalary" 的用户自定义函数。这里使用 functions.udaf(myAverage) 将强类型的 Aggregator 包装成可以注册的 UDF。

执行 SQL 查询并展示结果:

spark.sql("select avgSalary(salary) from user").show()

关闭连接

spark.stop()


网站公告

今日签到

点亮在社区的每一天
去签到