spark实验

发布于:2025-03-14 ⋅ 阅读:(15) ⋅ 点赞:(0)

“data01.txt”数据集包含了某大学计算机系的成绩,数据格式如下所示:

Tom,DataBase,80

Tom,Algorithm,50

Tom,DataStructure,60

Jim,DataBase,90

Jim,Algorithm,60

Jim,DataStructure,80

……

请根据给定的实验数据,在pyspark中通过编程来计算以下内容:

(1)       该系总共有多少学生;

(2)       该系共开设了多少门课程;

(3)       Tom同学的总成绩平均分是多少;

(4)       求每名同学的选修的课程门数;

(5)       该系DataBase课程共有多少人选修;

(6)       各门课程的平均分是多少;

2.编写独立应用程序实现数据去重

对于两个输入文件AB,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。

输入文件A的样例如下:

20170101    x

20170102    y

20170103    x

20170104    y

20170105    z

20170106    z

输入文件B的样例如下:

20170101    y

20170102    y

20170103    x

20170104    z

20170105    y

根据输入的文件AB合并得到的输出文件C的样例如下:

20170101    x

20170101    y

20170102    y

20170103    x

20170104    y

20170104    z

20170105    y

20170105    z

20170106    z

Plain Text

3.编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm成绩:

小明 92

小红 87

小新 82

小丽 90

Database成绩:

小明 95

小红 81

小新 89

小丽 85

Python成绩:

小明 82

小红 83

小新 94

小丽 91

平均成绩如下:

    (小红,83.67)

    (小新,88.33)

    (小明,89.67)

(小丽,88.67)

下载spark 可以使用下载pip install pyspark

官网下载spark

实际代码

from pyspark.sql import SparkSession

from pyspark.sql.functions import avg, col, countDistinct

# 初始化 SparkSession

spark = SparkSession.builder.appName("GradeAnalysis").getOrCreate()

# 读取数据

data_path = "data01.txt"

lines = spark.read.text(data_path).rdd

# 分割数据并转换为 DataFrame

data = lines.map(lambda line: line.value.split(',')) \

    .map(lambda x: (x[0], x[1], int(x[2]))) \

    .toDF(["Student", "Course", "Score"])

# 1. 计算该系总共有多少学生

students_count = data.select("Student").distinct().count()

print(f"学生总数: {students_count}")

# 2. 计算该系共开设了多少门课程

courses_count = data.select("Course").distinct().count()

print(f"课程总数: {courses_count}")

# 3. 计算Tom同学的总成绩平均分

tom_avg_score = data.filter(data.Student == "Tom").groupBy().agg(avg("Score").alias("AvgScore")).collect()[0]["AvgScore"]

print(f"Tom同学的总成绩平均分: {tom_avg_score}")

# 4. 计算每名同学的选修的课程门数

students_courses_count = data.groupBy("Student").agg(countDistinct("Course").alias("CourseCount")).show()

print("每名同学的选修的课程门数:")

students_courses_count.show()

# 5. 计算该系DataBase课程共有多少人选修

db_students_count = data.filter(data.Course == "DataBase").select("Student").distinct().count()

print(f"DataBase课程选修人数: {db_students_count}")

# 6. 计算各门课程的平均分

courses_avg_score = data.groupBy("Course").agg(avg("Score").alias("AvgScore")).show()

print("各门课程的平均分:")

courses_avg_score.show()

# 停止 SparkSession

spark.stop()

二.

from pyspark.sql import SparkSession

from pyspark.sql.functions import col

def merge_and_deduplicate(fileA, fileB, outputFile):

    # 创建SparkSession

    spark = SparkSession.builder \

        .appName("MergeAndDeduplicate") \

        .getOrCreate()

    # 读取文件A和B,假设它们是以空格分隔的文本文件,并且没有列头

    dfA = spark.read.csv(fileA, header=False, inferSchema=True, sep=" ")

    dfB = spark.read.csv(fileB, header=False, inferSchema=True, sep=" ")

    # 为DataFrame设置列名(这里我们假设只有两列,分别是日期和值)

    dfA = dfA.withColumnRenamed("_c0", "date").withColumnRenamed("_c1", "value")

    dfB = dfB.withColumnRenamed("_c0", "date").withColumnRenamed("_c1", "value")

    # 合并两个DataFrame

    dfCombined = dfA.unionByName(dfB)

    # 去重(基于两列的组合)

    dfDeduplicated = dfCombined.dropDuplicates()

    # 将结果写入新的文件C

    dfDeduplicated.write.csv(outputFile, header=True, sep=" ")

    # 停止SparkSession

    spark.stop()

# 调用函数,传入文件路径和输出路径

merge_and_deduplicate("A.txt", "B.txt", "C.txt")

三.

from pyspark.sql import SparkSession

from pyspark.sql.functions import avg

from pyspark.sql.group import GroupedData

def calculate_average_scores(input_files, output_file):

    # 创建SparkSession

    spark = SparkSession.builder \

        .appName("AverageScoresCalculator") \

        .getOrCreate()

    # 读取所有输入文件,假设它们是以空格分隔的文本文件,并且第一列是学生名字,第二列是成绩

    # 由于我们不知道有多少个文件,所以使用reduce来逐个union它们

    from functools import reduce

    dfs = [spark.read.csv(file, header=False, inferSchema=True, sep=" ") for file in input_files]

    df_combined = reduce(lambda df1, df2: df1.unionByName(df2), dfs)

    # 为DataFrame设置列名

    df_combined = df_combined.withColumnRenamed("_c0", "student_name").withColumnRenamed("_c1", "score")

    # 将成绩列转换为数值类型(如果inferSchema没有正确推断的话)

    df_combined = df_combined.withColumn("score", df_combined["score"].cast("double"))

    # 按学生名字分组,并计算平均成绩

    grouped = df_combined.groupBy("student_name")

    avg_scores = grouped.agg(avg("score").alias("average_score"))

    # 将结果写入新的文件

    avg_scores.write.csv(output_file, header=True, sep=" ")

    # 停止SparkSession

    spark.stop()

# 输入文件列表(这里假设有三个文件,分别代表不同学科的成绩)

input_files = ["1.txt", "2.txt", "3.txt"]

# 输出文件路径

output_file = "average_scores.txt"

# 调用函数,传入输入文件列表和输出文件路径

calculate_average_scores(input_files, output_file)