pyspark RDD相关常用函数使用案例

发布于:2025-03-11 ⋅ 阅读:(11) ⋅ 点赞:(0)


一、浅语

现在虽然各种AI大模型层出不穷的展现着其强大的智能水平,紧跟时代的同时,也需要脚踏实地,对一些基础信息进行了解和掌握,知其然才能知其所以然
对于想入门和掌握pyspark这类语言学来说,更需要动手才行。看一百遍没有下笔,真动起手来总感觉一无所知的样子。虽然让大模型生成了现成的代码,但自己看不动其处理方式终归不是走捷径之道。一个字母一个字母的敲击、运行、输出、报错、调试,纸上觉来终觉浅,绝知此事要躬行,大抵如是。

二、启动pyspark模式

2.1 启动方式

import findspark
findspark.init() 
from pyspark.sql import SparkSession

# 创建 Spark 会话
spark = SparkSession.builder \
    .appName("Test PySpark") \
    .master("local[*]") \
    .getOrCreate()
sc=spark.sparkContext
sc.master
'local[*]'

2.2 示例

data=sc.parallelize(range(1000),7)
print(data.count())

# 创建一个 DataFrame
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# 显示 DataFrame
df.show()
1000
+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|Cathy| 29|
+-----+---+

三、基本RDD转换运算

3.1 创建intRDD

intRDD = sc.parallelize((1,2,3,5,5))

3.1.1 intRDD转换为List

intRDD.collect()
[1, 2, 3, 5, 5]

3.1.2 map函数

def addOne(x):
    return (x+3)

intRDD.map(addOne).collect()
[4, 5, 6, 8, 8]
intRDD.map(lambda x:x+10).collect()
[11, 12, 13, 15, 15]

3.1.3 filter函数:过滤

intRDD.filter(lambda x:x>2).collect()
[3, 5, 5]

3.1.4 distinct函数:去重

intRDD.distinct().collect()
[1, 2, 3, 5]

###3.1.5 randomSplit函数:将整个集合元素以随机数的方式按照比例分为多个rdd

splitRddOne,splitRddTwo = intRDD.randomSplit([0.2,0.8])
splitRddOne.collect(),splitRddTwo.collect()
([1, 2], [3, 5, 5])

3.1.6 groupBy函数:按照传入的函数规则将数据分为多个list

gRdd = intRDD.groupBy(lambda x:'偶数' if x%2==0 else '奇数').collect()
gRdd[0]
('偶数', <pyspark.resultiterable.ResultIterable at 0x2ffafc10>)
list(gRdd[1][1])
[1, 3, 5, 5]

3.2 多个RDD转换运算

intRDD2 = sc.parallelize([5,6])
intRDD3 = sc.parallelize([2,7])

3.2.1 union并集

intRDD.union(intRDD2).union(intRDD3).collect()
[1, 2, 3, 5, 5, 5, 6, 2, 7]

3.2.2 intersection交集

intRDD.intersection(intRDD2).collect()
[5]

3.2.3 subtract差集

intRDD.subtract(intRDD2).collect()
[1, 2, 3]

3.2.4 cartesian笛卡尔乘积

intRDD.cartesian(intRDD2).collect()
[(1, 5),
 (1, 6),
 (2, 5),
 (2, 6),
 (3, 5),
 (3, 6),
 (5, 5),
 (5, 6),
 (5, 5),
 (5, 6)]
intRDD.cartesian(intRDD2).keys().collect()
[1, 1, 2, 2, 3, 3, 5, 5, 5, 5]

四、基本动作运算

4.1 读取元素

4.1.1 first()

intRDD.first()
1

4.1.2 take()

intRDD.take(2)
[1, 2]

4.1.3 takeOrdered() :从小到大排序

intRDD.takeOrdered(2) 
[1, 2]
intRDD.takeOrdered(2,key = lambda x:-x) # 从大到小
[5, 5]

4.2 统计功能

4.2.1 stats():统计

intRDD.stats()
(count: 5, mean: 3.2, stdev: 1.6, max: 5.0, min: 1.0)

4.2.2 min()/max()/stdev()/count()/sum()/mean()

print(intRDD.min(),intRDD.max(),intRDD.stdev(),intRDD.count(),intRDD.sum(),intRDD.mean())
1 5 1.6 5 16 3.2

4.3 RDD key-value基本转换运算

kvRDD = sc.parallelize([(1,2),(3,2),(4,4),(5,3),(1,9)])
kvRDD.collect()
[(1, 2), (3, 2), (4, 4), (5, 3), (1, 9)]

4.3.1 查看key值

kvRDD.keys().collect()
[1, 3, 4, 5, 1]

4.3.2 查看value值

kvRDD.values().collect()
[2, 2, 4, 3, 9]

4.3.3 filter过滤key/value

kvRDD.filter(lambda x:x[0]>3).collect(),kvRDD.filter(lambda x:x[1]>3).collect()
([(4, 4), (5, 3)], [(4, 4), (1, 9)])

4.3.4 mapValues运算:产生一个新RDD

kvRDD.mapValues(lambda x:x*x).collect()
[(1, 4), (3, 4), (4, 16), (5, 9), (1, 81)]

4.3.5 sortByKey:按照key排序

kvRDD.sortByKey(ascending=False).collect()
[(5, 3), (4, 4), (3, 2), (1, 2), (1, 9)]

4.3.6 reduceByKey:按照key进行reduce运算

