2017-04-23 1 views
1

Jeder Datensatz in einer RDD enthält einen JSON. Ich verwende SqlContext einen Datenrahmen aus dem Json wie diese zu erstellen:Wie Array in DataFrame (von JSON) abwickeln?

val signalsJsonRdd = sqlContext.jsonRDD(signalsJson) 

Unter dem Schema ist. datepayload ist ein Array von Elementen. Ich möchte das Array von Elementen explodieren, um einen Datenrahmen zu erhalten, bei dem jede Zeile ein Element von data payload ist. Ich habe versucht, etwas zu tun, basierend auf this Antwort, aber es scheint, dass ich die gesamte Struktur des Elements in der Fall Row (arr: Array [...]) Anweisung modellieren müsste. Vermutlich vermisse ich etwas.

val payloadDfs = signalsJsonRdd.explode($"data.datapayload"){ 
    case org.apache.spark.sql.Row(arr: Array[String]) => arr.map(Tuple1(_)) 
} 

Der obige Code wirft einen scala.MatchError, weil die Art der tatsächlichen Row sehr unterschiedlich von Zeile ist (arr: Array [String]). Es gibt wahrscheinlich einen einfachen Weg, um zu tun, was ich will, aber ich kann es nicht finden. Bitte helfen Sie.

Schema geben unter

signalsJsonRdd.printSchema() 

root 
|-- _corrupt_record: string (nullable = true) 
|-- data: struct (nullable = true) 
| |-- dataid: string (nullable = true) 
| |-- datapayload: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- Reading: struct (nullable = true) 
| | | | |-- A2DPActive: boolean (nullable = true) 
| | | | |-- Accuracy: double (nullable = true) 
| | | | |-- Active: boolean (nullable = true) 
| | | | |-- Address: string (nullable = true) 
| | | | |-- Charging: boolean (nullable = true) 
| | | | |-- Connected: boolean (nullable = true) 
| | | | |-- DeviceName: string (nullable = true) 
| | | | |-- Guid: string (nullable = true) 
| | | | |-- HandsFree: boolean (nullable = true) 
| | | | |-- Header: double (nullable = true) 
| | | | |-- Heading: double (nullable = true) 
| | | | |-- Latitude: double (nullable = true) 
| | | | |-- Longitude: double (nullable = true) 
| | | | |-- PositionSource: long (nullable = true) 
| | | | |-- Present: boolean (nullable = true) 
| | | | |-- Radius: double (nullable = true) 
| | | | |-- SSID: string (nullable = true) 
| | | | |-- SSIDLength: long (nullable = true) 
| | | | |-- SpeedInKmh: double (nullable = true) 
| | | | |-- State: string (nullable = true) 
| | | | |-- Time: string (nullable = true) 
| | | | |-- Type: string (nullable = true) 
| | | |-- Time: string (nullable = true) 
| | | |-- Type: string (nullable = true) 
+0

Sieht sehr ähnlich zu http://stackoverflow.com/q/43411832/1305344. –

Antwort

2

tl; drexplode Funktion ist dein Freund (oder mein Favorit flatMap).

explode Funktion erstellt eine neue Zeile für jedes Element in der angegebenen Array- oder Map-Spalte.

So etwas wie die folgenden funktionieren sollte:

signalsJsonRdd.withColumn("element", explode($"data.datapayload")) 

Siehe functions Objekt.

+0

Danke, das hat funktioniert. Könntest du mir bitte helfen zu verstehen, was der Unterschied zwischen der Funktion explode und der Methode signalsRdd.explode ist? Ich habe die Methode benutzt, weil ich das in jedem Beispiel gesehen habe und sie nicht zum Laufen bringen konnte. –

+1

Wie Sie wünschen ... http: //stackoverflow.com/q/43582989/1305344 –