1、读取本地 JSON 文件，解析成 Map（字段名 -> 字段值）
// Reads a local JSON record file and flattens it into a map of field name -> field value.
// NOTE(review): `JSON` looks like fastjson (com.alibaba.fastjson.JSON) — confirm against the imports.

// Placeholder path; replace with the real location of the JSON file.
val path = "文件路径"

// Read the whole file as UTF-8 (the encoding JSON interchange uses) and always close the handle;
// the original never closed it. Whitespace is kept as-is: the JSON parser skips insignificant
// whitespace, whereas the old replaceAll(" ", "") also deleted spaces inside string values.
val source: String = {
  val in = Source.fromFile(path, "UTF-8")
  try in.mkString finally in.close()
}

// Parse the document once and reuse it instead of re-parsing the text for every lookup.
val rootJson = JSON.parseObject(source)

// Row-key descriptor array, e.g. [{"name":"RK","type":"String","value":"1234567890"}].
val key = Option(rootJson.get("key"))
  .getOrElse(throw new IllegalArgumentException(s"$path: missing \"key\" field"))
  .toString

// Column descriptors. The documented data format stores them under "columns"; the original
// snippet read "value", so accept either one to stay compatible with existing input files.
val columns = Option(rootJson.get("columns"))
  .orElse(Option(rootJson.get("value")))
  .getOrElse(throw new IllegalArgumentException(s"$path: missing \"columns\" field"))
  .toString

// Field name -> value. LinkedHashMap (a HashMap subclass) keeps "RK" as the first entry;
// the remaining columns follow the key order of the parsed JSON object.
val map = new util.LinkedHashMap[String, String]()
map.put("RK", getValue(key))

// Parse the column object once, then pull each column descriptor's "value".
val columnsJson = JSON.parseObject(columns)
columnsJson.keySet().asScala.foreach { name =>
  map.put(name, getValue(columnsJson.get(name).toString))
}
/**
 * Extracts the "value" field from a descriptor array such as
 * `[{"name":"RK","type":"String","value":"1234567890"}]` (the first element is used).
 *
 * The array is parsed as JSON instead of stripping every '[' and ']' character from the text,
 * which also mangled values containing brackets. A bare descriptor object (no surrounding
 * brackets) is still accepted.
 *
 * @param str JSON text of a descriptor array or of a single descriptor object
 * @return the descriptor's "value" field rendered as a string
 * @throws IllegalArgumentException if the array is empty or the descriptor has no "value"
 */
def getValue(str: String): String = {
  val text = str.trim
  val descriptor =
    if (text.startsWith("[")) {
      val items = JSON.parseArray(text)
      require(items != null && !items.isEmpty, s"empty descriptor array: $str")
      items.getJSONObject(0)
    } else {
      JSON.parseObject(text)
    }
  require(descriptor != null, s"not a JSON descriptor: $str")
  val value = descriptor.get("value")
  require(value != null, s"descriptor has no \"value\" field: $str")
  value.toString
}
2、将 Map 转换成 DataFrame（先构造只有一行的 RDD，再按 schema 创建 DataFrame）
// Builds a one-row DataFrame whose columns are the map's keys (all nullable strings).
// Take a single snapshot of the entries so the schema's field order and the row's value order
// come from the same sequence; pairing them is what keeps each value under its own column.
val entries = map.asScala.toSeq
val schema = StructType(entries.map { case (name, _) =>
  StructField(name, StringType, nullable = true)
})
val row = Row.fromSeq(entries.map(_._2))
val rowRDD = spark.sparkContext.parallelize(Seq(row))
val df = spark.createDataFrame(rowRDD, schema)
备注：输入文件的数据格式（JSON）
{
  "key": [
    {
      "name": "RK",
      "type": "String",
      "value": "1234567890"
    }
  ],
  "columns": {
    "column_name1": [
      {
        "name": "column_name1",
        "type": "String",
        "value": "111"
      }
    ],
    "column_name2": [
      {
        "name": "column_name2",
        "type": "String",
        "value": "222"
      }
    ],
    "column_name3": [
      {
        "name": "column_name3",
        "type": "String",
        "value": "333"
      }
    ]
  }
}