2016-09-11 8 views
3

Ich versuche, ein Schema des vorhandenen Datenrahmens mit verschachtelten Feldern zu glätten. Struktur meines Datenrahmen ist so etwas wie die:Explodieren Array in Apache Funke Datenrahmen

root 
|-- Id: long (nullable = true) 
|-- Type: string (nullable = true) 
|-- Uri: string (nullable = true)  
|-- Type: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- Gender: array (nullable = true) 
| |-- element: string (containsNull = true) 

Art und Geschlecht Anordnung von Elementen enthalten können, ein Element oder Nullwert. Ich habe versucht, den folgenden Code zu verwenden:

var resDf = df.withColumn("FlatType", explode(df("Type"))) 

Aber als Ergebnis in einem resultierenden Datenrahmen I lose Zeilen, für die ich null-Werte für Spalte Typ hatte. Es bedeutet zum Beispiel, wenn ich 10 Zeilen habe und in 7 Zeilen ist der Typ Null und in 3 ist der Typ nicht null, nachdem ich explodieren im resultierenden Datenrahmen habe ich nur drei Zeilen.

Wie kann ich Zeilen mit NULL-Werten behalten, aber Array von Werten explodieren?

Ich fand eine Art Workaround, aber immer noch an einem Ort fest.

def customExplode(df: DataFrame, field: String, colType: String): org.apache.spark.sql.Column = { 
var exploded = None: Option[org.apache.spark.sql.Column] 
colType.toLowerCase() match { 
    case "string" => 
    val avoidNull = udf((column: Seq[String]) => 
    if (column == null) Seq[String](null) 
    else column) 
    exploded = Some(explode(avoidNull(df(field)))) 
    case "boolean" => 
    val avoidNull = udf((xs: Seq[Boolean]) => 
    if (xs == null) Seq[Boolean]() 
    else xs) 
    exploded = Some(explode(avoidNull(df(field)))) 
    case _ => exploded = Some(explode(df(field))) 
} 
exploded.get 

}

Und danach ist es nur verwenden, wie diese: Für Standardtypen wir folgende tun können, jedoch

val explodedField = customExplode(resultDf, fieldName, fieldTypeMap(field)) 
resultDf = resultDf.withColumn(newName, explodedField) 

, habe ich ein Problem für Strukturtyp für den folgenden Typen der Struktur:

|-- Address: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- AddressType: array (nullable = true) 
| | | |-- element: string (containsNull = true) 
| | |-- DEA: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- Number: array (nullable = true) 
| | | | | |-- element: string (containsNull = true) 
| | | | |-- ExpirationDate: array (nullable = true) 
| | | | | |-- element: timestamp (containsNull = true) 
| | | | |-- Status: array (nullable = true) 
| | | | | |-- element: string (containsNull = true) 

Wie können wir diese Art von Schema verarbeiten, wenn DEA null ist?

Vielen Dank im Voraus.

P.S. Ich habe versucht, Seitenansichten zu verwenden, aber das Ergebnis ist das gleiche.

Antwort

1

Vielleicht können Sie versuchen, when mit:

val resDf = df.withColumn("FlatType", when(df("Type").isNotNull, explode(df("Type"))) 

Wie documentation in der when Funktion gezeigt, der Wert null wird für die Werte eingefügt, die die Bedingungen nicht übereinstimmen.

+0

Entschuldigung, aber wenn ich diese Lösung versuche, habe ich die folgende Ausnahme: java.lang.UnsupportedOperationException. Wenn ich explode (df ("Typ")) mit nur einem Wert ersetzen - es funktioniert gut. Ich nehme an, wenn die Funktion keine explodierte Spalte als Wert – Artem

+0

@ Artem unterstützt, sind Sie richtig, tut mir leid. Ist eine "Gewerkschaft" eine Option für Sie? Du könntest 'df.where ($" Type ".isNull) .withColumn (" FlatType ", leuchtet (null)). UnionAll (df.withColumn (" FlatType ", explode ($" Type ")))' –

+0

ja Danke, ich habe über diese Option nachgedacht, aber ich entwickle einen generischen Algorithmus für das Reduzieren des Schemas und ich fürchte, dass die Vereinigung sehr langsam sein kann. Ich hoffe, eine bessere Lösung zu finden, aber Union ist eine Backup-Option für mich. – Artem