Es gibt keine Magie im Falle der verschachtelten Sammlung. Spark wird auf die gleiche Weise eine RDD[(String, String)]
und eine RDD[(String, Seq[String])]
behandeln.
Das Lesen einer solchen verschachtelten Sammlung aus Parquet-Dateien kann jedoch schwierig sein.
Nehmen wir ein Beispiel nehmen von der spark-shell
(1.3.1):
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
die Parkett-Datei schreiben:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
Lesen Sie die Parkett-Datei:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
die wichtige Teil ist row.getAs[Seq[Row]](1)
. Die interne Darstellung einer verschachtelten Sequenz von struct
ist ArrayBuffer[Row]
, Sie könnten einen beliebigen Super-Typ anstelle von Seq[Row]
verwenden. Der 1
ist der Spaltenindex in der äußeren Zeile. Ich habe die Methode getAs
hier verwendet, aber es gibt Alternativen in den neuesten Versionen von Spark. Siehe den Quellcode der Row trait.
Jetzt, da Sie eine RDD[Outer]
haben, können Sie jede gewünschte Transformation oder Aktion anwenden.
Beachten Sie, dass wir die Spark-SQL-Bibliothek nur zum Lesen der Parkettdatei verwendet haben. Sie können beispielsweise nur die gewünschten Spalten direkt auf dem Datenrahmen auswählen, bevor Sie sie einer RDD zuordnen.
dataFrame.select('col1, 'col2).map { row => ... }
Danke Lomig für ausführliche Antwort. Ich habe es als richtige Antwort markiert. Obwohl wir noch nicht bei Spark 1.3 sind, planen wir diesen Monat zu aktualisieren. Kann in Spark 1.2 auf die Datenrahmen-API verzichtet werden? Könnten Sie bitte lassen Sie mich wissen, wie getAs [Seq [Row]] (1) funktioniert? Index [1] ist die Position der Spalte, die das verschachtelte Array enthält. Ist das richtig? – Tagar
Siehe meine Bearbeitung. Für Spark 1.2 können Sie den exakt gleichen Code für die Umwandlung von 'Row' in Ihre Fallklasse verwenden. Bitte lesen Sie die offizielle Dokumentation für die Syntax, um eine Parkett-Datei in älteren Versionen zu lesen, es ist sehr nahe. –
Bekam es. Danke vielmals. https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L268 GetSeq [Zeile] (1) würde tun auch? – Tagar