在 Spark 中,agg
是用于对 DataFrame 进行聚合操作的函数。它可以同时对多个列应用多个聚合函数,并返回一个新的 DataFrame。agg
通常与 groupBy
结合使用,用于对分组后的数据进行聚合操作。
以下是 agg
的详细用法和示例。
1. agg
的基本用法
语法
val aggregatedDF = df.agg(
F.sum("column1").as("total_column1"),
F.avg("column2").as("average_column2")
)
F.sum("column1")
:对column1
列求和。F.avg("column2")
:对column2
列计算平均值。as("alias")
:为聚合结果指定别名。
2. agg
与 groupBy
结合使用
agg
通常与 groupBy
结合使用,用于对分组后的数据进行聚合操作。
示例
假设有一个 DataFrame,包含用户的姓名、部门和工资:
import org.apache.spark.sql.{SparkSession, functions => F}
val spark = SparkSession.builder()
.appName("Agg Example")
.master("local[*]")
.getOrCreate()
// 示例数据
val data = Seq(
("Alice", "HR", 3000),
("Bob", "IT", 4000),
("Charlie", "HR", 3500),
("David", "IT", 4500),
("Eva", "Finance", 5000)
)
// 创建 DataFrame
val df = spark.createDataFrame(data).toDF("name", "department", "salary")
// 按部门分组,并计算工资总和、平均工资、最高工资和最低工资
val aggregatedDF = df.groupBy("department").agg(
F.sum("salary").as("total_salary"),
F.avg("salary").as("average_salary"),
F.max("salary").as("max_salary"),
F.min("salary").as("min_salary")
)
// 显示结果
aggregatedDF.show()
输出:
+----------+------------+--------------+----------+----------+
|department|total_salary|average_salary|max_salary|min_salary|
+----------+------------+--------------+----------+----------+
| HR| 6500| 3250.0| 3500| 3000|
| IT| 8500| 4250.0| 4500| 4000|
| Finance| 5000| 5000.0| 5000| 5000|
+----------+------------+--------------+----------+----------+
groupBy("department")
:按部门分组。agg
:对每个部门计算工资总和、平均工资、最高工资和最低工资。
3. agg
的多种聚合函数
agg
可以同时应用多个聚合函数。以下是一些常用的聚合函数:
聚合函数 | 描述 |
---|---|
F.sum("column") |
对列求和 |
F.avg("column") |
计算列的平均值 |
F.min("column") |
计算列的最小值 |
F.max("column") |
计算列的最大值 |
F.count("column") |
统计列的非空值数量 |
F.collect_list("column") |
将列的值收集为列表 |
F.collect_set("column") |
将列的值收集为集合(去重) |
示例
统计每个部门的员工数量、工资总和、平均工资、最高工资、最低工资,以及员工姓名列表:
val aggregatedDF = df.groupBy("department").agg(
F.count("name").as("employee_count"),
F.sum("salary").as("total_salary"),
F.avg("salary").as("average_salary"),
F.max("salary").as("max_salary"),
F.min("salary").as("min_salary"),
F.collect_list("name").as("employees")
)
aggregatedDF.show(truncate = false)
输出:
+----------+--------------+------------+--------------+----------+----------+----------------------+
|department|employee_count|total_salary|average_salary|max_salary|min_salary|employees |
+----------+--------------+------------+--------------+----------+----------+----------------------+
|HR |2 |6500 |3250.0 |3500 |3000 |[Alice, Charlie] |
|IT |2 |8500 |4250.0 |4500 |4000 |[Bob, David] |
|Finance |1 |5000 |5000.0 |5000 |5000 |[Eva] |
+----------+--------------+------------+--------------+----------+----------+----------------------+
4. 全局聚合(不分组)
如果不使用 groupBy
,agg
会对整个 DataFrame 进行全局聚合。
示例
计算所有员工的工资总和、平均工资、最高工资和最低工资:
val globalAggDF = df.agg(
F.sum("salary").as("total_salary"),
F.avg("salary").as("average_salary"),
F.max("salary").as("max_salary"),
F.min("salary").as("min_salary")
)
globalAggDF.show()
输出:
+------------+--------------+----------+----------+
|total_salary|average_salary|max_salary|min_salary|
+------------+--------------+----------+----------+
| 20000| 4000.0| 5000| 3000|
+------------+--------------+----------+----------+
5. 多列分组和聚合
可以对多列进行分组,并对多列应用聚合函数。
示例
假设有一个 DataFrame,包含用户的姓名、部门、职位和工资:
val data = Seq(
("Alice", "HR", "Manager", 3000),
("Bob", "IT", "Developer", 4000),
("Charlie", "HR", "Analyst", 3500),
("David", "IT", "Developer", 4500),
("Eva", "Finance", "Manager", 5000)
)
val df = spark.createDataFrame(data).toDF("name", "department", "role", "salary")
// 按部门和职位分组,并计算工资总和和平均工资
val multiGroupDF = df.groupBy("department", "role").agg(
F.sum("salary").as("total_salary"),
F.avg("salary").as("average_salary")
)
multiGroupDF.show()
输出:
+----------+---------+------------+--------------+
|department| role|total_salary|average_salary|
+----------+---------+------------+--------------+
| HR| Manager| 3000| 3000.0|
| IT|Developer| 8500| 4250.0|
| HR| Analyst| 3500| 3500.0|
| Finance| Manager| 5000| 5000.0|
+----------+---------+------------+--------------+
6. 使用表达式字符串
除了使用函数外,agg
还支持使用表达式字符串。
示例
val aggregatedDF = df.groupBy("department").agg(
"salary" -> "sum",
"salary" -> "avg",
"salary" -> "max",
"salary" -> "min"
)
aggregatedDF.show()
输出:
+----------+-----------+-----------+-----------+-----------+
|department|sum(salary)|avg(salary)|max(salary)|min(salary)|
+----------+-----------+-----------+-----------+-----------+
| HR| 6500| 3250.0| 3500| 3000|
| IT| 8500| 4250.0| 4500| 4000|
| Finance| 5000| 5000.0| 5000| 5000|
+----------+-----------+-----------+-----------+-----------+
总结
agg
用于对 DataFrame 进行聚合操作,通常与groupBy
结合使用。可以同时应用多个聚合函数,并为结果指定别名。
支持全局聚合(不分组)和多列分组聚合。
可以使用函数或表达式字符串定义聚合操作。