[Spark] Converting a HashMap to an RDD

Author: 筋斗云

1. Read a local file and convert it to a map

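import java.util
import scala.collection.JavaConverters._
import scala.io.Source
import com.alibaba.fastjson.JSON // assumed: the JSON.parseObject calls match fastjson

// Each entry is a one-element array like [{"name":..., "type":..., "value":...}]:
// strip the brackets, then return the "value" field.
// Defined before its first use so the code also compiles inside a method body.
def getValue(str: String): String = {
    val value = str.replace("[", "").replace("]", "")
    JSON.parseObject(value).get("value").toString
}

val path = "path/to/file"
// Note: removing every space also removes spaces inside string values.
val source = Source.fromFile(path).getLines().mkString("").replaceAll(" ", "")

val json = JSON.parseObject(source)
val key = json.get("key").toString
// The sample data stores the columns under "columns", not "value".
val columns = JSON.parseObject(json.get("columns").toString)

val map = new util.HashMap[String, String]()
map.put("RK", getValue(key))

columns.keySet().asScala.foreach(elem => {
    map.put(elem, getValue(columns.get(elem).toString))
})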
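One detail the snippet above leaves out is closing the file handle that Source.fromFile opens. Here is a minimal sketch of a helper that reads the whole file and always closes it. The name readAll and the UTF-8 encoding are assumptions, not part of the original post; like the original, it joins the lines without separators.

```scala
import scala.io.Source

// Hypothetical helper (not in the original post): read a file into one string
// and close the underlying handle even if reading fails.
def readAll(path: String): String = {
  val src = Source.fromFile(path, "UTF-8") // encoding is an assumption
  try src.getLines().mkString("")
  finally src.close()
}
```

With this helper, the source value could be built as readAll(path).replaceAll(" ", "").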

2. Convert the map to an RDD

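import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// One nullable string column per map key.
val schema = StructType(map.asScala.toSeq.map { case (k, _) =>
    StructField(k, StringType, nullable = true)
})

// A single row holding the map's values, in the same iteration order as the keys.
val row = Row.fromSeq(map.values().asScala.toSeq)

// spark is an existing SparkSession.
val rowRDD = spark.sparkContext.parallelize(Seq(row))

val df = spark.createDataFrame(rowRDD, schema)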
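The schema and the row above come from two separate traversals of a HashMap, whose iteration order is not the insertion order, so RK is not guaranteed to be the first column. If column order matters, one option is an insertion-ordered map read in a single pass. This is a minimal standard-library sketch; the helper name namesAndValues is hypothetical, and JavaConverters is used only to match the asScala calls in the post (on Scala 2.13+ scala.jdk.CollectionConverters replaces it).

```scala
import java.util
import scala.collection.JavaConverters._

// Hypothetical helper (not in the original post): split a map into parallel
// name and value sequences in one traversal, so the two always line up.
def namesAndValues(map: util.Map[String, String]): (Seq[String], Seq[String]) = {
  val entries = map.entrySet().asScala.toSeq.map(e => (e.getKey, e.getValue))
  entries.unzip
}

// LinkedHashMap iterates in insertion order, so RK stays first.
val ordered = new util.LinkedHashMap[String, String]()
ordered.put("RK", "1234567890")
ordered.put("column_name1", "111")

val (names, values) = namesAndValues(ordered)
// names  == Seq("RK", "column_name1")
// values == Seq("1234567890", "111")
```

The names can then feed StructField(name, StringType, nullable = true) and the values Row.fromSeq(values), in place of the two separate traversals.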

Note: input data format

{
    "key": [
        {
            "name": "RK",
            "type": "String",
            "value": "1234567890"
        }
    ],
    "columns": {
        "column_name1": [
            {
                "name": "column_name1",
                "type": "String",
                "value": "111"
            }
        ],
        "column_name2": [
            {
                "name": "column_name2",
                "type": "String",
                "value": "222"
            }
        ],
        "column_name3": [
            {
                "name": "column_name3",
                "type": "String",
                "value": "333"
            }
        ]
    }
}
