在 Spark 中,RDD(Resilient Distributed Dataset)是分布式数据集的基本抽象。数据清洗是数据预处理中的一个重要步骤,通常包括去除重复数据、过滤无效数据、转换数据格式等操作。以下是一个使用 RDD 进行数据清洗的完整示例。
示例场景
假设我们有一个包含用户信息的文本文件 users.txt
,每行是一个用户记录,格式如下:
user1,25,China
user2,30,USA
user3,invalid,Australia
user4,22,China
user5,28,USA
user6,35,invalid
我们需要对数据进行清洗,包括:
- 过滤掉无效的年龄数据(非数字或不在合理范围)。
- 过滤掉无效的国家数据(只保留指定的国家,如
China
和USA
)。 - 去除重复的用户记录。
实现步骤
- 创建 SparkContext:初始化 Spark 环境。
- 读取数据:从文件中加载数据到 RDD。
- 数据清洗:过滤无效数据和重复数据。
- 保存结果:将清洗后的数据保存到文件。
以下是完整的代码实现:
import org.apache.spark.{SparkConf, SparkContext}
object DataCleaning {
def main(args: Array[String]): Unit = {
// 初始化 Spark 环境
val conf = new SparkConf()
.setAppName("DataCleaning")
.setMaster("local[*]") // 使用本地模式运行
val sc = new SparkContext(conf)
// 读取数据
val inputPath = "path/to/users.txt"
val rawData = sc.textFile(inputPath)
// 数据清洗
val cleanedData = rawData
.map(line => line.split(",")) // 将每行数据分割为数组
.filter(arr => arr.length == 3) // 确保每行有三个字段
.filter(arr => {
// 过滤无效年龄数据
val age = try {
arr(1).toInt
} catch {
case _: NumberFormatException => -1
}
age >= 18 && age <= 100 // 假设年龄范围为 18 到 100
})
.filter(arr => {
// 过滤无效国家数据
val country = arr(2)
country == "China" || country == "USA"
})
.map(arr => (arr(0), arr(1), arr(2))) // 转换为元组
.distinct() // 去除重复记录
// 保存清洗后的数据
val outputPath = "path/to/cleaned_users.txt"
cleanedData.saveAsTextFile(outputPath)
// 停止 SparkContext
sc.stop()
}
}
代码说明
初始化 Spark 环境:
- 使用
SparkConf
配置 Spark 应用程序的名称和运行模式(本地模式)。 - 创建
SparkContext
实例。
- 使用
读取数据:
- 使用
sc.textFile
方法从指定路径加载数据到 RDD。
- 使用
数据清洗:
- 使用
map
方法将每行数据分割为数组。 - 使用
filter
方法过滤无效的年龄数据和国家数据。 - 使用
distinct
方法去除重复记录。
- 使用
保存结果:
- 使用
saveAsTextFile
方法将清洗后的数据保存到指定路径。
- 使用
示例输入和输出
输入文件 users.txt
:
user1,25,China
user2,30,USA
user3,invalid,Australia
user4,22,China
user5,28,USA
user6,35,invalid
user1,25,China
输出文件 cleaned_users.txt
:
user1,25,China
user2,30,USA
user4,22,China
user5,28,USA
运行项目
- 将上述代码保存为
DataCleaning.scala
文件。 - 在 IntelliJ IDEA 中运行该程序。
- 查看输出文件
cleaned_users.txt
,确保数据清洗结果正确。
通过以上步骤,你可以使用 Spark 的 RDD API 完成数据清洗任务。