文章目录
1–Spark—core
2.1–RDD的创建
2.1.1–并行化创建
概念:并行化创建是指:将本地集合=>分布式RDD,这一步就是分布式的开端:本地转分布式
API:
- parallelize(参数1,参数2)
- 参数1 集合对象即可 比如list
- 参数2 分区数,不写默认是电脑的线程数
# 导入spark相关的包
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
# 初始化sparkcontext的对象
conf = SparkConf().setAppName("text").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 通过并行化的方式创建RDD
rdd= sc.parallelize([1,2,3,4,5,6,7,8,9])
# parallelize没有给分区数 默认是电脑cpu的线程
print("默认分区",rdd.getNumPartitions())
rdd_par = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)
print("分区数",rdd_par.getNumPartitions())
# 打印rdd的内容
# collect方法是将rdd中的每个分区的数据都发送到Driver,
# 形成一个python的list的对象 分布式对象==>本地集合
print("内容",rdd_par.collect())
2.1.2–获取分区数
getNumPartitions() 获取RDD的分区数量 返回值是Int数字
API:
- rdd.getNumPartitions()
2.1.3–读取文件创建RDD
textFile
textFile():可以读取本地数据,也可以读取hdfs数据
API:
sparkcontext.textFile(参数1,参数2)
参数1:必填,文件路径支持本地,支持HDFS,也支持一些比如S3协议
参数2:可选,表示最小分区数量
注意:参数2话语权不足,spark有自己的判断,在它的允许的范围内,参数2才有效果,超出spark允许的范围,参数2就失效
读取本地文件:
file_rdd1 = sc.textFile("../data/input/a.txt")
print("默认读取分区数:",file_rdd1.getNumPartitions())
print("内容",file_rdd1.collect())
参数2的用法:
# 加最小分区数测试 最小分区数只是个参考值,spark有自己的判断
file_rdd2 = sc.textFile("../data/input/a.txt",5)
file_rdd3 = sc.textFile("../data/input/a.txt",100)
print("rdd2的分区数:",file_rdd2.getNumPartitions())
print("rdd3的分区数",file_rdd3.getNumPartitions())
读取HDFS文件:
hdfs_rdd = sc.textFile("hdfs://node1:8020/input/b.txt")
print("hdfs_rdd分区数:",hdfs_rdd.getNumPartitions())
print("hdfs_rdd内容:",hdfs_rdd.collect())
wholeTextFile
wholeTextFile()适合读取一堆小文件,这个API适合读取小文件,因此文件的数据很小,分区很多,导致shuffle的几率更高,所以尽量少分区读取数据
API:
- sparkcontext.wholeTextFile(参数1,参数2)
- 参数1:必填,文件夹的目录,支持本地,支持HDFS,也支持一些比如S3协议
- 参数2:可选,表示最小分区数量
- 注意:参数2话语权不足,这API的分区数量最多只能开到文件数量
rdd = sc.wholeTextFiles("../data/input/tiny_files")
print(rdd.map(lambda x:x[1]).collect())
返回结果为二元组的形式展示, 前一个值是文件路径, 后一个值为文件内容
2.2–RDD算子
2.2.1–算子概念
算子:分布式集合对象上的API称之为算子
分类:
- Transformation:转换算子
- Action:动作(行动)算子
转换算子:
- 定义:RDD的算子,返回值仍然是一个RDD的,称之为转换算子
- 特性:这类算子是lazy 懒加载的,如果没有action算子,Transformation算子是不工作的
动作算子:
- 定义:返回值不是RDD的就是action算子
2.2.2–Transformation算子
map
map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
API:
- rdd.map(func)
- func : f:(T)=>U
- f:表示这是一个函数(方法)
- (T)=> U 表示方法传入参数为任意了类型,返回值也是任意类型
- (A)=> A 表示传入参数为任意类型,返回值也是任意类型。但是传入值和返回值类型相同
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
# 定义方法作为算子的传入
def add(data):
return data*10
print(rdd.map(add).collect())
# 更简单的方式是定义lamda表达式来写的匿名函数
print(rdd.map(lambda data: data * 10).collect())
# 两种方式都可以,第一种适合复杂函数,第二种适合一行解决的函数
#结果
[10, 20, 30, 40, 50, 60, 70, 80, 90]
[10, 20, 30, 40, 50, 60, 70, 80, 90]
flatMap
flatMap对rdd执行map操作,然后进行解除嵌套操作.
API:
- rdd.flatMap(func)
- func : f:(T)=>U
eg:
- 嵌套的list: list[[1,2,3,4,5,6],[7,8],[9]]
- 解除嵌套的list: list[1,2,3,4,5,6,7,8,9]
rdd = sc.parallelize(["hadoop spark hadoop","hadoop flink","spark hadoop"])
rdd2 = rdd.map(lambda line: line.split(" "))
# flatmap无需传参数就可以直接解除嵌套
rdd3 = rdd.flatMap(lambda line:line.split(" "))
#结果
['hadoop', 'spark', 'hadoop', 'hadoop', 'flink', 'spark', 'hadoop']
reduceByKey
针对(K,V)型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作
API:
- rdd.reduceByKey(func)
- func:(v,v)=>v
- 接受两个传入参数(类型一致),返回一个值,类型和传入的要求一致
rdd = sc.parallelize([('a',1),('b',1),('c',1),('a',1),('a',1)])
print(rdd.reduceByKey(lambda a, b: a + b).collect())
#结果
[('b', 1), ('c', 1), ('a', 3)]
mapValues
分区操作算子 就只对value 进行操作
API:
rdd.mapValue(func)
rdd = sc.parallelize([('a',1),('b',2),('a',3),('c',2),('a',5)])
# 就只对value 进行操作
print(rdd.mapValues(lambda value: value * 10).collect())
#结果
groupBy
将RDD的数据进行分组
API:
- rdd.groupBy(func)
- func(T)=>K
- 函数要求传入一个参数,返回一个返回值,类型无所谓
- 这个函数是拿到你的输入值后,将所有相同输入值的放在一个组里
- 分组完成后,每个组是一个二元组,key就是输入值,所有的数据都放入一个迭代器里作为value
rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
# groupby 传入的函数的意思是:通过这个函数,确定按照谁来分组(返回谁即可)
result_rdd = rdd.groupBy(lambda t:t[0])
print(result_rdd.collect())
# 将value的值强制转换,才能查看迭代器的内容
print(result_rdd.map(lambda t: (t[0], list(t[1]))).collect())
#结果
filter
过滤想要的数据进行保留
API:
- rdd.filter(func)
- func(T)=>bool
- 传入任意类型的参数,返回值是False或者True
rdd = sc.parallelize([1,2,3,4,5])
# 通过filter算子过滤出奇数
result_rdd = rdd.filter(lambda x:x%2 == 1)
print(result_rdd.collect())
#结果
distinct
对RDD数据进行去重,返回新的RDD
API:
- rdd.distinct(参数1)
- 参数1,去重分区数量,一般不用传
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 1, 2, 5, 9, 3])
# 去重
print(rdd.distinct().collect())
#结果
union
2个RDD合并成1个RDD返回
API:
- rdd.union(other_rdd)
- 注意只会合并不会去重
- 不同类型的RDD也可以混合
- 合并后的分区数是rdd1和rdd2的分区数和
rdd = sc.parallelize([1, 2, 3, 1, 2, 5, 9, 3])
rdd1 = sc.parallelize(["c", "b", "a"])
# 将两个rdd进行合并
print(rdd.union(rdd1).collect())
#结果
join
分区操作算子 对两个RDD进行Join操作(可实现sql的内\外连接)
API:
注意:join算子只能用于二元元组
rdd.join(other_rdd) 内连接
rdd.leftOuterJoin(other_rdd)左外连接
rdd.rightOuterJoin(other_rdd)右外连接
rdd1 = sc.parallelize([(1001,"zhangshan"),(1002,"lisi"),(1003,"wangwu"),(1004,"zhaoliu")])
rdd2 = sc.parallelize([(1001,"销售部"),(1002,"科技部")])
# 通过join来进行rdd间的关联
# 对于join来说,关联条件按照二元组key来进行关联
print(rdd1.join(rdd2).collect())
# 左外连接
print(rdd1.leftOuterJoin(rdd2).collect())
# 右外连接
print(rdd1.rightOuterJoin(rdd2).collect())
#结果
intersection
求2个RDD的交集,返回一个新的rdd
API:
- rdd.intersection(other_rdd)
rdd1 = sc.parallelize([1,2,3,4,'a','b'])
rdd2 = sc.parallelize([2,3,4,'b'])
# 求rdd间的交集
print(rdd1.intersection(rdd2).collect())
#结果
glom
将RDD的数据加上嵌套,这个嵌套按照分区来进行
API:
- rdd.glom()
eg:
- rdd数据为[1,2,3,4,5]有两个分区,那么glom后,数据就有可能变成[[1,2,3],[4,5]]
rdd = sc.parallelize([1,2,3,4,5,6,7,8],3)
# glom可以查看分区排布
print(rdd.glom().collect())
#结果
groupByKey
针对KV型RDD,自动按照key分组
API:
- rdd.groupByKey()
rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
# 与groupby相同,只是不需要指定 直接就是按照key分组的,
# 并且第二个值是相同key的value的集合
rdd1 = rdd.groupByKey()
rdd2 = rdd1.map(lambda x:(x[0],list(x[1])))
print(rdd2.collect())
#结果
[('b', [1, 1, 1]), ('a', [1, 1])]
sortBy
对RDD数据进行排序,基于你指定的排序依据
API:
rdd.sortBy(func,ascending=False,numPartition=1)
func: (T)=>U 告知按照rdd中的哪个数据进行排序 比如:lambda x:x[1] 表示按照rdd中的第二列元素进行排序
ascending True升序 False降序
numPartition 用多少分区进行排序
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c', 1)],3)
# 使用sortby对数据进行排序
# 按照value进行排序
# 注意 如果要全局有序 排序分区数应设为1
rdd1 = rdd.sortBy(lambda x:x[1],ascending=True,numPartitions=1)
print(rdd1.collect())
#结果
[('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 3)]
sortByKey
针对KV型RDD,按照key进行排序
API:
- sortByKey(ascending=True,numPartitions=None,keyfunc=<function RDD.>
- ascending:升序or降序,True升序, False降序,默认是升序
- numPartitions:按照几个分区进行排序,如果全局有序,设置1
- keyfunc:在排序前对key进行处理,语法是: (k)→ U ,一个参数传入,返回一个值
rdd = sc.parallelize([('A', 1), ('a', 2), ('b', 1), ('b', 3), ('C', 1)],3)
# keyfunc = lambda key: str(key).lower() 忽略大小写的影响
rdd1 = rdd.sortByKey(ascending=True,numPartitions=3,keyfunc=lambda key:str(key).lower())
print(rdd1.collect())
#结果
[('A', 1), ('a', 2), ('b', 1), ('b', 3), ('C', 1)]
keys
概述: rdd必须是键值对类型的数据, 获取键值对所有的键(key).
values
概述: rdd必须是键值对类型的数据, 获取键值对所有的值(value).
2.2.3–Action算子
countByKey
统计key出现的次数(一般适用于KV型的RDD)
rdd = sc.textFile("../data/input/a.txt")
rdd2 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))
# 通过countbykey来对key进行计数 这是一个Action算子 不能colloct
result = rdd2.countByKey()
print(result)
print(type(result))
#结果
defaultdict(<class 'int'>, {'hadoop': 2, 'hello': 2, 'zxz': 2, 'wrx': 2})
<class 'collections.defaultdict'>
collect
将RDD各个分区内的数据,统一收集到Driver中,形成List对象
API:
- rdd.collect()
- 这个算子,是将RDD各个分区数据都拉取到Driver
- 注意的是, RDD是分布式对象,其数据量可以很大,所以用这个算子之前要心知肚明的了解结果数据集不会太大。不然,会把Driver内存撑爆
reduce
对RDD数据集按照传入的逻辑进行聚合
API:
- rdd.reduce(func)
- func:(T,T)=>T
- 两个参数传入,一个返回值,返回值和参数要求类型一致
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
# reduce是action算子
result = rdd.reduce(lambda a,b:a+b)
print(result)
#结果
45
fold
和reduce一样,接受传入逻辑进行聚合,聚合是带有初始值的,这个初始值聚合会作用在分区内和分区间
API:
- rdd,fold(初始值,func)
eg:
- 比如:[[1,2,3],[4,5,6],[7,8,9]]数据分布在3个分区
- 分区1 123聚合的时候带上10作为初始值得到16
- 分区2 456聚合的时候带上10作为初始值得到25
- 分区3 789聚合的时候带上10作为初始值得到34
- 3个分区的结果做聚合也带上初始值10,所以结果是: 10+16+ 25+34=85
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
# print(rdd.glom().collect())
result = rdd.fold(10,lambda a,b:a+b)
print(result)
#结果
85
aggregate
和fold算子相同, 唯一不同的是可以分别定义分区内计算函数和分区外计算函数
rdd1 = sc.parallelize(range(5), 2)
print(rdd1.glom().collect())
# aggregate算子演示
seqOp = (lambda x,y:x+y)
comOp = (lambda x,y:x+y+1)
print(rdd1.aggregate(2, seqOp, comOp))
下图所示的是三个算子执行时的计算过程, 这里使用的是累加的过程
fold和aggregate计算过程是一样的, 但是唯一区别是aggregate可以指定分区内外的计算方式不同.
设置初始值的算子,初始值被加分区数n+1次
first
取出RDD第一个元素
rdd = sc.parallelize([1,4,3,2,7,6,2,1,9])
print(rdd.first())
#结果
1
take
取RDD的前N个元素,组合成List返回
注意: take是将rdd中前n个元素拉取到Driver所在主机的内存中, 由于driver的内存是有限的, 数据过多会造成driver的内存溢出.
print(rdd.take(4))
#结果
[1, 4, 3, 2]
top
对RDD数据集进行降序排序,取前N个
print(rdd.top(3))
#结果
[9, 7, 6]
count
计算RDD有多少个数据,返回值是个数字
print(rdd.count())
#结果
9
takeSample
随机抽样RDD的数据
API:
- takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
- 参数1:True表示运行取同一个数据,False表示不允许取同一个数据. 和数据内容无关,是否重复表示的是同一个位置的数据
- 参数2:抽样要几个
- 参数3∶随机数种子,这个参数传入—个数字即可,随意给
rdd = sc.parallelize([1,2,1,4,1,6,7,1,2])
print(rdd.takeSample(False, 5,1))
#结果
[6, 7, 1, 1, 4]
takeOrdered
对RDD进行排序取前N个
API:
- rdd.takeordered(参数1,参数2)
- 参数1要几个数据
- 参数2对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
- 这个方法使用按照元素自然顺序升序排序,如果你想玩倒叙,需要用参数2来对排序的数据进行处理
rdd = sc.parallelize([1, 2, 9, 2, 1, 5, 7, 3, 9])
# 默认是升序并选取前n个元素
print(rdd.takeOrdered(3))
# 增加匿名函数可以使其降序排序并选取前n个 == top
print(rdd.takeOrdered(3, lambda x: -x))
#结果
[1, 1, 2]
[9, 9, 7]
foreach
对RDD的每一个元素,执行提供的逻辑操作(和map一样),但这个方法没有返回值
注意: 但是它执行的时候, 每个分区会抢占cpu资源,从而会导致数据乱序的情况
API:
- rdd.foreach(func)
- func(T)=>None
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
result = rdd.foreach(lambda x: x * 10)
# foreach对元素进行处理后没有返回值
print(result)
# 这种方式的输出是由executor输出的而不是drive
result = rdd.foreach(lambda x: print(x * 10))
#结果
None
50
60
70
80
90
10
20
30
40
saveAsTextFile
将RDD的数据写入文本文件,支持本地写出,hdfs等文件系统
- 注意:写出的时候,每个分区所在的Task直接控制数据写出到目标文件系统中,所以才会一个分区产生1个结果文件.
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)
# 将内容写到本地
# rdd.saveAsTextFile("../data/output/out2")
# 将内容保存到hdfs
rdd.saveAsTextFile("hdfs://node1:8020/input/output/out1")
# saveastextfile中是由executor来写入的
- action算子中foreach和saveAsTextFile算子是分区直接执行的,其余的Action算子都回将结果发送至Driver
collectAsMap
概述: 将二元组的RDD转化为字典(dict). 将它转为rdd,就相当于将其转为本地,不再是弹性分布式数据集rdd了.
2.2.4–分区操作算子
mapPartition
Transformation算子 mapPartition—次被传递的是一整个分区的数据作为一个迭代器(一次性list)对象传入过来
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)
def process(iter):
result = list()
for it in iter:
result.append(it*10)
return result
# mapParttion 是将一个分区的值按照list或者迭代器送到计算的地方
# 相比map 可以提高网络传送的性能
print(rdd.mapPartitions(process).collect())
#结果
[10, 20, 30, 40, 50, 60, 70, 80, 90]
foreachPartition
Action算子 和普通的foreach一致,但是一次处理的是一整个分区数据
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)
def process(iter):
result = list()
for it in iter:
result.append(it*10)
return print(result)
# 与mapPartition相同,但是不返回值
rdd.foreachPartition(process)
#结果
[10, 20, 30]
[40, 50, 60]
[70, 80, 90]
- foreachPartition就是一个没有返回值的mapPartitions
partitionBy
Transformation算子 对RDD进行自定义分区
API:
- rdd.partitionBy(参数1,参数2)
- 参数1重新分区后有几个分区
- 参数2自定义分区规则,函数传入
- 参数2:(K)→ int 一个传入参数进来,类型无所谓,但是返回值一定是int类型将key传给这个函数,你自己写逻辑,决定返回一个分区编号
- 分区编号从0开始,不要超出分区数-1
rdd = sc.parallelize([('a',1),('b',2),('a',3),('c',2),('d',5)])
# 使用partitionby实现自定义分区
def process(key):
if 'a' == key or 'b' ==key:
return 0
if 'c' == key:
return 1
return 2
print(rdd.partitionBy(3, process).glom().collect())
#结果
repartition
Transformation算子 对RDD的分区执行重新分区(仅数量)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
# repartition修改分区
# 使用该API时,尽量降分区数,因为用该api会影响内存迭代管道
print(rdd.repartition(1).getNumPartitions())
print(rdd.repartition(5).getNumPartitions())
coalesce
Transformation算子 对分区数量增减
API:
- rdd.coalesce(参数1,参数2)
- 参数1,分区数
- 参数2,True or False
- True表示允许shuffle,也就是可以加分区
- False表示不允许shuffle,也就是不能加分区,False是默认
# coalesce 修改分区 只有将shuffle改为true才能按照所给的数分区,
# 要不然会自动进行安全鉴定而进行分区
print(rdd.coalesce(1).getNumPartitions())
print(rdd.coalesce(5,shuffle=True).getNumPartitions())
2.3–RDD缓存
2.3.1–RDD缓存的目的
- RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消失。
- RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。
- 这个特性可以最大化的利用资源,老l日RDD没用了就从内存中清理,给后续的计算腾出内存空间。
- RDD的缓存技术: Spark提供了缓存APIl,可以让我们通过调用API,将指定的RDD数据保留在内存或者硬盘上缓存的API
2.3.1–RDD缓存的特点
缓存技术可以将过程RDD数据持久化保存到内存或者硬盘上,但是,这个保存在设定上是认为不安全的。
缓存的数据在设计上是认为有丢失风险的,所以缓存有一个特点就是:其保留RDD之间的血缘(依赖)关系
一旦缓存丢失,可以基于血缘关系的记录,重新计算这个RDD的数据缓存如何丢失:在内存中的缓存是不安全的,比如断电\计算任务内存不足,把缓存清理给计算让路
硬盘中因为硬盘损坏也是可能丢失的。RDD是将自己分区的数据,每个分区自行将其数据保存在其所在的Executor内存和硬盘上,这是分散存储
2.3.2–RDD缓存的API
from pyspark.storagelevel import StorageLevel
rdd3.cache() 缓存到内存中
rdd3. persist(StorageLevel.MEMORY_ONLY) 仅内存缓存
rdd3. persist(StorageLevel.MEMORY_ONLY_2) 仅内存缓存,2个副本
rdd3. persist(StorageLevel.DISK_ONLY) 仅缓存硬盘上
rdd3.persist(StorageLevel.DISK_ONLY_2) 仅缓存硬盘上,2个副本
rdd3.persist(storageLevel.DISK_ONLY_3) 仅缓存硬盘上,3个副本
rdd3.persist(StorageLevel.MEMORY_AND_DISK) 先放内存,不够放硬盘
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)先放内存,不够放硬盘,2个副本
rdd3.persist(StorageLevel.OFF_HEAP) 堆外内存(系统内存)
如上API,自行选择使用即可
一般建议使用rdd3.persist(StorageLevel.MEMORY_AND_DISK)
如果内存比较小的集群,建议使用rdd3.persist(StorageLevel.DISK_ONLY)或者就别用缓存了用CheckPoint
主动清理缓存的API rdd . unpersist()
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel
if __name__ == '__main__':
conf = SparkConf().setAppName("Text").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd1 = sc.textFile("../data/input/a.txt")
rdd2 = rdd1.flatMap(lambda x:x.split(" "))
rdd3 = rdd2.map(lambda x:(x,1))
# 增加缓存
rdd3.cache()
# 自定义缓存
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)
rdd4 = rdd3.reduceByKey(lambda a,b:a+b)
print(rdd4.collect())
rdd5 = rdd3.groupByKey()
rdd6 = rdd5.mapValues(lambda x:sum(x))
print(rdd6.collect())
# 解除缓存
rdd3.unpersist()
2.3.3–RDD的CheckPoint
CheckPoint技术,也是将RDD的数据,保存起来,但是它仅支持硬盘存储,并且,它被设计认为是安全的不保留血缘关系
特点:
- CheckPoint存储RDD数据,是集中收集各个分区数据进行存储,而缓存是分散存储
- CheckPoint不管分区数量多少,风险是一样的,缓存分区越多,风险越高
- CheckPoint支持写入HDFS,缓存不行, HDFS是高可靠存储, CheckPoint被认为是安全的.
- CheckPoint不支持内存,缓存可以,缓存如果写内存性能比CheckPoint要好一些
- CheckPoint因为设计认为是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全,所以保留
使用:
CheckPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用CheckPoint比较合适,或者数据量很大,用CheckPoint比较合适.
如果数据量小,或者RDD重新计算是非常快的,用CheckPoint没啥必要,直接缓存即可.
Cache和CheckPoint两个API都不是Action类型,所以想要它俩工作,必须在后面接上Action,接上Action的目的,是让RDD有数据,而不是为了让checkPoint和Cache工作。
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel
if __name__ == '__main__':
conf = SparkConf().setAppName("Text").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 1、告知spark开启checkpoint功能
sc.setCheckpointDir("hdfs://node1:8020/input/output/ckp")
rdd1 = sc.textFile("../data/input/a.txt")
rdd2 = rdd1.flatMap(lambda x:x.split(" "))
rdd3 = rdd2.map(lambda x:(x,1))
# 2、调用checkpoint的api 保存数据即可
rdd3.checkpoint()
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print(rdd4.collect())
rdd5 = rdd3.groupByKey()
rdd6 = rdd5.mapValues(lambda x: sum(x))
print(rdd6.collect())
# 解除缓存
rdd3.unpersist()
2.4–广播变量
2.4.1–概念
本地list对象,被发送到每个分区的处理线程上使用,也就是一个executor内,其实存放了2份一样的数据。
executor是进程,进程内资源共享,这2份数据没有必要,造成了内存浪费。
如果将本地list对象标记为广播变量对象,那么当上述场景出现的时候,Spark只会,给每个Executor来一份数据,而不是像原本那样,每一个分区的处理线程都来一份,节省内存.
2.4.2–API
1.将本地list标记成广播变量是即可
broadcast = sc. broadcast(stu_info_list)
2.使用广播变量,从broadcast对象中取出本地list对象即可value = broadcast.value
- 也就是先放进去broadcast内部,然后从broadcast内部在取出来用,中间传输的是broadcast这个对象了
- 只要中间传翰的是broadcast对象,spark就会留意,只会给每个Executor发一份了,而不是傻傻的哪个分区要都给.
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("text").setMaster("local[*]")
sc = SparkContext(conf=conf)
stu_info_list = {
(1,'zxz',11),
(2,'wrx',13),
(3,'小明',11),
(4,'王大力',11)
}
# 1.将本地python list对象标记为广播变量
broadcast = sc.broadcast(stu_info_list)
score_info_rdd = sc.parallelize([
(1,'语文',99),
(2,'数学',99),
(3,'英语',99),
(4,'数学',99),
(2,'编程',99),
(4,'语文',99),
(1,'数学',99)
])
def map_func(data):
id = data[0]
name = ''
# 匹配分布list和rdd分布式的id
# 2.在使用到本地集合对象的地方,从广播变量中取出来用即可
for stu_info in broadcast.value:
stu_id = stu_info[0]
if id == stu_id:
name = stu_info[1]
return (name,data[1],data[2])
print(score_info_rdd.map(map_func).collect())
"""
场景:本地集合对象和分布式对象(RDD)进行关联的时候
需要将本地集合封装为广播变量
可以节省:
1.网络io的次数
2.Executor的内存占用
"""
#结果
2.5–累加器
2.5.1–需求
想要对map算子计算中的数据,进行计数累加,得到全部数据计算完后的累加结果
2.5.2–没有累加器的代码演示
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("text").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)
count = 0
def map_func(data):
global count
count += 1
print(count)
rdd.map(map_func).collect()
print(count)
# 可以看出count最外面的值没有变化
#结果
0
2.5.3–增加累加器的代码
原因:
count来自driver对象,当在分布式的map算子中需要count对象的时候,driver会将count对象发送给每一个executor一份(复制发送),每个executor各自收到一个,在最后执行print(count)的时候,这个被打印的count依旧是driver的那个,所以不管executor中累加到多少,都和driver这个count无关。
API:
- sc.accumulator(初始值)
- 这个对象唯一和前面提到的count不同的是
- 这个对象可以从各个Executor中收集它们的执行结果,作用回自己身上
from pyspark import SparkContext,SparkConf
if __name__ == '__main__':
conf = SparkConf().setAppName("text").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)
# spark提供的累加器变量,参数是初始值
scmlt = sc.accumulator(0)
def map_func(data):
global scmlt
scmlt += 1
# print(scmlt)
rdd2 = rdd.map(map_func)
rdd2.collect()
rdd3 = rdd2.map(lambda x:x)
rdd3.collect()
print(scmlt)
#结果
20
如上代码,第一次rdd2被action后,累加器值是10,然后rdd2就没有了(没数据了),当rdd3构建出来的时候,是依赖rdd2的, rdd2没数据,那么rdd2就要重新生成.重新生成就导致累加器累加数据的代码再次被执行,所以代码的结果是20。
注意事项:
也就是,使用累加器的时候,要注意,因为rdd是过程数据,如果rdd被多次使用可能会重新构建此rdd如果累加器累加代码,存在重新构建的步骤中,累加器累加代码就可能被多次执行。
如何解决:加缓存或者checkPoint即可.
2.6–python程序对hdfs文件操作
from hdfs.client import Client
client = Client('http://node1:9870')
# 创建文件夹
client.makedirs('/datas/aaa')
# 删除文件夹
client.delete('/datas/output')
# 上传文件
client.upload('/datas/aaa/a.txt','E:/aa/a.txt')
# 下载文件
client.download('/datas/a.txt','E:/a.txt')
2–SparkSQL
3.1–DataFrame
3.1.1–SparkSession对象环境的创建
- SparkSession对象
# Sparksession对象的导包
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建session环境入口
spark = SparkSession.builder.appName("test").master('local[*]') \
.getOrCreate()
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
# 通过SparkSession对象获取SparkContext对象
sc = spark.sparkContext
- 用于SparkSQL编程作为入口对象
# SparkSql的helloworld
df = spark.read.csv("../data/input/stu_score.txt", sep=',', header=False)
df2 = df.toDF("id", "name", "score")
df2.printSchema()
df2.show()
df2.createTempView("score")
# SQL风格
spark.sql("""
SELECT * FROM score WHERE name='语文' LIMIT 5
""").show()
- 所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象
3.1.2–DataFrame的组成
DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:
- 行
列 - 表结构描述
比如,在MySQL中的一张表:
- 由许多行组成
- 数据也被分成多个列
- 表也有表结构信息(列、列名、列类型、列约束等)
基于这个前提,DataFrame的组成如下:
在结构层面:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息
在数据层面:
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息
一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空
同时,一行数据描述为Row对象,如Row(1, 张三, 11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息
3.1.3–DataFrame代码构建
基于RDD方式1
DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构
API:
- spark.createDataFrame(data_rdd,schema=[‘name’,‘age’])
- 参数1 被转换的rdd
- 参数2 通过list的形式按照顺序依次提供字符串名称即可
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.appName('test').master('local[*]').\
getOrCreate()
sc = spark.sparkContext
# 基于rdd转换成Dataframe类型
file_rdd = sc.textFile('../data/input/sql/people.txt')
data_rdd = file_rdd.map(lambda x:x.split(', ')).map(lambda x:(x[0],int(x[1])))
# 构建Dataframe对象
# 参数1 被转换的rdd
# 参数2 通过list的形式按照顺序依次提供字符串名称即可
df = spark.createDataFrame(data_rdd,schema=['name','age'])
# 打印DataFrame的表结构
df.printSchema()
# 打印df的数据
# 参数1:展示多少条数据,默认不传的话是20
# 参数2:表示是否对列进截断,如果列的数据超过20个字符串长度,
# 后续的内容不显示,以。。。代替,如果给False,表示不截断全部显示,默认是True
df.show(20,False)
# 将df对象转换成临时视图表,可供sql语句查询
df.createOrReplaceTempView("people")
spark.sql("""
SELECT * FROM people WHERE age<30
""").show()
基于RDD方式2
通过StructType对象来定义DataFrame的“表结构”转换RDD
API:
- StructType().add(“name”,StringType(),nullable=True)
- 参数1 列名 参数2 类型 参数3 是否能为空
- 如果是多个列就多次调用add()
# 构建表结构的描述对象,structType对象
# 参数1 列名 参数2 类型 参数3 是否能为空
schema = StructType().add("name",StringType(),nullable=True).\
add("age",IntegerType(),nullable=False)
# 基于StructType对象去构建dataframe的对象的rdd转化
df = spark.createDataFrame(data_rdd,schema = schema)
df.printSchema()
df.show()
基于RDD方式3
使用RDD的toDF方法转换RDD
API:
- data_rdd.toDF(参数)
- 参数可以是一个列表,也可以是schema对象
# todf 的方式构建Dataframe
df1 = data_rdd.toDF(["name","age"])
df1.printSchema()
df1.show()
# todf方式2 通过structType来构建
schema = StructType().add('name',StringType(),nullable=True).\
add('age',IntegerType(),nullable=False)
df2 = data_rdd.toDF(schema=schema)
df2.printSchema()
df2.show()
基于Pandas的DataFrame
将Pandas的DataFrame对象,转变为分布式的SparkSQL DataFrame对象
API:
- spark.createDataFrame(参数)
- 参数为pandas的DataFrame数据
# 基于pandas的dataframe构建sparksql的dataframe对象
pdf = pd.DataFrame({
"id":[1,2,3],
"name":['张大仙','王小名','张三'],
"age":[11,18,16]
})
df = spark.createDataFrame(pdf)
df.printSchema()
df.show()
读取外部数据
通过SparkSQL的统一API进行数据读取构建DataFrame
- 读取text数据源
- 使用format(“text”)读取文本数据
- 读取到的DataFrame只会有一个列,列名默认称之为:value
API:
- spark.read.format(“读取文件类型”). schema().load(读取文件位置)
# 构建StructType,text,
# 读取数据的特点是将一整行当成一个列。
# 默认列名是value 类型是String
schema = StructType().add('data',StringType(),nullable=True)
df = spark.read.format("text").\
schema(schema=schema).\
load("../data/input/sql/people.txt")
df.printSchema()
df.show()
- 读取json数据源,使用format(“json”)读取json数据
df = spark.read.format('json').load("../data/input/sql/people.json")
df.printSchema()
df.show()
- 读取csv数据源,使用format(“csv”)读取csv数据
df =spark.read.format('csv').\
option("sep",";").\ 按照什么切割
option("harder",True).\ 是否有表头
option("encoding","utf-8").\ 编码
schema("name STRING,age INT,job STRING").\
load("../data/input/sql/people.csv")
df.printSchema()
df.show()
- 读取parquet数据源,使用format(“parquet”)读取parquet数据
df = spark.read.format("parquet").load("../data/input/sql/users.parquet")
df.printSchema()
df.show()
读取mysql数据
df2 = spark.read.format("jdbc").option("url","jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
.option("dbtable", "pyspark_info.stu_info") \
.option("user", "root") \
.option("password", "123456") \
.load()
df2.show()
读取hive数据
要求开启hive的metastore服务
配置文件
#----------------local模式:在node1配置-------------------------- # 进入配置文件目录,创建配置文件 cd /export/server/spark-local/conf vim hive-site.xml # 添加以下内容 <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://node1.itcast.cn:9083</value> </property> </configuration> #----------------Spark On Yarn模式:在node1,node2,node3配置------------------------ # 进入配置文件目录创建配置文件 cd /export/server/spark-yarn/conf vim hive-site.xml # 添加以下内容 <?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://node1.itcast.cn:9083</value> </property> </configuration>
import time
from pyspark import SparkContext, SparkConf, StorageLevel
import os
import sys
from pyspark.sql import SparkSession, Window
# spark入门案例 --- WordCount
# 1、设置环境变量
from pyspark.sql.functions import *
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"
if __name__ == '__main__':
#创建用于操作Hive的SparkSession
"""
spark.sql.warehouse.dir:用来指定Hive表数据在HDFS的位置
hive.metastore.uris :用来指定hive的metastore服务(runjar)在哪台主机
enableHiveSupport :开启spark支持hive
"""
spark = SparkSession \
.builder \
.appName("SparkSQLAppName") \
.master("local[2]") \
.config("spark.sql.shuffle.partitions", 2) \
.config("spark.sql.warehouse.dir", 'hdfs://node1.itcast.cn:8020/user/hive/warehouse')\
.config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")\
.enableHiveSupport()\
.getOrCreate()
# 设置日志级别为WARN
spark.sparkContext.setLogLevel("WARN")
#--------------------从hive读取-SQL风格--------------------------------
#使用spark操作hive的表
spark.sql(
"""
select * from db_hive.emp;
"""
).show()
spark.sql(
"""
select deptno,count(*) as cnt from db_hive.emp group by deptno;
"""
).show()
# --------------------从hive读取-DSL风格--------------------------------
dataFrame1 = spark.read.table("db_hive.emp")
dataFrame2 = dataFrame1 \
.select("deptno", "sal") \
.groupBy("deptno") \
.agg(
count("*").alias("cnt")
)
dataFrame2.show()
读取es数据
df_es_source = spark.read.format('es')\
.option("es.node",f"{rule['esNodes']}")\
.option("es.resource",f"{rule['esIndex']}/{rule['esType']}")\
.option("es.index.read.missing.as.empty","yes")\
.option("es.query","?q=*") \
.option("es.read.field.include",f"{rule['selectFields']}")\
.load()
3.1.4–DataFrame DSL风格演示
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
import pandas as pd
if __name__ == '__main__':
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.appName('test').master('local[*]').\
getOrCreate()
df = spark.read.format("csv").\
schema("id INT,subject STRING,score INT").\
load("../data/input/sql/stu_score.txt")
# column对象的获取
id_column = df['id']
subject_column = df['subject']
# Dsl风格演示
# df.select(["id","subject"]).show()
# df.select("id","subject").show()
# df.select(id_column,subject_column).show()
# filter API
# df.filter("score<99").show()
# df.filter(df["score"]<99).show()
# where API
# df.where("score<99").show()
# df.where(df["score"] < 99).show()
# group by API
df.groupBy("subject").count().show()
df.groupBy(df["subject"]).count().show()
# groupby API的返回值不是dataframe类型
# groupdata对象是一个关于分组的数据结构,有统一的API供我们分组做聚合
# SQL: groupby 后接聚合:sum avg count min max
# groupdata对象也类似Sql分组的数据结构,同样有上述的5中聚合方式
# groupdata对象其实只是一个中转的一个对象,最终还是要获取dataframe对象
r = df.groupBy("subject")
print(type(r))
3.1.5–DataFrame SQL风格演示
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建执行环境入口对象SparkSession
spark = SparkSession.builder.appName('test').master('local[*]').\
getOrCreate()
df = spark.read.format("csv").\
schema("id INT,subject STRING,score INT").\
load("../data/input/sql/stu_score.txt")
# 注册成临时表
df.createTempView("score")# 注册成临时表
df.createOrReplaceTempView("score_2")# 注册或替换临时表
# 注册全网临时视图,使用的时候需要在前面带上global_temp.前缀
df.createGlobalTempView("score_3")
# 可以通过SparkSession对象的sql_api完成sql语句执行
spark.sql("SELECT subject,COUNT(*) AS cnt FROM score GROUP BY subject").show()
spark.sql("SELECT subject,COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()
spark.sql("SELECT subject,COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()
3.1.6–数据清理
数据去重
# dropDuplicates 是dataframe的api可以完成数据去重
# 无参数使用,对全部的列联合起来比较进行去重
df.dropDuplicates().show()
# 也可指定列来进行去重
df.dropDuplicates(["age","job"]).show()
缺失值处理
# 无参数使用,只要列中有null就删除这一行数据
df.dropna().show()
# thresh = 3 至少满足三个有效列
df.dropna(thresh=3).show()
# 表示在指定的列中至少有两个不为空
df.dropna(thresh=2,subset=["name","age"]).show()
# 缺失值处理也可以给缺失处进行填写
# dataframe的fillna api可以对确实的列进行填充
df.fillna("loss").show()
# 指定列进行填充
df.fillna("N/A",subset=['job']).show()
# 设定一个字典对所有列进行填充规则
df.fillna({"name":"未知名称","age":1,"job":"worker"}).show()
3.1.7–数据写出
写出到文件
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
from pyspark.sql import functions as F
if __name__ == '__main__':
spark = SparkSession.builder.appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions",2).\
getOrCreate()
sc = spark.sparkContext
# 读取数据
df = spark.read.format("csv").\
option("sep",";").\
option("header",True).\
load("../data/input/sql/people.csv")
schema = StructType().add("user_id", StringType(), nullable=True). \
add("movie_id", IntegerType(), nullable=True). \
add("rank", IntegerType(), nullable=True). \
add("times", StringType(), nullable=True)
# 读取文件
df = spark.read.format("csv"). \
option("sep", "\t"). \
option("header", False). \
option("encoding", "utf-8"). \
schema(schema=schema). \
load("../data/input/sql/u.data")
# Write text 写出 只能写成一个列的数据,需要将df转换为单列df
df.select(F.concat_ws("---" , "user_id" , "movie_id", "rank","times")).\
write.\
mode("overwrite").\
format("text").\
save("../data/output/sql/text")
# Write csv
df.write.mode("overwrite").\
format("csv").\
option("sep",";").\
option("header",True).\
save("../data/output/sql/csv")
# write json
df.write.mode("overwrite").\
format("json").\
save("../data/output/sql/json")
# write parquet
df.write.mode("overwrite").\
format("parquet").\
save("../data/output/sql/parquet")
写出到mysql
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
import os
# spark入门案例 --- WordCount
# 1、设置环境变量
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark-local' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"
if __name__ == '__main__':
# 创建spark对象 用于写sparksql
spark = SparkSession \
.builder.appName("test") \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()
# 创建sc对象 用于写rdd
sc = spark.sparkContext
df1 = spark.read.format("csv") \
.schema("id int,name string") \
.option("sep", ",") \
.option("encoding", "utf8") \
.load("./data/input/stu_info.txt")
# 写入到mysql
df1.write.mode("append") \
.format("jdbc") \
.option("url", "jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
.option("dbtable", "pyspark_info.stu_info") \
.option("user", "root") \
.option("password", "123456") \
.save()
写出到hive
# --------------------向hive写入--------------------------------
#test_rs表如果不存在,则创建,如果存在,则覆盖数据
dataFrame2.write.mode("overwrite").format("hive").saveAsTable("db_hive.test_rs")
# 创建分区表
df.write.saveAsTable("db_hive.test_rs", mode='append',partitionBy=[''])
# append 是新增数据, overwrite则会将已存在的表删除,新建表
写出到es
(‘es.write.operation’, ‘upsert’) 表示插入模式为upsert 没有就插入 有就更新
.option(‘es.mapping.id’, ‘userId’) 表示插入时按照哪个来表示是否已经存在, 也就是给es索引下的_id赋值
(‘es.mapping.name’, ‘userId:userId,tagsId:tagsId’) 表示插入时, df和es索引的映射关系(df字段:es字段)
def es_write(result_df: DataFrame):
result_df.write.format('es') \
.option('es.nodes', 'up01:9200') \
.option('es.resource', 'tfec_userprofile_index') \
.option('es.write.operation', 'upsert') \
.option('es.mapping.id', 'userId') \
.option('es.mapping.name', 'userId:userId,tagsId:tagsId') \
.mode('append') \
.save()
3.2–DataFrameAPI
printSchema
打印DataFrame的表结构
API:
- df.printSchema()
show
打印df的数据
API:
- df.show(20,False)
- 参数1:展示多少条数据,默认不传的话是20
- 参数2:表示是否对列进截断,如果列的数据超过20个字符串长度,
- 后续的内容不显示,以。。。代替,如果给False,表示不截断全部显示,默认是True
createTempView
将df对象注册成临时视图表,可供sql语句查询
API:
- df.createTempView(“score”)
createOrReplaceTempView
将df对象注册或替换临时表,可供sql语句查询
API:
- df.createOrReplaceTempView(“people”)
createGlobalTempView
将df注册全网临时视图,使用的时候需要在前面带上global_temp.前缀
API:
- df.createGlobalTempView(“score_3”)
- spark.sql(“SELECT subject,COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject”).show()
select
选择DataFrame中的指定列(通过传入参数进行指定)
API:
- select(cols)
- 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串列名来指定列
List[Column]对象或者List[str]对象, 用来选择多个列 - df.select(“id”,“subject”).show()
- df.select(id_column,subject_column).show()
- df.select([“id”,“subject”]).show()
join
表的连接
df1.join(df2, on条件 , 连接方式 ) 'inner ’ 'left ’ 'right ’ ‘full’
df1.withColumn(
'rk',
F.rank().over(Window.partitionBy('dep_id').orderBy(F.col('salary').desc()))
).where('rk<=2').join(df2, df1.dep_id == df2.dep_id_2, 'inner')\
.select(['emp_id','emp_name','salary','dep_id','dep_name','dep_addr','rk']).show()
filter
过滤DataFrame内的数据,返回一个过滤后的DataFrame
API:
- filter(参数)
- df.filter(“score<99”).show()
- df.filter(df[“score”]<99).show()
where
过滤DataFrame内的数据,返回一个过滤后的DataFrame
API:
- where(参数)
- df.where(“score<99”).show()
- df.where(df[“score”] < 99).show()
groupBy
按照指定的列进行数据的分组, 返回值是GroupedData对象
API:
- groupBy(参数)
- df.groupBy(“subject”).count().show()
- df.groupBy(df[“subject”]).count().show()
- groupby API的返回值不是dataframe类型
- groupdata对象是一个关于分组的数据结构,有统一的API供我们分组做聚合
- SQL: groupby 后接聚合:sum avg count min max
- groupdata对象也类似Sql分组的数据结构,同样有上述的5中聚合方式
- groupdata对象其实只是一个中转的一个对象,最终还是要获取dataframe对象
orderBy
按照某列进行排序
API:
- orderBy(参数1,参数2)
- 参数1是被排序的列
- 参数2是ascending 默认是升序 ascending=False为降序
- orderBy(“cnt”, ascending=False)
withColumn
withcolum方法对已存在的列进行操作返回一个新的列
API:
- df1.withColumn(参数1,参数2)
- 参数1是新的一列的名字,如果名字和老列相同则替换,否则作为新列存在
- 参数2是对已存在列的操作
- df1.withColumn(“value”,F.explode(F.split(df1[“value”]," ")))
- 它还能用来实现窗口函数.
withColumnRenamed
对列名进行修改
API:
- withColumnRenamed(参数1,参数2)
- 参数1是要原列名
- 参数2是要现列名
- 一次只能修改一列,若要修改多个列,多次调用此方法
first
取第一行
limit
取第n行
dropDuplicates
数据去重
API:
- df.dropDuplicates()
- 无参数是对全部的列联合起来比较进行去重
- df.dropDuplicates([“age”,“job”])
- 也可指定列来进行去重
dropna
缺失值删除
API:
fillna
缺失值填充
API:
write
数据写出
API:
- write.mode(“overwrite”).format(写出文件类型).save(写出路径)
- mode(“overwrite”)表示是否覆盖
- append: 如果数据存在,继续追加
- overwrite: 如果目标存在时, 覆写以前数据, 存储当前最新数据
- error/errorifexists: 如果目标存在就报错, 默认模式
- ignore: 如果目标存在, 不做任何操作. 忽略
- 注意写出text时应转换成单列的dataframe
- 案例
3.3 类似于RDD的API
count
概述: 统计行数
collect
概述: 将DataFrame转换成一个列表
take
概述: 取DataFrame中前N行的数据
first
概述: 取DataFrame中第一行的数据
head
概述: 默认取DataFrame中第一行的数据,可以指定返回前N行的数据
tail
概述: 可以指定返回后N行的数据
foreach
概述: 对DataFrame中每条数据进行处理,没有返回值
foreachPartition
概述: 对DataFrame中每个分区数据进行处理,没有返回值
distinct
概述: 对DataFrame中每条数据进行去重处理
union/unionAll
概述: 实现两个DataFrame的合并
coalesce/repartition
概述: 调整DataFrame的分区数
cache/persist
概述: 对DataFrame进行缓存
unpersist
概述: 取消DataFrame的缓存
columns
概述: 返回DataFrame中的所有列名
schema
概述: 返回DataFrame中Schema的信息
rdd
概述: 返回DataFrame中的数据放入RDD中
printSchema
概述: 打印DataFrame的Schema信息
3.4–GroupDataAPI
count
按照分组进行计数,返回一个新的列默认名称是cnt
avg
按照分组求平均值
sum
求和
min
求最小值
max
求最大值
round
保留几位小数
file_df\
.select("movie_id","rank")\
.groupby("movie_id")\
.avg("rank")\
.withColumnRenamed("avg(rank)","avg_rank")\
.withColumn("avg_rank",F.round("avg_rank",2))\
.show()
agg
作用是在里面可以写多个聚合
df.groupBy("user_id").\
agg(
F.round(F.avg("rank"),2).alias("avg_rank"),
F.min("rank").alias("min_rank"),
F.max("rank").alias("max_rank")
).show()
3.5–column的常用API
alias
column对象的API 可以针对一个列改名
col("字段名").alias("别名")
astype
转换数据类型,是cast的别名
between
判断数据在什么和什么之间
df_etl.select(F.col("receivable").between(10,20))
cast
数据类型转化
df_etl.select(F.col("receivable").cast(DecimalType(10,2)))
astype
转换数据类型,是cast的别名
contains
判断是否包含
df_etl.select(F.col("receivable").contains("小"))
endswith(other)
boolen值,以other结尾的字符串
df.filter(F.col(‘对手’).endswith(‘熊’)).show()
eqNullSafe(other)
空值/指定值判断
lit
df增加一列每一行都为固定参数的列
new_result_df = result_df.withColumn('rule', F.lit(','.join(index_id_list)))
3.6–UDF
3.6.1–UDF的定义
方式1:
udf2 = spark.udf.register("udf1",num_ride_10,IntegerType())
- 参数1 注册udf的名称 这个名称仅可以用于sql风格
- 参数2 udf的处理逻辑 是一个单独的方法
- 参数3 udf的返回值类型
- 注意:udf注册的时候必须声名返回值类型,并且udf的真实返回值必须要和声名的一致
- 返回值对象 这是一个udf对象,仅可以用于dsl语法
- 当前这种方式定义的udf,可以通过参数1的名称用于sql风格,通过返回值对象来用于dsl风格
调用:
dsl df.select(udf2(df[‘num’])).show()
sql df.selectExpr(“udf1(num)”).show()
方式2:
udf3 = F.udf(num_ride_10,IntegerType())
- 仅能用于dsl风格
调用:
df.select(udf3((df[‘num’]))).show()
3.6.2–UDF的返回值
返回array
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
spark = SparkSession.builder.appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions",2).\
getOrCreate()
sc = spark.sparkContext
# 构建一个rdd
rdd = sc.parallelize([["hadoop spark flink"],["hadoop java python"]])
df = rdd.toDF(['line'])
df.show()
# 注册一个udf函数
def split(data):
return data.split(" ") #返回值是个array对象
# TODO 1:方式1构建udf
udf2 = spark.udf.register("udf1",split,ArrayType(StringType()))
df.select(udf2(df['line'])).show()
df.createTempView("lines")
spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)
# TODO 2:方式2构建
udf3 = F.udf(split,ArrayType(StringType()))
df.select(udf3(df['line'])).show(truncate=False)
返回dict
import string
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
spark = SparkSession.builder.appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions",2).\
getOrCreate()
sc = spark.sparkContext
# 假设 有三个数字 1 2 3 我们传入数字,返回数字所在的序号的字母 然后和数字结合成dict返回
# 比如传入 1 返回{"num":1,"letters":"a"}
rdd = sc.parallelize([[1],[2],[3]])
df = rdd.toDF(["num"])
# 注册udf
def process(data):
return {"num":data,"letters":string.ascii_letters[data]}
"""
UDF的返回值是字典话,需要用structType来接收
"""
udf1 = spark.udf.register("udf1",process,StructType().add("num",IntegerType(),nullable=True).\
add("letters",StringType(),nullable=True))
df.selectExpr("udf1(num)").show(truncate=False)
df.select(udf1(df['num'])).show(truncate=False)
3.6.3–UDAF by RDD
import string
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
spark = SparkSession.builder.appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions",2).\
getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
df = rdd.map(lambda x:[x]).toDF(['num'])
# df.show()
# 折中的方式 就是使用认定的的mappartitions 算子来完成聚合操作
# 如果用mappartition API 完成UDAF聚合 一定要是单分区
single_parttion_rdd = df.rdd.repartition(1)
# print(single_parttion_rdd.collect())
def process(data):
sum = 0
for row in data:
sum += row['num']
return [sum] #一定要嵌套list,因为mappartition方法要求的返回值是list
print(single_parttion_rdd.mapPartitions(process).collect())
3.7–窗口函数
DSL风格
"""
@Project :pyspark
@File :pys_04_windows_func.py
@IDE :PyCharm
@Author :wrx
@Date :2023/7/19 15:44
@describe:
"""
from pyspark.sql import SparkSession,Window
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
import os
# spark入门案例 --- WordCount
# 1、设置环境变量
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['SPARK_HOME'] = '/export/server/spark-local' # Spark安装位置
os.environ["PYSPARK_PYTHON"] = "/export/server/anaconda3/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/export/server/anaconda3/bin/python3"
if __name__ == '__main__':
# 创建spark对象 用于写sparksql
spark = SparkSession \
.builder.appName("test") \
.master("local[*]") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()
# 创建sc对象 用于写rdd
sc = spark.sparkContext
# 定义表结构
schema = StructType()\
.add("stu_id",StringType(),nullable=True) \
.add("project",StringType(),nullable=True) \
.add("score",IntegerType(),nullable=True) \
df1 = spark.read.format('csv')\
.option("header",False)\
.option("sep",",")\
.option("encoding","utf8")\
.schema(schema=schema)\
.load("./data/input/stu_score.txt")
# DSL实现开窗函数
df1.withColumn(
'rk',
F.rank().over(Window.partitionBy("project").orderBy(F.col("score").desc()))
).where("rk<=2").show()
# 关闭资源
sc.stop()
spark.stop()
SQL风格
import string
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
spark = SparkSession.builder.appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions",2).\
getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
("张三","class1",99),
("网易", "class2", 79),
("张六", "class1", 99),
("王如", "class2", 88),
("张二", "class3", 89),
("知晓", "class1", 77),
("张力", "class2", 59),
("网卡", "class1", 66),
("网课", "class2", 100),
("回合", "class3", 79)
])
schema = StructType().add("name",StringType()).add("class",StringType()).add("score",IntegerType())
df = rdd.toDF(schema)
df.createTempView("stu")
# todo :聚合窗口函数的演示
spark.sql("""
SELECT *,AVG(score) OVER() AS avg_score FROM stu
""").show()
# todo :排序相关的窗口函数
# RAKN over ,DENSE_RANK over,ROW_NUMBER over
spark.sql("""
SELECT *,ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# todo : NTILE
spark.sql("""
SELECT * ,NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()
3–spark优化
避免创建重复的RDD
在我们开发一个spark作业时,首先会基于某个数据源(hive或者hdfs文件)来创建一个RDD,然后对这个RDD执行某个算子操作,而得到下一个RDD。以此循环,直到计算出我们需要的结果,在这个过程中,多个RDD会通过不同的算子操作窜起来,这个“RDD串”就是RDD lineage,就是“RDD的血缘关系链”。但是在我们开发过程中,对于同一份数据,只能创建一个RDD,不能创建多个RDD代表同一份数据,从而增加作业的性能开销。尽可能复用同一个RDD
目的是尽可能的减少RDD的数量,从而尽可能减少算子执行的次数对多次使用的RDD进行持久化
当你在Spark代码中多次对一个RDD算子操作时,每调用一次此RDD都会从源头计算一遍,这样性能很差。所以可以对其进行持久化操作。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中,后面再用这个RDD就不用从源头再计算一遍了。尽量避免使用shuffle类算子
如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。
shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。使用map-side预聚合的shuffle操作
如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。使用高性能算子
除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。
a)使用reduceByKey/aggregateByKey替代groupByKey
b)使用mapPartitions替代普通map
c)使用foreachPartitions替代foreach
d)使用filter之后进行coalesce操作
e)使用repartitionAndSortWithinPartitions替代repartition与sort类操作
(以上可以自行查博客,找疑点)广播大变量
有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
示例:
val list1 = …
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast…)使用Kryo优化序列化性能
在Spark中,主要有三个地方涉及到了序列化:1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。 2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。 3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
示例:
// 创建SparkConf对象。
val conf = new SparkConf().setMaster(…).setAppName(…)
// 设置序列化器为KryoSerializer。
conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))优化数据结构
实用性不强,