2015-05-02 18 views
17

ich ein Parkett Tisch läuft mit einer der Säulen seinesWie eine verschachtelte Sammlung in Spark-

, Array < struct < col1, col2, .. coln >>

Kann zum Lesen Abfragen für diese Tabelle in Hive mit der LATERAL VIEW-Syntax.

Wie liest man diese Tabelle in eine RDD, und noch wichtiger, wie diese verschachtelte Sammlung in Spark filtern, zuordnen usw.?

In der Spark-Dokumentation konnten keine Referenzen gefunden werden. Vielen Dank im Voraus für jede Information!

ps. Felt könnte hilfreich sein, um einige Werte auf den Tisch zu bringen. Anzahl der Spalten in Haupttabelle ~ 600. Anzahl der Reihen ~ 200m. Anzahl der "Spalten" in verschachtelten Sammlung ~ 10. Durchschn. Anzahl der Datensätze in verschachtelter Sammlung ~ 35.

Antwort

18

Es gibt keine Magie im Falle der verschachtelten Sammlung. Spark wird auf die gleiche Weise eine RDD[(String, String)] und eine RDD[(String, Seq[String])] behandeln.

Das Lesen einer solchen verschachtelten Sammlung aus Parquet-Dateien kann jedoch schwierig sein.

Nehmen wir ein Beispiel nehmen von der spark-shell (1.3.1):

scala> import sqlContext.implicits._ 
import sqlContext.implicits._ 

scala> case class Inner(a: String, b: String) 
defined class Inner 

scala> case class Outer(key: String, inners: Seq[Inner]) 
defined class Outer 

die Parkett-Datei schreiben:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b"))))) 
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25 

scala> outers.toDF.saveAsParquetFile("outers.parquet") 

Lesen Sie die Parkett-Datei:

scala> import org.apache.spark.sql.catalyst.expressions.Row 
import org.apache.spark.sql.catalyst.expressions.Row 

scala> val dataFrame = sqlContext.parquetFile("outers.parquet") 
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>] 

scala> val outers = dataFrame.map { row => 
    | val key = row.getString(0) 
    | val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1))) 
    | Outer(key, inners) 
    | } 
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848 

die wichtige Teil ist row.getAs[Seq[Row]](1). Die interne Darstellung einer verschachtelten Sequenz von struct ist ArrayBuffer[Row], Sie könnten einen beliebigen Super-Typ anstelle von Seq[Row] verwenden. Der 1 ist der Spaltenindex in der äußeren Zeile. Ich habe die Methode getAs hier verwendet, aber es gibt Alternativen in den neuesten Versionen von Spark. Siehe den Quellcode der Row trait.

Jetzt, da Sie eine RDD[Outer] haben, können Sie jede gewünschte Transformation oder Aktion anwenden.

Beachten Sie, dass wir die Spark-SQL-Bibliothek nur zum Lesen der Parkettdatei verwendet haben. Sie können beispielsweise nur die gewünschten Spalten direkt auf dem Datenrahmen auswählen, bevor Sie sie einer RDD zuordnen.

dataFrame.select('col1, 'col2).map { row => ... } 
+1

Danke Lomig für ausführliche Antwort. Ich habe es als richtige Antwort markiert. Obwohl wir noch nicht bei Spark 1.3 sind, planen wir diesen Monat zu aktualisieren. Kann in Spark 1.2 auf die Datenrahmen-API verzichtet werden? Könnten Sie bitte lassen Sie mich wissen, wie getAs [Seq [Row]] (1) funktioniert? Index [1] ist die Position der Spalte, die das verschachtelte Array enthält. Ist das richtig? – Tagar

+1

Siehe meine Bearbeitung. Für Spark 1.2 können Sie den exakt gleichen Code für die Umwandlung von 'Row' in Ihre Fallklasse verwenden. Bitte lesen Sie die offizielle Dokumentation für die Syntax, um eine Parkett-Datei in älteren Versionen zu lesen, es ist sehr nahe. –

+0

Bekam es. Danke vielmals. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L268 GetSeq [Zeile] (1) würde tun auch? – Tagar

8

Ich gebe eine Python-basierte Antwort, da das ist, was ich benutze. Ich denke Scala hat etwas Ähnliches.

Die Funktion wurde in Spark 1.4.0 hinzugefügt, um verschachtelte Arrays in DataFrames gemäß der Python API docs zu behandeln.

Erstellen Sie eine Testdatenrahmen:

from pyspark.sql import Row 

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])]) 
df.show() 

## +-+--------------------+ 
## |a|    intlist| 
## +-+--------------------+ 
## |1|ArrayBuffer(1, 2, 3)| 
## |2|ArrayBuffer(4, 5, 6)| 
## +-+--------------------+ 

Verwenden explode die Liste Spalte zu glätten:

from pyspark.sql.functions import explode 

df.select(df.a, explode(df.intlist)).show() 

## +-+---+ 
## |a|_c0| 
## +-+---+ 
## |1| 1| 
## |1| 2| 
## |1| 3| 
## |2| 4| 
## |2| 5| 
## |2| 6| 
## +-+---+ 
+0

Danke dnlbrky. Es sieht einfacher aus als Scala. Ich werde definitiv versuchen, Ihr Python-Beispiel .. Wir hätten wahrscheinlich Spark 1.4 nicht bis irgendwann Ende dieses Jahres, sobald Cloudera CDH 5.5 veröffentlicht :-) Hoffe, Spark 1.5 zu dieser Zeit zu haben. – Tagar

3

Ein anderer Ansatz, wie diese mit Pattern-Matching würde:

val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
    case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match { 
    case List(a:String, b: String) => (a, b) 
    }).toList 
}) 

können Sie Muster stimmt direkt mit Row überein, aber es schlägt wahrscheinlich aus einigen Gründen fehl.

0

Die obigen Antworten sind alle gute Antworten und gehen diese Frage von verschiedenen Seiten an; Spark SQL ist auch eine sehr nützliche Möglichkeit, auf verschachtelte Daten zuzugreifen.

Hier ist ein Beispiel, wie Sie explode() in SQL direkt verwenden, um verschachtelte Auflistung abzufragen.

SELECT hholdid, tsp.person_seq_no 
FROM ( SELECT hholdid, explode(tsp_ids) as tsp 
     FROM disc_mrt.unified_fact uf 
    ) 

tsp_ids ist ein von structs verschachtelt, was darunter viele Attribute hat, person_seq_no die ich oben in der äußeren Abfrage bin Auswahl.

Oben wurde in Spark 2.0 getestet. Ich habe einen kleinen Test gemacht und es funktioniert nicht in Spark 1.6. Diese Frage wurde gestellt, als Spark 2 nicht verfügbar war. Diese Antwort ergänzt die Liste der verfügbaren Optionen für verschachtelte Strukturen.

Auffällig nicht aufgelöst Jiras auf explode() für SQL-Zugriff:

Verwandte Themen