print('累积:',kvRDD.reduceByKey(lambda x,y:x*y).collect(),'\n累加:',kvRDD.reduceByKey(lambda x,y:x+y).collect())
累积: [(1, 18), (3, 2), (4, 4), (5, 3)] 
累加: [(1, 11), (3, 2), (4, 4), (5, 3)]

4.4 多个RDD Key-Value转换运算

kvRDD2 = sc.parallelize([(1,2),(1,22),(2,9)])

4.4.1 join运算:类似内连接

kvRDD.join(kvRDD2).collect()
[(1, (2, 2)), (1, (2, 22)), (1, (9, 2)), (1, (9, 22))]

4.4.2 leftOuterJoin运算:左外连接

kvRDD.leftOuterJoin(kvRDD2).collect()
[(1, (2, 2)),
 (1, (2, 22)),
 (1, (9, 2)),
 (1, (9, 22)),
 (3, (2, None)),
 (4, (4, None)),
 (5, (3, None))]

4.4.3 rightOuterJoin运算:右外连接

kvRDD.rightOuterJoin(kvRDD2).collect()
[(1, (2, 2)), (1, (2, 22)), (1, (9, 2)), (1, (9, 22)), (2, (None, 9))]

4.4.4 subtrackByKey运算:删除具有相同key的数据

kvRDD.subtractByKey(kvRDD2).collect()
[(3, 2), (4, 4), (5, 3)]

4.5 Key-Value动作运算

4.5.1 获取数据:first()/take()

kvRDD.first(),kvRDD.take(2)
((1, 2), [(1, 2), (3, 2)])
kvRDD.first()[0],kvRDD.first()[1],kvRDD.take(2)[0]
(1, 2, (1, 2))

4.5.2 countByKey():计算key值的项数

kvRDD.countByKey()
defaultdict(int, {1: 2, 3: 1, 4: 1, 5: 1})

4.5.3 collectAsMap():创建key-value的字典

kvRDD.collectAsMap()
{1: 9, 3: 2, 4: 4, 5: 3}

4.5.4 lookup():输入key值查找value值

kvRDD.lookup(1)
[2, 9]

五、共享变量

共享变量可以节省内层和运行时间,提升并行处理的执行效率。

共享变量包括:

  • 广播变量:Broadcast
  • 累加器:accumulator.

5.1 广播变量:Broadcast

使用规则:

  • 使用SparkContext.broadcast([初始值])创建
  • 使用.value方法来读取广播变量的值
  • 广播变量被创建后不能修改
# 示例
kvRDD = sc.parallelize([(1,'orange'),(2,'apple'),(3,'banana'),(4,'gra')])
# 创建字典
dictK = kvRDD.collectAsMap()
print('字典信息:',dictK)
# 转换为广播变量
bcVal = sc.broadcast(dictK)
print('获取广播变量值:',bcVal.value)

# 广播变量字典转换和应用示例
id_val = sc.parallelize([1,2,3,4])
id_name = id_val.map(lambda x:bcVal.value[x]).collect()
print('结果:',id_name)
字典信息: {1: 'orange', 2: 'apple', 3: 'banana', 4: 'gra'}
获取广播变量值: {1: 'orange', 2: 'apple', 3: 'banana', 4: 'gra'}
结果: ['orange', 'apple', 'banana', 'gra']

5.2 累加器:accumulator

使用规则:

  • 使用SparkContext.accumulator([初始值])来创建
  • 使用.add()进行累加
  • 只有驱动程序即循环外,才可以使用.value来读取累加器的值
# 示例
intRDD = sc.parallelize([1,2,3,4,5,5,6])
# 创建累加器,初始值0.0,Double类型,记录总和
sumVal = sc.accumulator(0.0)
# 创建累加器,初始值0,int类型,记录个数
numVal = sc.accumulator(0)

# 更新累加器的值
intRDD.foreach(lambda x:[sumVal.add(x),numVal.add(1)])

# 计算平均值
avgVal = sumVal.value/numVal.value
print('总和:',sumVal.value,'个数:',numVal.value,'平均值:',avgVal)
总和: 26.0 个数: 7 平均值: 3.7142857142857144

5.3 持久化:RDD Persistence

RDD持久化机制用于将需要重复运算的RDD存储在内存中,以大幅提升运算效率。

RDD持久化使用方法:

  • RDD.persist(存储等级):进行RDD持久化,可以指定存储等级,默认MEMORY_ONLY,即存储在内存中
  • RDD.unpersist():取消持久化
# 示例
intRDD = sc.parallelize([1,2,3,4,4])
# 持久化
intRDD.persist()
# 查看是否已经cached(缓存)
print(intRDD.is_cached)
# 取消持久化
intRDD.unpersist()
# 查看是否已经cached(缓存)
print(intRDD.is_cached)
True
False

六、实战示例

6.1 WordCount

# 读取文本文件
textFile = sc.textFile('test.txt')
# 读取每个单词
wordRDD = textFile.flatMap(lambda x:x.split(' '))
# 计算每个单词出现的次数
countRDD = wordRDD.map(lambda x:[x,1])
countRDD = countRDD.reduceByKey(lambda x,y:x+y)
countRDD.collect()
[('yellow', 2),
 ('green', 2),
 ('red', 4),
 ('blue', 1),
 ('black', 1),
 ('orange', 1)]

参考资料

《Python+Spark 2.0+Hadoop机器学习与大数据实战》, 林大贵,清华大学出版社,2017-12,9787302490739