阅读量:0
1、读取本地文件,转换成map
/**
 * Extracts the "value" field from a JSON object that may be wrapped in a
 * single-element JSON array, e.g. `[{"name":"RK","type":"String","value":"1"}]`.
 *
 * @param str JSON text of the object, optionally enclosed in one pair of `[` `]`
 * @return the string form of the object's "value" field
 */
def getValue(str: String): String = {
  val trimmed = str.trim
  // Strip only the outermost array brackets; removing every '[' / ']' would
  // also mangle values that legitimately contain those characters.
  val unwrapped =
    if (trimmed.startsWith("[") && trimmed.endsWith("]")) trimmed.substring(1, trimmed.length - 1)
    else trimmed
  JSON.parseObject(unwrapped).get("value").toString
}

val path = "文件路径"

// Read the whole file and always release the file handle, even if reading fails.
// Whitespace is left untouched: the JSON parser ignores it between tokens, and
// stripping spaces from the raw text would alter string values.
val reader = Source.fromFile(path)
val source = try reader.mkString finally reader.close()

// Parse the document once and reuse the parsed objects.
val root = JSON.parseObject(source)
val key = root.get("key").toString
// The column definitions live under the top-level "columns" key of the input.
val columns = root.get("columns").toString
val columnsJson = JSON.parseObject(columns)

// LinkedHashMap keeps insertion order, so "RK" stays the first entry and the
// iteration order is stable across traversals.
val map: util.HashMap[String, String] = new util.LinkedHashMap[String, String]()
map.put("RK", getValue(key))
columnsJson.keySet().asScala.foreach { elem =>
  map.put(elem, getValue(columnsJson.get(elem).toString))
}
2、将map转变成rdd,并创建DataFrame
// Snapshot the map entries once so that the schema's field names and the row's
// values are taken from the same traversal and therefore line up by position.
val entries = map.asScala.toSeq

// Every column is modelled as a nullable string field named after its map key.
val schema = StructType(entries.map { case (k, _) => StructField(k, StringType, nullable = true) })

// A single row holding the values in the same order as the schema fields.
val row = Row.fromSeq(entries.map { case (_, v) => v })
val rowRDD = spark.sparkContext.parallelize(Seq(row))
val df = spark.createDataFrame(rowRDD, schema)
备注:数据格式
{ "key":[ { "name":"RK", "type":"String", "value":"1234567890" } ], "columns":{ "column_name1":[ { "name":"column_name1", "type":"String", "value":"111" } ], "column_name2":[ { "name":"column_name2", "type":"String", "value":"222" } ], "column_name3":[ { "name":"column_name3", "type":"String", "value":"333" } ] } }