Processing Data Files with Spark
Inspecting the Data
Examine the data in $DATA_EXERCISE/activations. Each XML file contains data for the devices activated by customers during a specific month.
Copy the data to the /dw directory in HDFS.
Sample data:
<activations>
<activation timestamp="1225499258" type="phone">
<account-number>316</account-number>
<device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id>
<phone-number>5108307062</phone-number>
<model>iFruit 1</model>
</activation>
…
</activations>
Processing the Files
Read the XML files, extract the account number and device model from each activation record, and save the results to /dw/account-models in the format account_number:model.
Sample output:
1234:iFruit 1
987:Sorrento F00L
4566:iFruit 1
…
The following functions for parsing the XML are provided:
// Stub code to copy into Spark Shell
import scala.xml._
// Given a string containing XML, parse the string, and
// return an iterator of activation XML records (Nodes) contained in the string
def getActivations(xmlstring: String): Iterator[Node] = {
val nodes = XML.loadString(xmlstring) \\ "activation"
nodes.toIterator
}
// Given an activation record (XML Node), return the model name
def getModel(activation: Node): String = {
(activation \ "model").text
}
// Given an activation record (XML Node), return the account number
def getAccount(activation: Node): String = {
(activation \ "account-number").text
}
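To confirm the helpers behave as expected, you can try them on the sample record shown above (a minimal check, assuming the stub code has been pasted into the Spark shell; the XML literal below simply reproduces the sample inline):
// Quick sanity check of the provided functions, using the sample record from above
val sampleXml = """<activations>
  <activation timestamp="1225499258" type="phone">
    <account-number>316</account-number>
    <device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id>
    <phone-number>5108307062</phone-number>
    <model>iFruit 1</model>
  </activation>
</activations>"""
val sample = getActivations(sampleXml).next()
println(s"${getAccount(sample)}:${getModel(sample)}")  // expected output: 316:iFruit 1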
Uploading the Data
# 1. Check for and create the HDFS directory
hdfs dfs -mkdir -p /dw
# 2. Upload the local data to HDFS (replace $DATA_EXERCISE with the actual path)
hdfs dfs -put $DATA_EXERCISE/activations /dw/
# 3. Confirm that the files were uploaded successfully
hdfs dfs -ls /dw/activations
Define the parsing functions provided in the exercise
import scala.xml._

def getActivations(xmlstring: String): Iterator[Node] = {
  (XML.loadString(xmlstring) \\ "activation").toIterator
}
def getModel(activation: Node): String = (activation \ "model").text
def getAccount(activation: Node): String = (activation \ "account-number").text
Read the data (as with the log files, but using wholeTextFiles so each whole XML file becomes a single record)
val xmlRDD = sc.wholeTextFiles("/dw/activations/*.xml")
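Optionally, a quick sanity check that the files were picked up (just a sketch; wholeTextFiles returns (file path, file contents) pairs, one per file):
// Each element is a (path, contents) pair -- one per XML file
println(s"XML files loaded: ${xmlRDD.count()}")
println(s"First file: ${xmlRDD.keys.first()}")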
Test the parsing (inspect the first record)
val firstRecord = getActivations(xmlRDD.first()._2).next()
println(s"测试解析结果: ${getAccount(firstRecord)}:${getModel(firstRecord)}")
Process all of the data
val resultRDD = xmlRDD.flatMap { case (_, xml) =>
getActivations(xml).map(act => s"${getAccount(act)}:${getModel(act)}")
}
Preview the results (first 10 records)
resultRDD.take(10).foreach(println)
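If you also want a rough sense of the total volume, an optional count (not required by the exercise):
// Total number of account:model lines produced across all files
println(s"Total activation records: ${resultRDD.count()}")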
Save the results (delete any previous output first)
import org.apache.hadoop.fs._
val outputPath = "/dw/account-models"
// saveAsTextFile fails if the target directory already exists, so remove output from any previous run
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath), true)
resultRDD.saveAsTextFile(outputPath)
println(s"Results saved to hdfs://$outputPath")
Verify the results (run in a Linux terminal)
# View the output
hdfs dfs -cat /dw/account-models/part-* | head -n 10
# Merge the results into a single local file if needed
hdfs dfs -getmerge /dw/account-models ./account_models.txt
head account_models.txt