Spark-SQL


I. Spark-SQL Core Programming (Part 4)

1. Lab content:

Develop a Spark-SQL application in IDEA.

2. Lab steps:

Developing Spark-SQL in IDEA:

① Create a sub-module named Spark-SQL and add the dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
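Note that the _2.12 suffix in the artifactId is the Scala binary version; it must match the Scala version configured for the module, and the Spark version (3.0.0 here) should match the cluster you target.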

② Create the Spark-SQL test code:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

case class User(id: Int, name: String, age: Int)

object SparkSQLDemo {
  def main(args: Array[String]): Unit = {
    // Create the context configuration object
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
    // Create the SparkSession
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._
    // Read the JSON file
    val df: DataFrame = spark.read.json("Spark-SQL/input/user.json")
    df.show()

    // SQL-style syntax
    df.createOrReplaceTempView("user")
    spark.sql("select * from user").show()
    spark.sql("select avg(age) from user").show()

    // DSL-style syntax
    df.select("username", "age").show()

    // RDD => DataFrame => Dataset
    // RDD
    val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(
      List((1, "zhangsan", 30), (2, "lisi", 40))
    )
    // DataFrame
    val df1: DataFrame = rdd1.toDF("id", "name", "age")
    df1.show()
    // Dataset
    val ds1: Dataset[User] = df1.as[User]
    ds1.show()

    // Dataset => DataFrame => RDD
    val df2 = ds1.toDF()
    df2.show()

    val rdd2: RDD[Row] = df2.rdd
    rdd2.foreach(a => println(a.getString(1)))

    // RDD => Dataset directly, by mapping tuples to the case class
    rdd1.map {
      case (id, name, age) => User(id, name, age)
    }.toDS().show()

    // Dataset => RDD: the elements come back as User objects
    val rdd3 = ds1.rdd
    rdd3.foreach(a => println(a.age))
    rdd3.foreach(a => println(a.id))
    rdd3.foreach(a => println(a.name))

    spark.stop()
  }
}
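For reference, a minimal user.json consistent with the columns the demo queries (username and age); Spark's JSON reader expects one JSON object per line. The values below are illustrative placeholders, not part of the original lab:

{"username": "zhangsan", "age": 20}
{"username": "lisi", "age": 30}
{"username": "wangwu", "age": 40}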

 

II. Spark-SQL Core Programming (Part 5)

1. User-defined functions:

UDF:

 

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
// Create the SparkSession
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._
// Read the JSON file
val df: DataFrame = spark.read.json("Spark-SQL/input/user.json")

// Register a UDF that prefixes the input string
spark.udf.register("addName", (x: String) => "Name:" + x)

df.createOrReplaceTempView("people")
spark.sql("select addName(username), age from people").show()

spark.stop()
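The same UDF can also be applied in DSL style without registering it under a name; a minimal sketch using org.apache.spark.sql.functions.udf against the df above:

import org.apache.spark.sql.functions.{col, udf}

val addName = udf((x: String) => "Name:" + x)
df.select(addName(col("username")), col("age")).show()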

2. UDAF (user-defined aggregate functions)

Both the strongly typed Dataset and the weakly typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions. Before Spark 3.0, custom aggregates were written by extending UserDefinedAggregateFunction; since Spark 3.0 that class is deprecated, and the strongly typed Aggregator can be used uniformly instead.
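For comparison, the built-in aggregates can be invoked in DSL style; a minimal sketch, assuming the df read from user.json above:

import org.apache.spark.sql.functions.{avg, countDistinct, max}

df.agg(avg("age"), max("age"), countDistinct("username")).show()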

3. Lab requirement: compute the average salary

3.1 Approach 1: RDD

import org.apache.spark.{SparkConf, SparkContext}

val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val sc: SparkContext = new SparkContext(sparkconf)
// Map each (name, salary) pair to (salary, 1), then sum salaries and counts in one reduce
val resRDD: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40))).map {
  case (name, salary) => (salary, 1)
}.reduce {
  (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
}
// Convert to Double first so integer division does not truncate the average
println(resRDD._1.toDouble / resRDD._2)
// Shut down the context
sc.stop()

 

3.2 Approach 2: weakly typed UDAF

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyAverageUDAF extends UserDefinedAggregateFunction {
  // Data type of the input to the aggregate function
  def inputSchema: StructType =
    StructType(Array(StructField("salary", IntegerType)))
  // Data types of the values in the aggregation buffer: (sum, count)
  def bufferSchema: StructType = {
    StructType(Array(StructField("sum", LongType), StructField("count", LongType)))
  }
  // Data type of the return value
  def dataType: DataType = DoubleType
  // Determinism: whether the same input always produces the same output
  def deterministic: Boolean = true
  // Initialize the aggregation buffer
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    // Running sum of salaries
    buffer(0) = 0L
    // Running count of salaries
    buffer(1) = 0L
  }
  // Update the buffer with a new input row
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getInt(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merge two buffers
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Compute the final result
  def evaluate(buffer: Row): Double =
    buffer.getLong(0).toDouble / buffer.getLong(1)
}

 

 

 

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark: SparkSession = SparkSession.builder().config(sparkconf).getOrCreate()

import spark.implicits._
val res: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))

val df: DataFrame = res.toDF("name", "salary")
df.createOrReplaceTempView("user")
val myAverage = new MyAverageUDAF
// Register the aggregate function with Spark
spark.udf.register("avgSalary", myAverage)
spark.sql("select avgSalary(salary) from user").show()

// Shut down the session
spark.stop()

 

3.3 Approach 3: strongly typed UDAF

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// Mutable buffer holding the running sum and count
case class Buff(var sum: Long, var cnt: Long)

class MyAverageUDAF extends Aggregator[Long, Buff, Double] {
  // Initial (zero) value of the buffer
  override def zero: Buff = Buff(0, 0)
  // Fold one input value into the buffer
  override def reduce(b: Buff, a: Long): Buff = {
    b.sum += a
    b.cnt += 1
    b
  }
  // Merge two buffers from different partitions
  override def merge(b1: Buff, b2: Buff): Buff = {
    b1.sum += b2.sum
    b1.cnt += b2.cnt
    b1
  }
  // Compute the final result from the buffer
  override def finish(reduction: Buff): Double = {
    reduction.sum.toDouble / reduction.cnt
  }
  // Encoders for the buffer and output types
  override def bufferEncoder: Encoder[Buff] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

 

 

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession, functions}

val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark: SparkSession = SparkSession.builder().config(sparkconf).getOrCreate()

import spark.implicits._
val res: RDD[(String, Int)] = spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu", 40)))

val df: DataFrame = res.toDF("name", "salary")
df.createOrReplaceTempView("user")
val myAverage = new MyAverageUDAF
// Wrap the Aggregator with functions.udaf so it can be registered for SQL use
spark.udf.register("avgSalary", functions.udaf(myAverage))
spark.sql("select avgSalary(salary) from user").show()

// Shut down the session
spark.stop()
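The Aggregator can also be applied in typed DSL style, skipping SQL registration entirely; a minimal sketch, assuming the salaries are first pulled into a Dataset[Long]:

import org.apache.spark.sql.Dataset

val salaries: Dataset[Long] = res.map(_._2.toLong).toDS()
salaries.select((new MyAverageUDAF).toColumn).show()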

 

