“data01.txt”数据集包含了某大学计算机系的成绩,数据格式如下所示:
Tom,DataBase,80 Tom,Algorithm,50 Tom,DataStructure,60 Jim,DataBase,90 Jim,Algorithm,60 Jim,DataStructure,80 …… |
请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1) 该系总共有多少学生;
(2) 该系共开设了多少门课程;
(3) Tom同学的总成绩平均分是多少;
(4) 求每名同学的选修的课程门数;
(5) 该系DataBase课程共有多少人选修;
(6) 各门课程的平均分是多少;
2.编写独立应用程序实现数据去重
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
输入文件B的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
Plain Text
3.编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
下载spark 可以使用下载pip install pyspark
官网下载spark
实际代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, countDistinct
# 初始化 SparkSession
spark = SparkSession.builder.appName("GradeAnalysis").getOrCreate()
# 读取数据
data_path = "data01.txt"
lines = spark.read.text(data_path).rdd
# 分割数据并转换为 DataFrame
data = lines.map(lambda line: line.value.split(',')) \
.map(lambda x: (x[0], x[1], int(x[2]))) \
.toDF(["Student", "Course", "Score"])
# 1. 计算该系总共有多少学生
students_count = data.select("Student").distinct().count()
print(f"学生总数: {students_count}")
# 2. 计算该系共开设了多少门课程
courses_count = data.select("Course").distinct().count()
print(f"课程总数: {courses_count}")
# 3. 计算Tom同学的总成绩平均分
tom_avg_score = data.filter(data.Student == "Tom").groupBy().agg(avg("Score").alias("AvgScore")).collect()[0]["AvgScore"]
print(f"Tom同学的总成绩平均分: {tom_avg_score}")
# 4. 计算每名同学的选修的课程门数
students_courses_count = data.groupBy("Student").agg(countDistinct("Course").alias("CourseCount")).show()
print("每名同学的选修的课程门数:")
students_courses_count.show()
# 5. 计算该系DataBase课程共有多少人选修
db_students_count = data.filter(data.Course == "DataBase").select("Student").distinct().count()
print(f"DataBase课程选修人数: {db_students_count}")
# 6. 计算各门课程的平均分
courses_avg_score = data.groupBy("Course").agg(avg("Score").alias("AvgScore")).show()
print("各门课程的平均分:")
courses_avg_score.show()
# 停止 SparkSession
spark.stop()
二.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
def merge_and_deduplicate(fileA, fileB, outputFile):
# 创建SparkSession
spark = SparkSession.builder \
.appName("MergeAndDeduplicate") \
.getOrCreate()
# 读取文件A和B,假设它们是以空格分隔的文本文件,并且没有列头
dfA = spark.read.csv(fileA, header=False, inferSchema=True, sep=" ")
dfB = spark.read.csv(fileB, header=False, inferSchema=True, sep=" ")
# 为DataFrame设置列名(这里我们假设只有两列,分别是日期和值)
dfA = dfA.withColumnRenamed("_c0", "date").withColumnRenamed("_c1", "value")
dfB = dfB.withColumnRenamed("_c0", "date").withColumnRenamed("_c1", "value")
# 合并两个DataFrame
dfCombined = dfA.unionByName(dfB)
# 去重(基于两列的组合)
dfDeduplicated = dfCombined.dropDuplicates()
# 将结果写入新的文件C
dfDeduplicated.write.csv(outputFile, header=True, sep=" ")
# 停止SparkSession
spark.stop()
# 调用函数,传入文件路径和输出路径
merge_and_deduplicate("A.txt", "B.txt", "C.txt")
三.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.group import GroupedData
def calculate_average_scores(input_files, output_file):
# 创建SparkSession
spark = SparkSession.builder \
.appName("AverageScoresCalculator") \
.getOrCreate()
# 读取所有输入文件,假设它们是以空格分隔的文本文件,并且第一列是学生名字,第二列是成绩
# 由于我们不知道有多少个文件,所以使用reduce来逐个union它们
from functools import reduce
dfs = [spark.read.csv(file, header=False, inferSchema=True, sep=" ") for file in input_files]
df_combined = reduce(lambda df1, df2: df1.unionByName(df2), dfs)
# 为DataFrame设置列名
df_combined = df_combined.withColumnRenamed("_c0", "student_name").withColumnRenamed("_c1", "score")
# 将成绩列转换为数值类型(如果inferSchema没有正确推断的话)
df_combined = df_combined.withColumn("score", df_combined["score"].cast("double"))
# 按学生名字分组,并计算平均成绩
grouped = df_combined.groupBy("student_name")
avg_scores = grouped.agg(avg("score").alias("average_score"))
# 将结果写入新的文件
avg_scores.write.csv(output_file, header=True, sep=" ")
# 停止SparkSession
spark.stop()
# 输入文件列表(这里假设有三个文件,分别代表不同学科的成绩)
input_files = ["1.txt", "2.txt", "3.txt"]
# 输出文件路径
output_file = "average_scores.txt"
# 调用函数,传入输入文件列表和输出文件路径
calculate_average_scores(input_files, output_file)