Spark from Beginner to Proficient (Part 3)

Published: 2025-07-04

This article introduces Spark's DataFrame and SparkSQL, and then works through hands-on SparkSQL exercises to strengthen your understanding and help you get productive quickly.

Outline

This article covers the following 7 parts:

  • Comparing RDD, DataFrame, and SparkSQL
  • Creating a DataFrame
  • Saving a DataFrame to files
  • Interacting with a DataFrame through the API
  • Interacting with a DataFrame through SQL
  • SparkSQL in practice
  • References

Comparing RDD, DataFrame, and SparkSQL

RDD vs. DataFrame vs. DataSet

| Dimension | RDD | DataFrame | DataSet |
| Schema information | \ | Adds a schema on top of RDD, so column names are available | Adds data type information on top of DataFrame, so type errors can be caught at compile time |
| API | \ | \ | A DataFrame can be viewed as DataSet[Row]; the two share exactly the same API |
| SQL support | Not supported | Supported | Supported |
| Language support | \ | The Python and R APIs only provide DataFrame | Only available in the Scala and Java APIs |
| Storage layout | Row-based | Implemented on top of RDD, but stored column-wise | \ |

RDD vs. SparkSQL

| Dimension | SparkSQL | RDD |
| Programming paradigm | Declarative | Imperative |
| Code actually executed | Java bytecode on the Executors, so efficiency is nearly the same as programming directly in Scala/Java (ignoring differences in parse time) | Mostly Python code on the Executors, with a small amount of Java bytecode |
| Flexibility | By default only standard SQL data types such as Int, Long, Float, Double, String and Boolean are supported, and extending the type system is relatively cumbersome. Features that SQL does not support directly can usually be implemented with user-defined functions (UDFs); more complex logic can be converted to RDD operations. | Very flexible |

Notes:

  1. You should use SparkSQL as much as possible to get better performance. The main reason is that SparkSQL is a declarative programming style, so the underlying engine automatically performs a lot of optimization work (a brief comparison sketch follows these notes).
  2. Performance tuning of RDD-based Spark code is a very deep rabbit hole that is easy to fall into. Performance tuning is covered in detail in Spark from Beginner to Proficient (Part 4).
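
As a rough illustration of the declarative vs. imperative contrast in note 1, here is a minimal sketch (assuming an existing SparkSession named spark; the people.parquet path is only a placeholder) that computes the average age per gender both ways:

from pyspark.sql import functions as F

#declarative (SparkSQL / DataFrame): describe what you want; the Catalyst optimizer decides how to run it
df = spark.read.parquet("people.parquet")  #placeholder input path
df.groupBy("gender").agg(F.avg("age").alias("avg_age")).show()

#imperative (RDD): spell out how to compute it, step by step
pairs = df.rdd.map(lambda row: (row["gender"], (row["age"], 1)))
avg_by_gender = (pairs.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                      .mapValues(lambda t: t[0] / t[1]))
print(avg_by_gender.collect())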

Creating a DataFrame

There are 5 ways, sketched in code after this list:

Method 1: convert an RDD into a DataFrame with the toDF method

Method 2: convert a pandas.DataFrame into a DataFrame with the createDataFrame method

Method 3: convert a list into a DataFrame with the createDataFrame method

Method 4: create a DataFrame with createDataFrame and an explicit schema (so column data types can be specified)

Method 5: create a DataFrame by reading files: json, csv, parquet, Hive tables (read via SparkSQL), and MySQL tables (data source format jdbc)
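
A minimal sketch of the five approaches (assuming an existing SparkSession named spark and SparkContext sc; the file paths, table names and JDBC connection details are only placeholders):

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

#1. RDD -> DataFrame via toDF
df1 = sc.parallelize([("LiLei", 15), ("HanMeiMei", 16)]).toDF(["name", "age"])

#2. pandas.DataFrame -> DataFrame
df2 = spark.createDataFrame(pd.DataFrame({"name": ["LiLei"], "age": [15]}))

#3. list -> DataFrame
df3 = spark.createDataFrame([("LiLei", 15), ("HanMeiMei", 16)], ["name", "age"])

#4. explicit schema
schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])
df4 = spark.createDataFrame([("LiLei", 15)], schema)

#5. read from files or tables
df5 = spark.read.json("data/students.json")
df6 = spark.read.option("header", "true").csv("data/students.csv")
df7 = spark.read.parquet("data/students.parquet")
df8 = spark.sql("select * from some_db.students")  #Hive table via SparkSQL
df9 = (spark.read.format("jdbc")
       .option("url", "jdbc:mysql://host:3306/db")
       .option("dbtable", "students")
       .option("user", "user")
       .option("password", "pwd")
       .load())  #MySQL table via JDBC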

Saving a DataFrame to Files

A DataFrame can be saved as json, csv, parquet, or a Hive table.

parquet: a compressed format that takes little storage space; it is also the columnar format Spark uses internally, so it loads fastest.
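
A minimal sketch of writing to the file formats above (assuming df is an existing DataFrame; the output paths are placeholders, and mode("overwrite") replaces any existing output):

df.write.mode("overwrite").json("output/people_json")
df.write.mode("overwrite").option("header", "true").csv("output/people_csv")
df.write.mode("overwrite").parquet("output/people_parquet")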

Hive table:

df.write\
  .bucketBy(42, "name")\
  .sortBy("age")\
  .saveAsTable("people_bucketed")


Interacting with a DataFrame through the API

Action operations

Common actions: show, count, collect, first, take, head, describe
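
A quick sketch of these actions on a small DataFrame (assuming an existing SparkSession named spark):

df = spark.createDataFrame([("LiLei", 15), ("HanMeiMei", 16)], ["name", "age"])

df.show()              #print the first rows as a table
print(df.count())      #number of rows
print(df.collect())    #all rows as a list of Row objects
print(df.first())      #the first Row
print(df.take(1))      #the first n rows as a list
print(df.head(1))      #same as take(1)
df.describe().show()   #count / mean / stddev / min / max summary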

RDD-like operations

A DataFrame supports some RDD-style operations, such as:

map, flatMap, filter, broadcast, distinct, cache, persist, sample, intersect, exceptAll

A DataFrame can be treated as an RDD whose element type is Row; when necessary it can be converted into an RDD explicitly.

from pyspark.sql import Row
from pyspark.sql.functions import col

df = spark.createDataFrame([("Hello World",),("Hello China",),("Hello Spark",)]).toDF("value")
df.show()
+-----------+
|      value|
+-----------+
|Hello World|
|Hello China|
|Hello Spark|
+-----------+

df2 = spark.createDataFrame([["Hello World"],["Hello Scala"],["Hello Spark"]]).toDF("value")
df2.show()
+-----------+
|      value|
+-----------+
|Hello World|
|Hello Scala|
|Hello Spark|
+-----------+
#map: convert to an RDD first
rdd = df.rdd.map(lambda x: Row(x[0].upper()))
dfmap = rdd.toDF(["value"])
dfmap.show()

+-----------+
|      value|
+-----------+
|HELLO WORLD|
|HELLO CHINA|
|HELLO SPARK|
+-----------+

#flatMap: convert to an RDD first
df_flat = df.rdd.flatMap(lambda x:x[0].split(" ")).map(lambda x:Row(x)).toDF(["value"])
df_flat.show()

+-----+
|value|
+-----+
|Hello|
|World|
|Hello|
|China|
|Hello|
|Spark|
+-----+

# combining filter with a broadcast variable
broads = sc.broadcast(["Hello","World"])
df_filter_broad = df_flat.filter(~col("value").isin(broads.value))
df_filter_broad.show() 
"""
sample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long)
withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复
fraction :抽出多少,这是一个double类型的参数,0-1之间,eg:0.3表示抽出30%
seed:表示一个种子,根据这个seed随机抽取,一般情况下只用前两个参数就可以,那么这个参数是干嘛的呢,这个参数一般用于调试,有时候不知道是程序出问题还是数据出了问题,就可以将这个参数设置为定值
"""
#sample: random sampling
dfsample = df.sample(False,0.6,0)

dfsample.show()  

+-----------+
|      value|
+-----------+
|Hello China|
|Hello Spark|
+-----------+
#intersect: set intersection
dfintersect = df.intersect(df2)
dfintersect.show()
+-----------+
|      value|
+-----------+
|Hello Spark|
|Hello World|
+-----------+

#exceptAll: set difference (keeps duplicates)
dfexcept = df.exceptAll(df2)
dfexcept.show()
+-----------+
|      value|
+-----------+
|Hello China|
+-----------+

Excel-like operations

df = spark.createDataFrame([
("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male"),
("RuHua",16,None)
]).toDF("name","age","gender")

df.show()
df.printSchema()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
Add a column: withColumn
Reorder columns: select
Drop columns: drop
Rename a column: withColumnRenamed
Sort: sort, orderBy
Drop rows with null values: df.na.drop()
Fill null values: df.na.fill("female")
Replace values: df.na.replace({"":"female","RuHua":"SiYu"})
Deduplicate: dropDuplicates() uses all columns; dropDuplicates(["age"]) uses only the given columns
Aggregate: df.agg({"name":"count","age":"max"})
Summary statistics: df.describe(), showing count, mean, stddev, min, max
Frequent items: ages and genders whose frequency exceeds 0.5
df_freq = df.stat.freqItems(("age","gender"),0.5)
df_freq.show()
+-------------+----------------+
|age_freqItems|gender_freqItems|
+-------------+----------------+
|         [16]|          [male]|
+-------------+----------------+
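
A minimal sketch of a few of the column operations listed above, applied to the df defined at the start of this subsection (the derived column names are only for illustration):

dfnew = df.withColumn("age_plus_1", df["age"] + 1)            #add a column
dfnew = dfnew.select("name", "gender", "age", "age_plus_1")   #reorder columns
dfnew = dfnew.drop("age_plus_1")                              #drop a column
dfnew = dfnew.withColumnRenamed("gender", "sex")              #rename a column
dfnew = dfnew.sort(dfnew["age"].desc())                       #sort
dfnew = dfnew.na.fill("female")                               #fill null values
dfnew = dfnew.dropDuplicates(["age"])                         #deduplicate on age
dfnew.show()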

SQL-like operations

df = spark.createDataFrame([
("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male"),
("RuHua",16,None)]).toDF("name","age","gender")

df.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

dfscore = spark.createDataFrame([("LiLei","male",88),("HanMeiMei","female",90),("DaChui","male",50)]) \
          .toDF("name","gender","score") 
dfscore.show()
+---------+------+-----+
|     name|gender|score|
+---------+------+-----+
|    LiLei|  male|   88|
|HanMeiMei|female|   90|
|   DaChui|  male|   50|
+---------+------+-----+
Column selection: select
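
A minimal select example on the df defined above:

df.select("name", "age").show()
df.select(df["name"], (df["age"] + 1).alias("age_plus_1")).show()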


selectExpr: query with SQL expressions; supports UDFs, aliases, and so on
import datetime
spark.udf.register("getBirthYear",lambda age:datetime.datetime.now().year-age)
dftest = df.selectExpr("name", "getBirthYear(age) as birth_year" , "UPPER(gender) as gender" )
dftest.show()
+---------+----------+------+
|     name|birth_year|gender|
+---------+----------+------+
|    LiLei|      2005|  MALE|
|HanMeiMei|      2004|FEMALE|
|   DaChui|      2003|  MALE|
|    RuHua|      2004|  null|
+---------+----------+------+


#where: filter with a SQL where-clause expression
dftest = df.where("gender='male' and age>15")
dftest.show()
+------+---+------+
|  name|age|gender|
+------+---+------+
|DaChui| 17|  male|
+------+---+------+


#filter
dftest = df.filter(df["age"]>16)
dftest.show()
+------+---+------+
|  name|age|gender|
+------+---+------+
|DaChui| 17|  male|
+------+---+------+
Joins: join supports many join types, such as "inner", "left", "right", "outer", "semi", "full", "leftanti", "anti"
dfjoin = df.join(dfscore,["name","gender"],"outer")
dfjoin.show()

+---------+------+---+-----+
|     name|gender|age|score|
+---------+------+---+-----+
|HanMeiMei|female| 16|   90|
|   DaChui|  male| 17|   50|
|    LiLei|  male| 15|   88|
|    RuHua|  null| 16| null|
+---------+------+---+-----+
from pyspark.sql import functions as F

#union: concatenate two tables
#groupBy: group rows
#groupBy + agg: aggregate after grouping. Note the use of F.expr
dfagg = df.groupBy("gender").agg(F.expr("avg(age)"),
                                 F.expr("collect_list(name)"))
dfagg.show()
+------+--------+------------------+
|gender|avg(age)|collect_list(name)|
+------+--------+------------------+
|  null|    16.0|           [RuHua]|
|female|    16.0|       [HanMeiMei]|
|  male|    16.0|   [LiLei, DaChui]|
+------+--------+------------------+
#pivot after grouping: groupBy + pivot
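#a brief illustrative sketch of groupBy + pivot (not from the original example):
#count students per gender broken down by age, using the df defined above
dfpivot = df.groupBy("gender").pivot("age").agg(F.expr("count(name)"))
dfpivot.show()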
#window functions
df = spark.createDataFrame([("LiLei",78,"class1"),		
                            ("HanMeiMei",87,"class1"),
                           ("DaChui",65,"class2"),
                            ("RuHua",55,"class2")]) \
          .toDF("name","score","class")
df.show()
dforder = df.selectExpr("name","score","class",
         "row_number() over (partition by class order by score desc) as order")
dforder.show()
+---------+-----+------+
|     name|score| class|
+---------+-----+------+
|    LiLei|   78|class1|
|HanMeiMei|   87|class1|
|   DaChui|   65|class2|
|    RuHua|   55|class2|
+---------+-----+------+

+---------+-----+------+-----+
|     name|score| class|order|
+---------+-----+------+-----+
|   DaChui|   65|class2|    1|
|    RuHua|   55|class2|    2|
|HanMeiMei|   87|class1|    1|
|    LiLei|   78|class1|    2|
+---------+-----+------+-----+
#the explode function

import pyspark.sql.functions as F 
students = [("LiLei","Swim|Sing|FootBall"),("Ann","Sing|Dance"),("LiLy","Reading|Sing|Dance")]
dfstudents = spark.createDataFrame(students,["name","hobbies"])

dfstudents.show()
dfstudents.createOrReplaceTempView("students")

#explode turns one row into many rows, usually used together with LATERAL VIEW
dfhobby = spark.sql("select name,hobby from students LATERAL VIEW explode(split(hobbies,'\\\\|')) tmp as hobby") #note: a special character used as the delimiter needs four backslashes
dfhobby.show() 

#count how many students like each hobby
dfcount = dfhobby.groupBy("hobby").agg(F.expr("count(name) as cnt"))
dfcount.show() 

+-----+------------------+
| name|           hobbies|
+-----+------------------+
|LiLei|Swim|Sing|FootBall|
|  Ann|        Sing|Dance|
| LiLy|Reading|Sing|Dance|
+-----+------------------+

+-----+--------+
| name|   hobby|
+-----+--------+
|LiLei|    Swim|
|LiLei|    Sing|
|LiLei|FootBall|
|  Ann|    Sing|
|  Ann|   Dance|
| LiLy| Reading|
| LiLy|    Sing|
| LiLy|   Dance|
+-----+--------+

+--------+---+
|   hobby|cnt|
+--------+---+
|    Swim|  1|
|FootBall|  1|
|    Sing|  3|
| Reading|  1|
|   Dance|  2|
+--------+---+
import pyspark.sql.functions as F 
students = [("LiLei",89,76,65),("HanMeiMei",97,98,89),("Lucy",66,55,70)]
dfstudents = spark.createDataFrame(students,["name","math","physics","music"])
dfstudents.show() 


#array type
print("array type")
dfarray = dfstudents.selectExpr("name","array(math,physics,music) as score")
dfarray.show() 
dfarray.selectExpr("name","score[0] as math").show()


#struct type
print("struct type")
dfstruct = dfstudents.selectExpr("name","struct('math',math,'physics',physics,'music',music) as score")
dfstruct.show() 
dfstruct.selectExpr("name","score.physics").show()


#map type
print("map type")
dfmap = dfstudents.selectExpr("name","map('math',math,'physics',physics,'music',music) as score")
dfmap.show() 
dfmap.selectExpr("name","score['math'] as math").show()

+---------+----+-------+-----+
|     name|math|physics|music|
+---------+----+-------+-----+
|    LiLei|  89|     76|   65|
|HanMeiMei|  97|     98|   89|
|     Lucy|  66|     55|   70|
+---------+----+-------+-----+

array type
+---------+------------+
|     name|       score|
+---------+------------+
|    LiLei|[89, 76, 65]|
|HanMeiMei|[97, 98, 89]|
|     Lucy|[66, 55, 70]|
+---------+------------+

+---------+----+
|     name|math|
+---------+----+
|    LiLei|  89|
|HanMeiMei|  97|
|     Lucy|  66|
+---------+----+

struct type
+---------+--------------------+
|     name|               score|
+---------+--------------------+
|    LiLei|[math,89,physics,...|
|HanMeiMei|[math,97,physics,...|
|     Lucy|[math,66,physics,...|
+---------+--------------------+

+---------+-------+
|     name|physics|
+---------+-------+
|    LiLei|     76|
|HanMeiMei|     98|
|     Lucy|     55|
+---------+-------+

map type
+---------+--------------------+
|     name|               score|
+---------+--------------------+
|    LiLei|Map(math -> 89, p...|
|HanMeiMei|Map(math -> 97, p...|
|     Lucy|Map(math -> 66, p...|
+---------+--------------------+

+---------+----+
|     name|math|
+---------+----+
|    LiLei|  89|
|HanMeiMei|  97|
|     Lucy|  66|
+---------+----+
#building JSON (to_json) and parsing it (get_json_object)

#build the student data
dfstudents = spark.createDataFrame([("LiLei","Math",70),("LiLei","English",87)
  ,("HanMeimei","Math",80),("HanMeimei","English",90)]).toDF("name","course","score")
print("dfstudents:")
dfstudents.show() 

#build a named_struct column
dfnamed_struct = dfstudents.selectExpr("name","named_struct('course',course,'score',score) as scores")
print("dfnamed_struct:")
dfnamed_struct.show() 


#build an array(named_struct) column
dfagg = dfnamed_struct.groupby("name").agg(F.expr("collect_list(scores) as arr_scores"))
print("dfagg:")
dfagg.show() 

#convert to JSON
dfjson = dfagg.selectExpr("name","to_json(arr_scores) as json_scores")
print("dfjson:")
dfjson.show() 

#parse the JSON with get_json_object
dfscores = dfjson.selectExpr("name",
    "get_json_object(json_scores,'$[0].score') as Math",
    "get_json_object(json_scores,'$[1].score') as English",)
print("dfscores:")
dfscores.show() 

dfstudents:
+---------+-------+-----+
|     name| course|score|
+---------+-------+-----+
|    LiLei|   Math|   70|
|    LiLei|English|   87|
|HanMeimei|   Math|   80|
|HanMeimei|English|   90|
+---------+-------+-----+

dfnamed_struct:
+---------+------------+
|     name|      scores|
+---------+------------+
|    LiLei|   [Math,70]|
|    LiLei|[English,87]|
|HanMeimei|   [Math,80]|
|HanMeimei|[English,90]|
+---------+------------+

dfagg:
+---------+--------------------+
|     name|          arr_scores|
+---------+--------------------+
|    LiLei|[[Math,70], [Engl...|
|HanMeimei|[[Math,80], [Engl...|
+---------+--------------------+

dfjson:
+---------+--------------------+
|     name|         json_scores|
+---------+--------------------+
|    LiLei|[{"course":"Math"...|
|HanMeimei|[{"course":"Math"...|
+---------+--------------------+

dfscores:
+---------+----+-------+
|     name|Math|English|
+---------+----+-------+
|    LiLei|  70|     87|
|HanMeimei|  80|     90|
+---------+----+-------+

Interacting with a DataFrame through SQL

After registering a DataFrame as a temporary view or a global temporary view, you can interact with it using SQL statements. Beyond that, SparkSQL can also be used to create, drop, modify and query Hive tables directly.

SQL interaction after registering a view

Register as a temporary view: createOrReplaceTempView; its lifetime is tied to the SparkSession.

Register as a global temporary view: createOrReplaceGlobalTempView; its lifetime is tied to the whole Spark application (it can be queried from a new session, e.g. spark.newSession().sql).

#register as a temporary view; its lifetime is tied to the SparkSession
df = spark.createDataFrame([("LiLei",18,"male"),("HanMeiMei",17,"female"),("Jim",16,"male")],
                              ("name","age","gender"))

df.show()
df.createOrReplaceTempView("student")
dfmale = spark.sql("select * from student where gender='male'")
dfmale.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 18|  male|
|HanMeiMei| 17|female|
|      Jim| 16|  male|
+---------+---+------+

+-----+---+------+
| name|age|gender|
+-----+---+------+
|LiLei| 18|  male|
|  Jim| 16|  male|
+-----+---+------+

#register as a global temporary view; its lifetime is tied to the whole Spark application
df.createOrReplaceGlobalTempView("student")
query = """
 select t.gender
 , collect_list(t.name) as names 
 from global_temp.student t 
 group by t.gender
""".strip("\n")

spark.sql(query).show()
#it can be accessed from a new session
spark.newSession().sql("select * from global_temp.student").show()
+------+------------+
|gender|       names|
+------+------------+
|female| [HanMeiMei]|
|  male|[LiLei, Jim]|
+------+------------+

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 18|  male|
|HanMeiMei| 17|female|
|      Jim| 16|  male|
+---------+---+------+

CRUD operations on Hive tables

Create: CREATE TABLE IF NOT EXISTS


Drop: DROP TABLE IF EXISTS
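
A minimal sketch of creating and dropping a Hive table through spark.sql (the table name students_demo and its columns are placeholders; this requires a SparkSession built with enableHiveSupport()):

spark.sql("""
CREATE TABLE IF NOT EXISTS students_demo (
    name STRING,
    age INT
)
PARTITIONED BY (class STRING, gender STRING)
STORED AS PARQUET
""")

spark.sql("DROP TABLE IF EXISTS students_demo")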


Dynamically write data into a Hive partitioned table: spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") #note the configuration step here
dfstudents = spark.createDataFrame([("LiLei",18,"class1","male"),
                                    ("HanMeimei",17,"class2","female"),
                                    ("DaChui",19,"class2","male"),
                                    ("Lily",17,"class1","female")]) \
                  .toDF("name","age","class","gender")
dfstudents.show()
#dynamic partition write
dfstudents.write.mode("overwrite").format("hive")\
.partitionBy("class","gender").saveAsTable("students")


#write into a static partition
dfstudents = spark.createDataFrame([("Jim",18,"class3","male"),
                                    ("Tom",19,"class3","male")]) \
                  .toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass3")
#INSERT INTO appends rows; INSERT OVERWRITE TABLE overwrites the partition
query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class3',gender='male') 
SELECT name,age from dfclass3
""".replace("\n"," ")
spark.sql(query)

#write into mixed partitions (static class, dynamic gender)
dfstudents = spark.createDataFrame([("David",18,"class4","male"),
                                    ("Amy",17,"class4","female"),
                                    ("Jerry",19,"class4","male"),
                                    ("Ann",17,"class4","female")]) \
                  .toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass4")
query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class4',gender) 
SELECT name,age,gender from dfclass4
""".replace("\n"," ")
spark.sql(query)
dfdata = spark.sql("select * from students")
dfdata.show()
+---------+---+------+------+
|     name|age| class|gender|
+---------+---+------+------+
|      Ann| 17|class4|female|
|      Amy| 17|class4|female|
|HanMeimei| 17|class2|female|
|   DaChui| 19|class2|  male|
|    LiLei| 18|class1|  male|
|     Lily| 17|class1|female|
|    Jerry| 19|class4|  male|
|    David| 18|class4|  male|
|      Jim| 18|class3|  male|
|      Tom| 19|class3|  male|
+---------+---+------+------+

#drop a partition
query = """
ALTER TABLE `students`
DROP IF EXISTS
PARTITION(class='class3') 
""".replace("\n"," ")
spark.sql(query)

#check the remaining data
dfremain = spark.sql("select * from students")
dfremain.show() 
+---------+---+------+------+
|     name|age| class|gender|
+---------+---+------+------+
|    Jerry| 19|class4|  male|
|    David| 18|class4|  male|
|    LiLei| 18|class1|  male|
|   DaChui| 19|class2|  male|
|     Lily| 17|class1|female|
|HanMeimei| 17|class2|female|
|      Ann| 17|class4|female|
|      Amy| 17|class4|female|
+---------+---+------+------+

SparkSQL in Practice

Computing the mean

#Task: compute the mean of data
data = [1,5,7,10,23,20,6,5,10,7,10]

dfdata = spark.createDataFrame([(x,) for x in data]).toDF("value")
dfagg = dfdata.agg({"value":"avg"})
dfagg.show()

+-----------------+
|       avg(value)|
+-----------------+
|9.454545454545455|
+-----------------+

Computing the mode

#Task: find the value that appears most often in data; if there are ties, return the mean of the tied values
from pyspark.sql import functions as F 
data =  [1,5,7,10,23,20,7,5,10,7,10]

dfdata = spark.createDataFrame([(x,1) for x in data]).toDF("key","value")
dfcount = dfdata.groupby("key").agg(F.count("value").alias("count")).cache()
max_count = dfcount.agg(F.max("count").alias("max_count")).take(1)[0]["max_count"]
dfmode = dfcount.where("count={}".format(max_count))
mode = dfmode.agg(F.expr("mean(key) as mode")).take(1)[0]["mode"]
print("mode:",mode)
dfcount.unpersist()

mode: 8.5

Computing the top N

#Task: given a table of students with name, age and score, find the 3 students with the highest scores; ties may be broken arbitrarily
students = [("LiLei",18,87),("HanMeiMei",16,77),("DaChui",16,66),("Jim",18,77),("RuHua",18,50)]
n = 3

dfstudents = spark.createDataFrame(students).toDF("name","age","score")
dftopn = dfstudents.orderBy("score", ascending=False).limit(n)

dftopn.show()

+---------+---+-----+
|     name|age|score|
+---------+---+-----+
|    LiLei| 18|   87|
|HanMeiMei| 16|   77|
|      Jim| 18|   77|
+---------+---+-----+

Sorting and returning row numbers

#Task: sort in ascending order and return a row index; rows with equal values may get different indices

data = [1,7,8,5,3,18,34,9,0,12,8]

from copy import deepcopy
from pyspark.sql import types as T
from pyspark.sql import Row,DataFrame

def addLongIndex(df, field_name):
    schema = deepcopy(df.schema)
    schema = schema.add(T.StructField(field_name, T.LongType()))
    rdd_with_index = df.rdd.zipWithIndex()

    def merge_row(t):
        row,index= t
        dic = row.asDict() 
        dic.update({field_name:index})
        row_merged = Row(**dic)
        return row_merged

    rdd_row = rdd_with_index.map(lambda t:merge_row(t))
    return spark.createDataFrame(rdd_row,schema)

dfdata = spark.createDataFrame([(x,) for x in data]).toDF("value")
dfsorted = dfdata.sort(dfdata["value"])

dfsorted_index = addLongIndex(dfsorted,"index")

dfsorted_index.show() 

+-----+-----+
|value|index|
+-----+-----+
|    0|    0|
|    1|    1|
|    3|    2|
|    5|    3|
|    7|    4|
|    8|    5|
|    8|    6|
|    9|    7|
|   12|    8|
|   18|    9|
|   34|   10|
+-----+-----+

Secondary sort

#Task: given a table of students with name, age and score,
#sort by score in descending order; if scores are equal, sort by age in descending order
students = [("LiLei",18,87),("HanMeiMei",16,77),("DaChui",16,66),("Jim",18,77),("RuHua",18,50)]
dfstudents = spark.createDataFrame(students).toDF("name","age","score")
dfsorted = dfstudents.orderBy(dfstudents["score"].desc(),dfstudents["age"].desc())
dfsorted.show()

+---------+---+-----+
|     name|age|score|
+---------+---+-----+
|    LiLei| 18|   87|
|      Jim| 18|   77|
|HanMeiMei| 16|   77|
|   DaChui| 16|   66|
|    RuHua| 18|   50|
+---------+---+-----+

Join operations

#Task: given a class roster table and a score table, find the classes whose average score is above 75
#the roster table has columns class and name; the score table has columns name and score

from pyspark.sql import functions as F 
classes = [("class1","LiLei"), ("class1","HanMeiMei"),("class2","DaChui"),("class2","RuHua")]
scores = [("LiLei",76),("HanMeiMei",80),("DaChui",70),("RuHua",60)]

dfclass = spark.createDataFrame(classes).toDF("class","name")
dfscore = spark.createDataFrame(scores).toDF("name","score")

dfstudents = dfclass.join(dfscore,on ="name" ,how = "left")

dfagg = dfstudents.groupBy("class").agg(F.avg("score").alias("avg_score")).where("avg_score>75.0")
     
dfagg.show()
+------+---------+
| class|avg_score|
+------+---------+
|class1|     78.0|
+------+---------+

Computing the mode per group

#Task: given a table of students with class and age, compute the mode of student ages for each class

students = [("class1",15),("class1",15),("class2",16),("class2",16),("class1",17),("class2",19)]


from pyspark.sql import functions as F 

def mode(arr):
    dict_cnt = {}
    for x in arr:
        dict_cnt[x] = dict_cnt.get(x,0)+1
    max_cnt = max(dict_cnt.values())
    most_values = [k for k,v in dict_cnt.items() if v==max_cnt]
    s = 0.0
    for x in most_values:
        s = s + x
    return s/len(most_values)
spark.udf.register("udf_mode",mode)
dfstudents = spark.createDataFrame(students).toDF("class","score")
dfscores = dfstudents.groupBy("class").agg(F.collect_list("score").alias("scores"))
dfmode = dfscores.selectExpr("class","udf_mode(scores) as mode_score")
dfmode.show()

+------+----------+
| class|mode_score|
+------+----------+
|class2|      16.0|
|class1|      15.0|
+------+----------+

References

Spark SQL学习小结


