2016-12-20 3 views
0

ich einen Datenrahmen haben wie dieseSpark-2.0.1: split JSON Array Spalte in arraytype (String)

root 
|-- sum_id: long (nullable = true) 
|-- json: string (nullable = true) 

+-------+------------------------------+ 
|sum_id |json       | 
+-------+------------------------------+ 
|8124455|[{"itemId":11},{"itemId":12}] | 
|8124457|[{"itemId":53}]    | 
|8124458|[{"itemId":11},{"itemId":33}] | 
+-------+------------------------------+ 

und ich würde in dieser mit Scala zu explodieren wie

root 
|-- sum_id: long (nullable = true) 
|-- itemId: int(nullable = true) 

+-------+--------+ 
|sum_id |itemId | 
+-------+--------+ 
|8124455|11  | 
|8124455|12  | 
|8124457|53  | 
|8124458|11  | 
|8124458|33  | 
+-------+--------+ 

Was ich versucht:

  1. Mit get_json_object, aber die Spalte ist ein Array von JSON-Objekten, so denke ich, es in Objekt explodieren sollte zuerst, aber wie?

  2. Versuchte Spalte jsonStringType-ArrayType(StringType) zu werfen, bekam aber data type mismatch Ausnahmen.

Bitte führen Sie mich, wie Sie dieses Problem lösen.

+0

die Ausgabe von df.printSchema hinzufügen Bitte. –

+0

@ShyamendraSolanki: Ich habe hinzugefügt. Bitte schau es dir an. –

Antwort

1

Unten Code wird Ihre Arbeit genau tun.

val toItemArr = udf((jsonArrStr:String) => { 
     jsonArrStr.replace("[","").replace("]","").split(",") 
    }) 

inputDataFrame.withColumn("itemId",explode(toItemArr(get_json_object(col("json"),"$[*].itemId")))).drop("json").show 


+-------+------+ 
|  id|itemId| 
+-------+------+ 
|8124455| 11| 
|8124455| 12| 
|8124457| 53| 
|8124458| 11| 
|8124458| 33| 
+-------+------+ 
+0

Danke! Es klappt! –

+0

Nur noch eine Frage, @SanthoshPrasad, wenn ich die json Elemente mehr als 1 Attribut habe (zB 'gekaufte_Zeit') und ich will es auch bekommen. Wie kann ich das machen? –

+0

Sie können die Informationen in eine andere Spalte mit der gleichen Funktion (zum Beispiel get_json_object (col ("json"), "$ [*]. Gekauften_Zeit")) durch Angabe des Attributs Namen, oder wenn Sie möchten, können Sie zwei bekommen Attribute Werte und concat/merge sie in einzelne Spalte. – SanthoshPrasad

0

AS Sie Json verwenden, so könnte dies der beste Ansatz sein:

Bitte nehmen Sie sich einen Blick:

import org.apache.spark._ 
import com.fasterxml.jackson.module.scala.DefaultScalaModule 
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper 
import com.fasterxml.jackson.databind.ObjectMapper 
import com.fasterxml.jackson.databind.DeserializationFeature 

val df = sc.parallelize(Seq((8124455,"""[{"itemId":11},{"itemId":12}]"""),(8124457,"""[{"itemId":53}]"""),(8124458,"""[{"itemId":11},{"itemId":33}]"""))).toDF("sum_id","json") 
val result = df.rdd.mapPartitions(records => { 
     val mapper = new ObjectMapper with ScalaObjectMapper 
     mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) 
     mapper.registerModule(DefaultScalaModule) 
     val values=records.flatMap(record => { 
      try { 
      Some((record.getInt(0),mapper.readValue(record.getString(1), classOf[List[Map[String,Int]]]).map(_.map(_._2).toList).flatten)) 
      } catch { 
      case e: Exception => None 
      } 
     }) 
values.flatMap(listOfList=>listOfList._2.map(a=>(listOfList._1,a))) 
    }, true) 

result.toDF.show()