2016-11-11 1 views
0

Ich habe Code, um Zeilen Dataframe zu konvertieren, aber ich habe Problem, in Array auszugeben.Wie Zeilenargument zu Array Json-Ausgabe mit Spark-Datenrahmen konvertieren

Input: datei.txt

+-------------------------------+--------------------+-------+ 
|id        |var     |score | 
+-------------------------------+--------------------+-------+ 
|12345       |A     |8  | 
|12345       |B     |9  | 
|12345       |C     |7  | 
|12345       |D     |6  | 
+-------------------------------+--------------------+-------+ 

output:

{"id":"12345","props":[{"var":"A","score":"8"},{"var":"B","score":"9"},{"var":"C","score":"7"},{"var":"D","score":"6"}]} 

ich versuchte collect_lis nicht erfolgreich verwenden. Mein Code ist mit scala

val sc = new SparkContext(conf); 
val sqlContext = new HiveContext(sc) 

val df = sqlContext.read.json("file.txt") 
val dfCol = df.select(
    df("id"), 
    df("var"), 
    df("score")) 
dfCol.show(false) 

val merge = udf { (var: String, score: Double) => 
     { 
     var + "," + score  } 
    } 

val grouped = dfCol.groupBy(col("id")) 
     .agg(collect_list(merge(col("var"),col("score")).alias("props")) 
grouped.show(false) 

Meine Frage ist, wie Datenzeile in Ausgabe Array Json konvertieren?

Danke.

+0

Warum versuchen Sie nicht, die DF nach ID zu gruppieren und dann die DF in die JSON-Datei selbst zu schreiben? Ich hoffe, dass sollte als Array von var und Requisiten zurückkehren. – Shankar

Antwort

0

Oke, ich habe Antwort in meiner Frage.

  case class Props(var: String, score: Double) 
      case class PropsArray(id: String, props: Seq[Props]) 
      val sc = new SparkContext(conf); 
      val sqlContext = new HiveContext(sc) 

      val df = sqlContext.read.json("file.txt") 
      val dfCol = df.select(
       df("id"), 
       df("var"), 
       df("score")) 


      val merge = udf { (var: String, score: Double) => 
        { 
        var + "," + score  } 
       } 

      val grouped = dfCol.groupBy(col("id")) 
        .agg(concat_ws("|", collect_list(merge(col("var"), col("score")))).alias("props")) 

     val merging = grouped.map(x => { 
       val list: ListBuffer[Props] = ListBuffer() 
       val data = x.getAs[String]("props").split("\\|") 

       data.foreach { x => 
       val arr = x.split(",") 

       try { 

        list.+=:(Props(arr.apply(0).toString(),arr.apply(1).toDouble)) 

       } catch { 
        case t: Throwable => t.getMessage 
       } 

       } 

       PropsArray(x.getAs("id"), list.toSeq) 

      }).toDF() 

können Sie

merging.show(false) 

laufen und Sie müssen Bibliothek in Ihrer pom.xml

<dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.10</artifactId> 
      <version>1.6.0</version> 
      <exclusions> 
       <exclusion> 
        <artifactId>kryo</artifactId> 
        <groupId>com.esotericsoftware.kryo</groupId> 
       </exclusion> 
      </exclusions> 
     </dependency> 

Dank hinzufügen.