This article introduces Spark's DataFrame and Spark SQL and works through hands-on Spark SQL exercises, so you can strengthen your grasp of the programming model and get productive quickly.
Outline
It covers the following 7 parts:
- Comparing RDD, DataFrame, and Spark SQL
- Creating a DataFrame
- Saving a DataFrame to files
- DataFrame API interaction
- DataFrame SQL interaction
- Spark SQL in practice
- References
Comparing RDD, DataFrame, and Spark SQL
RDD vs. DataFrame
| Dimension | RDD | DataFrame | Dataset |
| --- | --- | --- | --- |
| Schema information | \ | Adds a schema on top of RDD, so column names are available | Adds static type information on top of DataFrame, so type errors can be caught at compile time |
| API | \ | \ | A DataFrame can be viewed as Dataset[Row]; the two share exactly the same API |
| SQL support | Not supported | Supported | Supported |
| Language support | \ | The Python and R APIs only expose DataFrame | Only available in the Scala and Java APIs |
| Storage layout | Row-oriented | Implemented on top of RDDs, but stored column-oriented | \ |
RDD vs. Spark SQL
| Dimension | Spark SQL | RDD |
| --- | --- | --- |
| Programming paradigm | Declarative | Imperative |
| Code actually executed | The executors run Java bytecode, so performance is almost on par with writing the job directly in Scala/Java (ignoring differences in parsing/planning time) | With PySpark, most of the executor-side code is Python and only a small part is Java bytecode |
| Flexibility | The data types supported out of the box are usually just the standard SQL types such as Int, Long, Float, Double, String and Boolean, and extending them is relatively cumbersome. Functionality that SQL does not support directly can usually be implemented with a user-defined function (UDF); for more complex logic you can fall back to the RDD API | Very flexible |
Notes:
- Prefer Spark SQL whenever possible for better performance. The main reason is that Spark SQL is a declarative programming style, and the underlying engine automatically performs a large amount of optimization.
- Performance tuning of RDD-based Spark jobs is a deep area full of pitfalls that are easy to hit; see Spark (Part 4) for the tuning discussion.
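To make the declarative vs. imperative contrast concrete, here is a minimal word-count sketch written both ways; the SparkSession setup and the sample data are assumptions added for illustration, not part of the original text. The RDD version spells out each step, while the DataFrame version only states the result and lets the Catalyst optimizer plan the execution.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("rdd_vs_sparksql").getOrCreate()
lines = ["Hello World", "Hello Spark"]

# imperative RDD version: describe how to compute, step by step
rdd_counts = (spark.sparkContext.parallelize(lines)
              .flatMap(lambda s: s.split(" "))
              .map(lambda w: (w, 1))
              .reduceByKey(lambda a, b: a + b))
print(rdd_counts.collect())

# declarative DataFrame version: describe what is wanted; Catalyst plans the execution
df = spark.createDataFrame([(s,) for s in lines], ["line"])
df.select(F.explode(F.split("line", " ")).alias("word")) \
  .groupBy("word") \
  .count() \
  .show()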
Creating a DataFrame
There are 5 ways (a combined sketch follows the list):
Method 1: convert an RDD into a DataFrame with the toDF method
Method 2: convert a pandas.DataFrame with the createDataFrame method
Method 3: convert a Python list with the createDataFrame method
Method 4: call createDataFrame with an explicit schema (the column data types can be specified)
Method 5: create it by reading files: json, csv, parquet, Hive tables (read through Spark SQL), MySQL tables (via the jdbc data source)
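A minimal sketch of these creation paths, assuming an active SparkSession named spark; the file paths, table names and JDBC connection settings are hypothetical placeholders.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pandas as pd

# Method 1: toDF on an RDD
df1 = spark.sparkContext.parallelize([("LiLei", 15), ("HanMeiMei", 16)]).toDF(["name", "age"])
# Method 2: createDataFrame from a pandas.DataFrame (requires pandas)
df2 = spark.createDataFrame(pd.DataFrame({"name": ["LiLei"], "age": [15]}))
# Method 3: createDataFrame from a list of tuples
df3 = spark.createDataFrame([("LiLei", 15), ("HanMeiMei", 16)], ["name", "age"])
# Method 4: createDataFrame with an explicit schema
schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])
df4 = spark.createDataFrame([("LiLei", 15)], schema)
# Method 5: read from files or tables (paths and table names are placeholders)
df_json = spark.read.json("data/students.json")
df_csv = spark.read.option("header", "true").csv("data/students.csv")
df_parquet = spark.read.parquet("data/students.parquet")
df_hive = spark.sql("select * from db.students")
df_mysql = (spark.read.format("jdbc")  # the MySQL JDBC driver must be on the classpath
            .option("url", "jdbc:mysql://host:3306/db")
            .option("dbtable", "students")
            .option("user", "user")
            .option("password", "password")
            .load())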
Saving a DataFrame to Files
A DataFrame can be saved as json, csv, parquet, or a Hive table (a sketch of the file-based writers follows the Hive example below).
Parquet: a compressed columnar format that takes little storage space and matches Spark's in-memory columnar representation, so it loads fastest.
Hive table:
df.write\
.bucketBy(42, "name")\
.sortBy("age")\
.saveAsTable("people_bucketed")
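A minimal sketch of the file-based writers mentioned above, assuming df is an existing DataFrame; the output paths are hypothetical placeholders.
# mode("overwrite") replaces any existing output at the target path
df.write.mode("overwrite").json("output/data_json")
df.write.mode("overwrite").option("header", "true").csv("output/data_csv")
df.write.mode("overwrite").parquet("output/data_parquet")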
DataFrame API Interaction
Action operations
Common actions: show, count, collect, first, take, head, describe. A minimal sketch follows.
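The sketch below exercises these actions on a tiny DataFrame, assuming an active SparkSession named spark; the sample data is an assumption used only for illustration.
df = spark.createDataFrame([("LiLei", 15), ("HanMeiMei", 16)], ["name", "age"])
df.show()             # print the first rows in tabular form
print(df.count())     # number of rows
print(df.collect())   # bring all rows to the driver as a list of Row objects
print(df.first())     # the first row
print(df.take(2))     # the first 2 rows as a list
print(df.head(2))     # same as take(2)
df.describe().show()  # count, mean, stddev, min, max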
RDD-like operations
A DataFrame supports some of the RDD operations, for example:
map, flatMap, filter, broadcast, distinct, cache, persist, sample,
intersect, exceptAll
A DataFrame can be treated as an RDD whose element type is Row; when necessary, convert it to an RDD and operate on that.
from pyspark.sql import Row
from pyspark.sql.functions import col

df = spark.createDataFrame([("Hello World",),("Hello China",),("Hello Spark",)]).toDF("value")
df.show()
+-----------+
| value|
+-----------+
|Hello World|
|Hello China|
|Hello Spark|
+-----------+
df2 = spark.createDataFrame([["Hello World"],["Hello Scala"],["Hello Spark"]]).toDF("value")
df2.show()
+-----------+
| value|
+-----------+
|Hello World|
|Hello Scala|
|Hello Spark|
+-----------+
#map: convert to an RDD first
rdd = df.rdd.map(lambda x:Row(x[0].upper()))
dfmap = rdd.toDF(["value"])
dfmap.show()
+-----------+
| value|
+-----------+
|HELLO WORLD|
|HELLO CHINA|
|HELLO SPARK|
+-----------+
#flatMap: convert to an RDD first
df_flat = df.rdd.flatMap(lambda x:x[0].split(" ")).map(lambda x:Row(x)).toDF(["value"])
df_flat.show()
+-----+
|value|
+-----+
|Hello|
|World|
|Hello|
|China|
|Hello|
|Spark|
+-----+
# filter combined with a broadcast variable
broads = spark.sparkContext.broadcast(["Hello","World"])
df_filter_broad = df_flat.filter(~col("value").isin(broads.value))
df_filter_broad.show()
"""
sample(withReplacement : scala.Boolean, fraction : scala.Double,seed scala.Long)
withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复
fraction :抽出多少,这是一个double类型的参数,0-1之间,eg:0.3表示抽出30%
seed:表示一个种子,根据这个seed随机抽取,一般情况下只用前两个参数就可以,那么这个参数是干嘛的呢,这个参数一般用于调试,有时候不知道是程序出问题还是数据出了问题,就可以将这个参数设置为定值
"""
#sample: draw a random sample
dfsample = df.sample(False,0.6,0)
dfsample.show()
+-----------+
| value|
+-----------+
|Hello China|
|Hello Spark|
+-----------+
#intersect: set intersection
dfintersect = df.intersect(df2)
dfintersect.show()
+-----------+
| value|
+-----------+
|Hello Spark|
|Hello World|
+-----------+
#exceptAll: rows of df that are not in df2 (set difference, keeping duplicates)
dfexcept = df.exceptAll(df2)
dfexcept.show()
+-----------+
| value|
+-----------+
|Hello China|
+-----------+
Excel-like operations
df = spark.createDataFrame([
("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male"),
("RuHua",16,None)
]).toDF("name","age","gender")
df.show()
df.printSchema()
+---------+---+------+
| name|age|gender|
+---------+---+------+
| LiLei| 15| male|
|HanMeiMei| 16|female|
| DaChui| 17| male|
| RuHua| 16| null|
+---------+---+------+
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
Add a column: withColumn
Reorder columns: select
Drop a column: drop
Rename a column: withColumnRenamed
Sort: sort, orderBy
Drop rows containing null values: df.na.drop()
Fill null values: df.na.fill("female")
Replace values: df.na.replace({"":"female","RuHua":"SiYu"})
Deduplicate: dropDuplicates() uses all columns; dropDuplicates(["age"]) uses only the given columns
Aggregate: df.agg({"name":"count","age":"max"})
Summary statistics: df.describe(), showing count, mean, stddev, min, max
Frequent items: ages and genders whose frequency exceeds 0.5 (a combined sketch of the operations above follows this example)
df_freq = df.stat.freqItems(("age","gender"),0.5)
df_freq.show()
+-------------+----------------+
|age_freqItems|gender_freqItems|
+-------------+----------------+
| [16]| [male]|
+-------------+----------------+
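A combined sketch of the column-level operations listed above, reusing the df defined at the start of this subsection; the reference year 2020 and the literal values are assumptions used only for illustration.
from pyspark.sql import functions as F

dfnew = df.withColumn("birth_year", 2020 - F.col("age"))     # add a column
dfnew = dfnew.select("name", "gender", "age", "birth_year")  # reorder columns
dfnew = dfnew.drop("birth_year")                             # drop a column
dfnew = dfnew.withColumnRenamed("gender", "sex")             # rename a column
dfnew.sort(dfnew["age"].desc()).show()                       # sort
df.na.drop().show()                                          # drop rows containing nulls
df.na.fill("female").show()                                  # fill nulls in string columns
df.dropDuplicates(["age"]).show()                            # deduplicate on selected columns
df.agg({"name": "count", "age": "max"}).show()               # aggregate
df.describe().show()                                         # summary statistics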
SQL-like operations
df = spark.createDataFrame([
("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male"),
("RuHua",16,None)]).toDF("name","age","gender")
df.show()
+---------+---+------+
| name|age|gender|
+---------+---+------+
| LiLei| 15| male|
|HanMeiMei| 16|female|
| DaChui| 17| male|
| RuHua| 16| null|
+---------+---+------+
dfscore = spark.createDataFrame([("LiLei","male",88),("HanMeiMei","female",90),("DaChui","male",50)]) \
.toDF("name","gender","score")
dfscore.show()
+---------+------+-----+
| name|gender|score|
+---------+------+-----+
| LiLei| male| 88|
|HanMeiMei|female| 90|
| DaChui| male| 50|
+---------+------+-----+
Column selection: select
Expression-based selection: selectExpr, which supports UDFs, aliases, and arbitrary SQL expressions
import datetime
spark.udf.register("getBirthYear",lambda age:datetime.datetime.now().year-age)
dftest = df.selectExpr("name", "getBirthYear(age) as birth_year" , "UPPER(gender) as gender" )
dftest.show()
+---------+----------+------+
| name|birth_year|gender|
+---------+----------+------+
| LiLei| 2005| MALE|
|HanMeiMei| 2004|FEMALE|
| DaChui| 2003| MALE|
| RuHua| 2004| null|
+---------+----------+------+
#where: takes a SQL WHERE clause expression
dftest = df.where("gender='male' and age>15")
dftest.show()
+------+---+------+
| name|age|gender|
+------+---+------+
|DaChui| 17| male|
+------+---+------+
#filter
dftest = df.filter(df["age"]>16)
dftest.show()
+------+---+------+
| name|age|gender|
+------+---+------+
|DaChui| 17| male|
+------+---+------+
Join: join; the join type can be "inner", "left", "right", "outer", "semi",
"full", "leftanti", "anti", and so on
dfjoin = df.join(dfscore,["name","gender"],"outer")
dfjoin.show()
+---------+------+---+-----+
| name|gender|age|score|
+---------+------+---+-----+
|HanMeiMei|female| 16| 90|
| DaChui| male| 17| 50|
| LiLei| male| 15| 88|
| RuHua| null| 16| null|
+---------+------+---+-----+
#union: append the rows of one DataFrame to another
#groupBy: group rows
#groupBy + agg: aggregate after grouping; note the use of F.expr
import pyspark.sql.functions as F
dfagg = df.groupBy("gender").agg(F.expr("avg(age)"),
                                 F.expr("collect_list(name)"))
dfagg.show()
+------+--------+------------------+
|gender|avg(age)|collect_list(name)|
+------+--------+------------------+
| null| 16.0| [RuHua]|
|female| 16.0| [HanMeiMei]|
| male| 16.0| [LiLei, DaChui]|
+------+--------+------------------+
#pivot after groupBy: groupBy, pivot
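# A minimal sketch of the union and pivot operations mentioned above, reusing the df
# defined at the start of this subsection; the listed pivot values are assumptions.
dfunion = df.union(df)  # append the rows of a DataFrame with the same schema
dfunion.show()
dfpivot = df.groupBy("gender").pivot("age", [15, 16, 17]).count()  # one output column per listed age
dfpivot.show()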
#window functions
df = spark.createDataFrame([("LiLei",78,"class1"),
("HanMeiMei",87,"class1"),
("DaChui",65,"class2"),
("RuHua",55,"class2")]) \
.toDF("name","score","class")
df.show()
dforder = df.selectExpr("name","score","class",
"row_number() over (partition by class order by score desc) as order")
dforder.show()
+---------+-----+------+
| name|score| class|
+---------+-----+------+
| LiLei| 78|class1|
|HanMeiMei| 87|class1|
| DaChui| 65|class2|
| RuHua| 55|class2|
+---------+-----+------+
+---------+-----+------+-----+
| name|score| class|order|
+---------+-----+------+-----+
| DaChui| 65|class2| 1|
| RuHua| 55|class2| 2|
|HanMeiMei| 87|class1| 1|
| LiLei| 78|class1| 2|
+---------+-----+------+-----+
#explode
import pyspark.sql.functions as F
students = [("LiLei","Swim|Sing|FootBall"),("Ann","Sing|Dance"),("LiLy","Reading|Sing|Dance")]
dfstudents = spark.createDataFrame(students,["name","hobbies"])
dfstudents.show()
dfstudents.createOrReplaceTempView("students")
#explode turns one row into multiple rows and is usually combined with LATERAL VIEW
dfhobby = spark.sql("select name,hobby from students LATERAL VIEW explode(split(hobbies,'\\\\|')) tmp as hobby") #note: a special regex character used as the delimiter needs four backslashes here
dfhobby.show()
#count how many students like each hobby
dfcount = dfhobby.groupBy("hobby").agg(F.expr("count(name) as cnt"))
dfcount.show()
+-----+------------------+
| name| hobbies|
+-----+------------------+
|LiLei|Swim|Sing|FootBall|
| Ann| Sing|Dance|
| LiLy|Reading|Sing|Dance|
+-----+------------------+
+-----+--------+
| name| hobby|
+-----+--------+
|LiLei| Swim|
|LiLei| Sing|
|LiLei|FootBall|
| Ann| Sing|
| Ann| Dance|
| LiLy| Reading|
| LiLy| Sing|
| LiLy| Dance|
+-----+--------+
+--------+---+
| hobby|cnt|
+--------+---+
| Swim| 1|
|FootBall| 1|
| Sing| 3|
| Reading| 1|
| Dance| 2|
+--------+---+
import pyspark.sql.functions as F
students = [("LiLei",89,76,65),("HanMeiMei",97,98,89),("Lucy",66,55,70)]
dfstudents = spark.createDataFrame(students,["name","math","physics","music"])
dfstudents.show()
#array type
print("array type")
dfarray = dfstudents.selectExpr("name","array(math,physics,music) as score")
dfarray.show()
dfarray.selectExpr("name","score[0] as math").show()
#struct type
print("struct type")
dfstruct = dfstudents.selectExpr("name","struct('math',math,'physics',physics,'music',music) as score")
dfstruct.show()
dfstruct.selectExpr("name","score.physics").show()
#map type
print("map type")
dfmap = dfstudents.selectExpr("name","map('math',math,'physics',physics,'music',music) as score")
dfmap.show()
dfmap.selectExpr("name","score['math'] as math").show()
+---------+----+-------+-----+
| name|math|physics|music|
+---------+----+-------+-----+
| LiLei| 89| 76| 65|
|HanMeiMei| 97| 98| 89|
| Lucy| 66| 55| 70|
+---------+----+-------+-----+
array type
+---------+------------+
| name| score|
+---------+------------+
| LiLei|[89, 76, 65]|
|HanMeiMei|[97, 98, 89]|
| Lucy|[66, 55, 70]|
+---------+------------+
+---------+----+
| name|math|
+---------+----+
| LiLei| 89|
|HanMeiMei| 97|
| Lucy| 66|
+---------+----+
struct type
+---------+--------------------+
| name| score|
+---------+--------------------+
| LiLei|[math,89,physics,...|
|HanMeiMei|[math,97,physics,...|
| Lucy|[math,66,physics,...|
+---------+--------------------+
+---------+-------+
| name|physics|
+---------+-------+
| LiLei| 76|
|HanMeiMei| 98|
| Lucy| 55|
+---------+-------+
map type
+---------+--------------------+
| name| score|
+---------+--------------------+
| LiLei|Map(math -> 89, p...|
|HanMeiMei|Map(math -> 97, p...|
| Lucy|Map(math -> 66, p...|
+---------+--------------------+
+---------+----+
| name|math|
+---------+----+
| LiLei| 89|
|HanMeiMei| 97|
| Lucy| 66|
+---------+----+
#JSON construction (to_json) and parsing (get_json_object)
#build the student data
dfstudents = spark.createDataFrame([("LiLei","Math",70),("LiLei","English",87)
,("HanMeimei","Math",80),("HanMeimei","English",90)]).toDF("name","course","score")
print("dfstudents:")
dfstudents.show()
#build a named_struct column
dfnamed_struct = dfstudents.selectExpr("name","named_struct('course',course,'score',score) as scores")
print("dfnamed_struct:")
dfnamed_struct.show()
#build an array(named_struct) column
dfagg = dfnamed_struct.groupby("name").agg(F.expr("collect_list(scores) as arr_scores"))
print("dfagg:")
dfagg.show()
#convert to JSON
dfjson = dfagg.selectExpr("name","to_json(arr_scores) as json_scores")
print("dfjson:")
dfjson.show()
#parse the JSON with get_json_object
dfscores = dfjson.selectExpr("name",
"get_json_object(json_scores,'$[0].score') as Math",
"get_json_object(json_scores,'$[1].score') as English",)
print("dfscores:")
dfscores.show()
dfstudents:
+---------+-------+-----+
| name| course|score|
+---------+-------+-----+
| LiLei| Math| 70|
| LiLei|English| 87|
|HanMeimei| Math| 80|
|HanMeimei|English| 90|
+---------+-------+-----+
dfnamed_struct:
+---------+------------+
| name| scores|
+---------+------------+
| LiLei| [Math,70]|
| LiLei|[English,87]|
|HanMeimei| [Math,80]|
|HanMeimei|[English,90]|
+---------+------------+
dfagg:
+---------+--------------------+
| name| arr_scores|
+---------+--------------------+
| LiLei|[[Math,70], [Engl...|
|HanMeimei|[[Math,80], [Engl...|
+---------+--------------------+
dfjson:
+---------+--------------------+
| name| json_scores|
+---------+--------------------+
| LiLei|[{"course":"Math"...|
|HanMeimei|[{"course":"Math"...|
+---------+--------------------+
dfscores:
+---------+----+-------+
| name|Math|English|
+---------+----+-------+
| LiLei| 70| 87|
|HanMeimei| 80| 90|
+---------+----+-------+
DataFrame SQL Interaction
After registering a DataFrame as a temporary view or a global temporary view, you can interact with it using SQL statements. Beyond that, Spark SQL can also create, drop, modify and query Hive tables directly.
SQL interaction after registering views
Register as a temporary view: createOrReplaceTempView; its lifetime is tied to the SparkSession.
Register as a global temporary view: createOrReplaceGlobalTempView; its lifetime is tied to the whole Spark application (it can be queried from a new session via spark.newSession().sql).
#register as a temporary view; its lifetime is tied to the SparkSession
df = spark.createDataFrame([("LiLei",18,"male"),("HanMeiMei",17,"female"),("Jim",16,"male")],
("name","age","gender"))
df.show()
df.createOrReplaceTempView("student")
dfmale = spark.sql("select * from student where gender='male'")
dfmale.show()
+---------+---+------+
| name|age|gender|
+---------+---+------+
| LiLei| 18| male|
|HanMeiMei| 17|female|
| Jim| 16| male|
+---------+---+------+
+-----+---+------+
| name|age|gender|
+-----+---+------+
|LiLei| 18| male|
| Jim| 16| male|
+-----+---+------+
#register as a global temporary view; its lifetime is tied to the whole Spark application
df.createOrReplaceGlobalTempView("student")
query = """
select t.gender
, collect_list(t.name) as names
from global_temp.student t
group by t.gender
""".strip("\n")
spark.sql(query).show()
#it can also be accessed from a new session
spark.newSession().sql("select * from global_temp.student").show()
+------+------------+
|gender| names|
+------+------------+
|female| [HanMeiMei]|
| male|[LiLei, Jim]|
+------+------------+
+---------+---+------+
| name|age|gender|
+---------+---+------+
| LiLei| 18| male|
|HanMeiMei| 17|female|
| Jim| 16| male|
+---------+---+------+
CRUD operations on Hive tables
Create: CREATE TABLE IF NOT EXISTS
Drop: DROP TABLE IF EXISTS
Dynamic-partition writes into a Hive partitioned table require spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") #note this configuration step
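A minimal sketch of the CREATE/DROP statements mentioned above; the table name, columns and storage format are placeholders, and a SparkSession with Hive support enabled is assumed.
# create and then drop a partitioned Hive table (placeholder schema)
spark.sql("""
CREATE TABLE IF NOT EXISTS students_demo (name STRING, age INT)
PARTITIONED BY (class STRING, gender STRING)
STORED AS PARQUET
""")
spark.sql("DROP TABLE IF EXISTS students_demo")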
dfstudents = spark.createDataFrame([("LiLei",18,"class1","male"),
("HanMeimei",17,"class2","female"),
("DaChui",19,"class2","male"),
("Lily",17,"class1","female")])
.toDF("name","age","class","gender")
dfstudents.show()
#dynamic partition write
dfstudents.write.mode("overwrite").format("hive")\
.partitionBy("class","gender").saveAsTable("students")
#write into a static partition
dfstudents = spark.createDataFrame([("Jim",18,"class3","male"),
("Tom",19,"class3","male")])
.toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass3")
#INSERT INTO appends to the table; INSERT OVERWRITE TABLE overwrites the partition
query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class3',gender='male')
SELECT name,age from dfclass3
""".replace("\n"," ")
spark.sql(query)
#write into mixed (static + dynamic) partitions
dfstudents = spark.createDataFrame([("David",18,"class4","male"),
("Amy",17,"class4","female"),
("Jerry",19,"class4","male"),
("Ann",17,"class4","female")])
.toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass4")
query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class4',gender)
SELECT name,age,gender from dfclass4
""".replace("\n"," ")
spark.sql(query)
dfdata = spark.sql("select * from students")
dfdata.show()
+---------+---+------+------+
| name|age| class|gender|
+---------+---+------+------+
| Ann| 17|class4|female|
| Amy| 17|class4|female|
|HanMeimei| 17|class2|female|
| DaChui| 19|class2| male|
| LiLei| 18|class1| male|
| Lily| 17|class1|female|
| Jerry| 19|class4| male|
| David| 18|class4| male|
| Jim| 18|class3| male|
| Tom| 19|class3| male|
+---------+---+------+------+
#drop a partition
query = """
ALTER TABLE `students`
DROP IF EXISTS
PARTITION(class='class3')
""".replace("\n"," ")
spark.sql(query)
#check the remaining data
dfremain = spark.sql("select * from students")
dfremain.show()
+---------+---+------+------+
| name|age| class|gender|
+---------+---+------+------+
| Jerry| 19|class4| male|
| David| 18|class4| male|
| LiLei| 18|class1| male|
| DaChui| 19|class2| male|
| Lily| 17|class1|female|
|HanMeimei| 17|class2|female|
| Ann| 17|class4|female|
| Amy| 17|class4|female|
+---------+---+------+------+
Spark SQL in Practice
Computing the mean
#Task: compute the average of the values in data
data = [1,5,7,10,23,20,6,5,10,7,10]
dfdata = spark.createDataFrame([(x,) for x in data]).toDF("value")
dfagg = dfdata.agg({"value":"avg"})
dfagg.show()
+-----------------+
| avg(value)|
+-----------------+
|9.454545454545455|
+-----------------+
Computing the mode
#Task: find the value that appears most often in data; if there are ties, return the average of the tied values
from pyspark.sql import functions as F
data = [1,5,7,10,23,20,7,5,10,7,10]
dfdata = spark.createDataFrame([(x,1) for x in data]).toDF("key","value")
dfcount = dfdata.groupby("key").agg(F.count("value").alias("count")).cache()
max_count = dfcount.agg(F.max("count").alias("max_count")).take(1)[0]["max_count"]
dfmode = dfcount.where("count={}".format(max_count))
mode = dfmode.agg(F.expr("mean(key) as mode")).take(1)[0]["mode"]
print("mode:",mode)
dfcount.unpersist()
mode: 8.5
Computing the top N
#Task: given a table of students with name, age and score, find the 3 students with the highest scores; ties can be broken arbitrarily
students = [("LiLei",18,87),("HanMeiMei",16,77),("DaChui",16,66),("Jim",18,77),("RuHua",18,50)]
n = 3
dfstudents = spark.createDataFrame(students).toDF("name","age","score")
dftopn = dfstudents.orderBy("score", ascending=False).limit(n)
dftopn.show()
+---------+---+-----+
| name|age|score|
+---------+---+-----+
| LiLei| 18| 87|
|HanMeiMei| 16| 77|
| Jim| 18| 77|
+---------+---+-----+
Sorting and returning row numbers
#Task: sort in ascending order and return a row number; equal values may receive different row numbers
data = [1,7,8,5,3,18,34,9,0,12,8]
from copy import deepcopy
from pyspark.sql import types as T
from pyspark.sql import Row,DataFrame
def addLongIndex(df, field_name):
    schema = deepcopy(df.schema)
    schema = schema.add(T.StructField(field_name, T.LongType()))
    rdd_with_index = df.rdd.zipWithIndex()
    def merge_row(t):
        row, index = t
        dic = row.asDict()
        dic.update({field_name: index})
        row_merged = Row(**dic)
        return row_merged
    rdd_row = rdd_with_index.map(lambda t: merge_row(t))
    return spark.createDataFrame(rdd_row, schema)
dfdata = spark.createDataFrame([(x,) for x in data]).toDF("value")
dfsorted = dfdata.sort(dfdata["value"])
dfsorted_index = addLongIndex(dfsorted,"index")
dfsorted_index.show()
+-----+-----+
|value|index|
+-----+-----+
| 0| 0|
| 1| 1|
| 3| 2|
| 5| 3|
| 7| 4|
| 8| 5|
| 8| 6|
| 9| 7|
| 12| 8|
| 18| 9|
| 34| 10|
+-----+-----+
Secondary sort
#Task: given a table of students with name, age and score
#sort by score in descending order; when scores are equal, sort by age in descending order
students = [("LiLei",18,87),("HanMeiMei",16,77),("DaChui",16,66),("Jim",18,77),("RuHua",18,50)]
dfstudents = spark.createDataFrame(students).toDF("name","age","score")
dfsorted = dfstudents.orderBy(dfstudents["score"].desc(),dfstudents["age"].desc())
dfsorted.show()
+---------+---+-----+
| name|age|score|
+---------+---+-----+
| LiLei| 18| 87|
| Jim| 18| 77|
|HanMeiMei| 16| 77|
| DaChui| 16| 66|
| RuHua| 18| 50|
+---------+---+-----+
Join operation
#Task: given a class roster and a score table, find the classes whose average score is above 75
#the roster has columns class and name; the score table has columns name and score
from pyspark.sql import functions as F
classes = [("class1","LiLei"), ("class1","HanMeiMei"),("class2","DaChui"),("class2","RuHua")]
scores = [("LiLei",76),("HanMeiMei",80),("DaChui",70),("RuHua",60)]
dfclass = spark.createDataFrame(classes).toDF("class","name")
dfscore = spark.createDataFrame(scores).toDF("name","score")
dfstudents = dfclass.join(dfscore,on ="name" ,how = "left")
dfagg = dfstudents.groupBy("class").agg(F.avg("score").alias("avg_score")).where("avg_score>75.0")
dfagg.show()
+------+---------+
| class|avg_score|
+------+---------+
|class1| 78.0|
+------+---------+
Computing the mode by group
#Task: given a table of students with class and age, find the mode of the ages in each class
students = [("class1",15),("class1",15),("class2",16),("class2",16),("class1",17),("class2",19)]
from pyspark.sql import functions as F
def mode(arr):
    dict_cnt = {}
    for x in arr:
        dict_cnt[x] = dict_cnt.get(x, 0) + 1
    max_cnt = max(dict_cnt.values())
    most_values = [k for k, v in dict_cnt.items() if v == max_cnt]
    s = 0.0
    for x in most_values:
        s = s + x
    return s / len(most_values)
spark.udf.register("udf_mode",mode)
dfstudents = spark.createDataFrame(students).toDF("class","score")
dfscores = dfstudents.groupBy("class").agg(F.collect_list("score").alias("scores"))
dfmode = dfscores.selectExpr("class","udf_mode(scores) as mode_score")
dfmode.show()
+------+----------+
| class|mode_score|
+------+----------+
|class2| 16.0|
|class1| 15.0|
+------+----------+
References