2017-11-02 4 views
0

Ich habe folgendes Schema, das ich von csv lesen:De normalisierenden Daten in Funken scala

val PersonSchema = StructType(Array(StructField("PersonID",StringType,true), StructField("Name",StringType,true))) 
val AddressSchema = StructType(Array(StructField("PersonID",StringType,true), StructField("StreetNumber",StringType,true), StructField("StreetName",StringType,true))) 

Eine Person kann mehrere Adressen hat und durch PersonID verwendet.

Kann jemand helfen, die Datensätze in eine PersonAddress-Datensätze zu transformieren, wie in der folgenden Fallklassendefinition?

case class Address(StreetNumber:String, StreetName:String) 
case class PersonAddress(PersonID:String, Name:String, Addresses:Array[Address]) 

Ich habe folgendes versucht, aber es Ausnahme im letzten Schritt geben:

val results = personData.join(addressData, Seq("PersonID"), "left_outer").groupBy("PersonID","Name").agg(collect_list(struct("StreetNumber","StreetName")) as "Addresses") 
val personAddresses = results .map(data => PersonAddress(data.getAs("PersonID"),data.getAs("Name"),data.getAs("Addresses"))) 
personAddresses.show 

Gibt einen Fehler:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to $line26.$read$$iw$$iw$Address

+0

Was ist der Typ von 'data.getAs (" Adressen ")'? Ist das nicht eine Liste von Strukturen? Sie sollten auch darüber "mappen", was ich für das Verständnis schön finde. –

+0

Wie mache ich das? Wenn ich folgendes probierte, beklagt es sich, dass "Wertkarte ist kein Mitglied von Nichts" val personAddresses = Ergebnisse .map (Daten => PersonAddress (data.getAs ("PersonID"), data.getAs ("Name") , data.getAs ("Adressen"). map (df => Adresse (df.getAs ("StreetNumber"), df.getAs ("StreetName")))) – SYL

+0

'data.getAs [Adressen] (" Adressen ") '? –

Antwort

0

Die einfachste Lösung in dieser Situtation zu verwenden wäre, ein UDF. Sammeln Sie zuerst die Straßennummern und Namen als zwei separate Listen, und verwenden Sie dann die , um alles in einen Datenrahmen von PersonAddress zu konvertieren.

val convertToCase = udf((id: String, name: String, streetName: Seq[String], streetNumber: Seq[String]) => { 
    val addresses = streetNumber.zip(streetName) 
    PersonAddress(id, name, addresses.map(t => Address(t._1, t._2)).toArray) 
}) 

val results = personData.join(addressData, Seq("PersonID"), "left_outer") 
    .groupBy("PersonID","Name") 
    .agg(collect_list($"StreetNumber").as("StreetNumbers"), 
     collect_list($"StreetName").as("StreetNames")) 
val personAddresses = results.select(convertToCase($"PersonID", $"Name", $"StreetNumbers", $"StreetNames").as("Person")) 

Dies wird Ihnen ein Schema wie folgt geben.

root 
|-- Person: struct (nullable = true) 
| |-- PersonID: string (nullable = true) 
| |-- Name: string (nullable = true) 
| |-- Addresses: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- StreetNumber: string (nullable = true) 
| | | |-- StreetName: string (nullable = true) 
+0

Wie nutzt dies die Vorteile der verteilten Berechnung von Funken? Oder wie können wir dies optimieren, um dies für die verteilte Parallelverarbeitung in Spark zu nutzen? – SYL

+0

@SYL Spark kümmert sich automatisch um Sie, solange Sie es auf einem Cluster und nicht lokal ausführen. – Shaido