PySpark基础例题(包含map、reduceByKey、filter、sortBy等算子)

发布于:2025-09-16 ⋅ 阅读:(20) ⋅ 点赞:(0)

建议结合一下文章学习:

1.PySpark基础知识(python)-CSDN博客

2.通过PySpark单词计数案例带你学习map、flatMap、reduceByKey方法(Python)-CSDN博客

3.什么是RDD?-CSDN博客

文本文件:PySpark基础例题(包含map、reduceByKey、filter、sortBy等算子)资源-CSDN下载


现有一份商品销售数据文件test2.txt,数据格式为每行一个 JSON 对象,包含id(订单 ID)、timestamp(时间戳)、category(商品类别)、areaName(销售城市)、money(销售金额,字符串类型)等字段。请使用 PySpark 完成以下数据处理需求,编写对应的 Spark 代码:

  1. 需求 1:城市销售额排序
    读取test2.txt文件数据,计算各销售城市的总销售额(需将money字段转为整数类型),并按照总销售额从高到低的顺序排序,最终输出各城市及其对应的总销售额。

  2. 需求 2:全量商品类别统计
    基于上述读取的销售数据,提取所有在售的商品类别,要求去除重复类别,最终输出所有不重复的商品类别列表。

  3. 需求 3:北京在售商品类别统计
    从销售数据中筛选出 “北京” 地区的所有记录,提取该地区在售的商品类别并去除重复项,最终输出北京地区所有不重复的商品类别列表。

代码实现:

from pyspark import SparkConf, SparkContext
import os
import json

from unicodedata import category

os.environ['PYSPARK_PYTHON'] = "E:/Python/Python3.10.4/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 需求1:城市销售额排序
# 1.1 读取文件到RDD(注意:替换为你本地test2.txt的实际路径)
file_rdd = sc.textFile("D:/test2.txt")
# 1.2 去除每行末尾可能的逗号(修复JSON解析错误),无需按|分割(原数据无|分隔符)
json_str_rdd = file_rdd.map(lambda x: x.strip(","))
# 1.3 将JSON字符串转换为字典(修复原代码split后无法解析的问题)
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
print(dict_rdd.collect())
#1.4取出城市和销售额名额
city_with_money_rdd = dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))
#1.5按城市分组按销售额整合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a,b: a+b)
#1.6按销售额整合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("需求1的结构是:",result1_rdd.collect())

#需求2:全部城市有多少商品类别在售卖
#2.1取出全部商品类别
category_rdd = dict_rdd.map(lambda x:x['category'] ).distinct()
print("需求2的结果是:",category_rdd.collect())

#需求3 北京有哪些商品类别在售卖
#3.1过滤北京市的数据
beijing_date_rdd = dict_rdd.filter(lambda x:x['areaName'] == '北京')
#3.2取出全部商品类别
result3_rdd = beijing_date_rdd.map(lambda x:x['category']).distinct()
print("需求3的结果是:",result3_rdd.collect())

实现效果图: