实操作业02:Spark核心开发
作业说明
- 请严格按照步骤操作,并将最终结果文件(命名为:
sparkcore_result.txt
)于20点前上传。 - 结果文件需包含每一步的关键命令执行结果文本输出。
一、数据读取与转换操作
- 上传账户数据
$DATA_EXERCISE/accounts
到HDFS的/dw/accounts
目录,从HDFS路径/dw/accounts
读取accounts数据文件
hadoop fs -mkdir -p /dw/accounts
hadoop fs -put $DATA_EXERCISE/accounts /dw/accounts/
- 将每行数据按逗号分割成字段数组
- 以邮政编码字段(第9个字段)作为key,创建键值对RDD
- 查看转换后的数据结构,显示前2条记录
step1. 创建 RDD(读取所有 part 文件)
// 使用通配符 * 读取目录下所有 part 文件
val accountsRDD = sc.textFile("hdfs://master:8020/dw/accounts/accounts/part-*")
// 验证数据加载
accountsRDD.take(2).foreach(println)
step2:数据转换
// 一、数据转换操作
val zipKeyValueRDD = accountsRDD.map(line => {
val fields = line.split(",") // 按逗号分割每行数据
val zipCode = fields(8) // 提取第9个字段(索引8)作为邮政编码
(zipCode, fields) // 返回(邮编, 字段数组)的键值对
})
// 显示前2条
zipKeyValueRDD.take(2).foreach {
case (zip, fields) => println(s"Key: $zip | Value: ${fields.mkString(",")}")
}
为什么可以确定line是一个数组属性的变量呢
解析:
line: 是一个 String 类型的变量,表示CSV文件的某一行(例如:"1,John,Doe,10001")
.split(","):
是String类的方法,按指定的分隔符(这里是逗号 ,)切割字符串
返回一个 Array[String](字符串数组)
val fields:
定义一个 不可变变量(Scala中用val声明)
类型由编译器自动推断为 Array[String]
存储切割后的字段数组
可以通过Spark-shell直接验证类型:
// 示例数据
val line = "1,John,Doe,10001"
// 执行切割
val fields = line.split(",") // 得到:Array("1", "John", "Doe", "10001")
// 查看类型
println(fields.getClass) // 输出:class [Ljava.lang.String; (表示String数组)
这三行代码也只能输出一行数据的转换结果,但是这个是一整个文件的转换结果,应该是有循环在里面的,循环体现在哪呢
这里的关键在于 Spark的分布式计算模型 和 RDD的转换操作 的运作机制。让我们深入解析这个"隐式循环"的实现原理:
1. 表面现象 vs 底层实现
你看到的代码(看似单行处理)
scala
val zipKeyValueRDD = accountsRDD.map(line => {
val fields = line.split(",") // 似乎只处理一行
val zipCode = fields(8)
(zipCode, fields)
})
实际发生的处理(分布式并行处理)
python
# 伪代码:Spark集群的实际执行逻辑
for partition in accountsRDD.partitions: # 遍历所有数据分区
for line in partition: # 分区内逐行处理(真正的循环)
fields = line.split(",")
emit((fields[8], fields)) # 输出键值对
2. 循环的隐藏位置
Spark通过以下机制实现"隐式循环":
组件 | 作用 | 类比传统编程 |
---|---|---|
RDD.map() | 定义转换逻辑 | 相当于循环体 |
Partitions(分区) | 数据分片 | 相当于外层循环 |
Tasks(任务) | 并行处理单元 | 相当于多线程循环 |
3. 完整数据处理流程
输入阶段
假设原始文件有100万行,被Spark自动划分为N个分区(如HDFS的block数)
并行执行阶段
每个分区的数据会被不同的Executor进程处理
每个Executor对分配到的分区数据 逐行执行map函数
输出阶段
所有节点的处理结果合并成新的RDD
4. 关键问题解答
Q: 为什么代码没有显式循环却能处理整个文件?
Spark的RDD操作是声明式的:你只需要定义"做什么"(业务逻辑),Spark框架自动处理"怎么做"(分布式循环)
数据分区和并行计算:
scala
// 假设文件被分为3个分区
accountsRDD.partitions.size // 返回分区数每个分区会触发一个Task,每个Task内部会循环处理其分配到的数据
Q: 如何验证确实处理了所有数据?
scala
// 查看处理后的记录数
zipKeyValueRDD.count()
// 查看分区处理情况(需在Spark UI观察)
zipKeyValueRDD.toDebugString
5. 与传统编程的对比
特性 | Spark RDD | 传统Java/Python |
---|---|---|
循环方式 | 隐式(由框架管理) | 显式(for/while) |
并行度 | 自动分区并行 | 需手动线程/进程管理 |
数据规模 | 支持TB/PB级 | 通常单机内存限制 |
代码示例 | rdd.map(x => f(x)) |
for x in list: f(x) |
循环确实存在:但被Spark框架隐藏在分布式运行时中
你的代码是"循环体":
map
中的逻辑会被应用到每一行数据优势:开发者无需关心并行化和数据分发,专注业务逻辑
二、数据聚合操作
- 对上述RDD提取每个记录的姓名字段:
- 提取第5个字段(first_name)和第4个字段(last_name)
- 将姓和名用逗号连接
val nameByZipRDD = zipKeyValueRDD.mapValues(fields =>
s"${fields(4)},${fields(3)}" // 格式化为"姓,名"
)
在Scala中,s"${fields(4)},${fields(3)}"
是一种称为 字符串插值(String Interpolation) 的语法
1. 字符串插值的组成
部分 | 含义 | 示例 |
---|---|---|
开头的s |
表示启用字符串插值 | s"..." |
${} |
插入变量/表达式的语法 | ${fields(4)} |
引号内内容 | 固定字符串+动态变量组合 | "姓,名" |
2. 具体到代码
scala
s"${fields(4)},${fields(3)}"
等效的普通写法:
scala
fields(4) + "," + fields(3) // 直接字符串拼接
执行过程:
取出数组
fields
的第5个元素(索引4)取出第4个元素(索引3)
用逗号连接两者
3. 对比其他语言
语言 | 类似语法 | 示例 |
---|---|---|
Scala | s"${var}" |
s"Hello, ${name}" |
Python | f-string | f"Hello, {name}" |
JavaScript | 模板字符串 | `Hello, ${name}` |
1. map
vs mapValues
的本质区别
操作 | 函数签名 | 输入 → 输出 | 在你的代码中的应用 |
---|---|---|---|
map |
(T) => U |
整个元素 → 新元素 | line => (zipCode, fields) |
mapValues |
(V) => U |
仅值部分 → 新值(键不变) | fields => "姓,名" |
2.代码中两个阶段的解析
(1)第一阶段:数据转换 (map
)
scala
val zipKeyValueRDD = accountsRDD.map(line => {
val fields = line.split(",") // String → Array[String]
val zipCode = fields(8) // 提取key
(zipCode, fields) // 返回: (String, Array[String])
})
line =>
的含义:输入:原始字符串(如
"1,John,Doe,10001"
)输出:完全新建的键值对
(String, Array[String])
数据流:
text
"1,John,Doe,10001" → split → ["1","John","Doe","10001"] → 取fields(8)作为key → 输出 ("10001", ["1","John","Doe","10001",...])
(2)第二阶段:聚合 (mapValues
)
scala
val nameByZipRDD = zipKeyValueRDD.mapValues(fields =>
s"${fields(4)},${fields(3)}" // 仅修改value部分
)
fields =>
的含义:输入:已有键值对的值部分(即之前的
Array[String]
)输出:仅更新值(键
zipCode
保持不变)
数据流:
text
输入: ("10001", ["1","John","Doe","10001",...]) → 提取fields(4)和fields(3) → 输出 ("10001", "Doe,John") // 键未改变!
3. =>
的本质
=>
是Scala中的函数定义符号,表示:scala
val func: InputType => OutputType = (input) => { // 处理input output }
在代码中:
line => ...
:定义了一个从String
到(String, Array[String])
的函数fields => ...
:定义了一个从Array[String]
到String
的函数
- 按邮政编码分组
- 查看聚合结果,显示前2条记录
val groupedByNameRDD = nameByZipRDD.groupByKey()
// 显示前2组
groupedByNameRDD.take(2).foreach {
case (zip, names) => println(s"$zip -> ${names.mkString("; ")}")
}
三、数据排序与展示
- 对分组后的RDD按邮政编码进行升序排列
- 取前5条记录进行展示
- 对每条记录,先打印邮政编码,然后打印该邮政编码下的所有姓名列表
groupedByNameRDD.sortByKey().take(5).foreach {
case (zip, names) =>
println(s"\n=== 邮政编码: $zip ===")
names.foreach(println)
}
四、提交要求
代码和结果文件:将代码及其执行后的输出结果保存到
sparkcore_result.txt
文件中结果文件应包含:
- 数据读取与转换操作的代码和输出结果
- 数据聚合操作的代码和输出结果
- 数据排序与展示的代码和输出结果