2016-03-22 4 views
10

ich einen Datenrahmen am Partitionieren wie folgt:Prevent DataFrame.partitionBy() von partitionierten Spalten von Schema zu entfernen

df.write.partitionBy("type", "category").parquet(config.outpath) 

Der Code gibt die erwarteten Ergebnisse (das heißt Daten aufgeteilt nach Typ & Kategorie). Die Spalten "Typ" und "Kategorie" werden jedoch aus dem Daten/Schema entfernt. Gibt es eine Möglichkeit, dieses Verhalten zu verhindern?

+0

Ist das nicht ein Punkt? Alle erforderlichen Daten sind noch in der Verzeichnisstruktur codiert, so dass kein Datenverlust auftritt. Wenn Sie einige Werte pro Datei wünschen, können Sie 'df.repartition (" type "," category ") versuchen. Write (...)' aber Sie werden keine schöne Struktur bekommen. – zero323

+0

@ Zero323: Ja, ich stimme zu, es gibt keinen Datenverlust. Das Wiederherstellen der für die Partitionierung verwendeten Spalten ist jedoch in einigen Anwendungsfällen nicht trivial. Zum Beispiel, wenn ich die Daten in Schwein laden möchte, wie würde ich die Spalten des Typs und der Kategorie wiederherstellen? – Michael

+0

Ich habe das Schwein schon eine Weile nicht mehr benutzt. Versteht 'ParquetLoader' nicht die Struktur aus der Box? – zero323

Antwort

8

Ich kann mir eine Workaround vorstellen, die eher lahm ist, aber funktioniert.

import spark.implicits._ 

val duplicated = df.withColumn("_type", $"type").withColumn("_category", $"category") 
duplicated.write.partitionBy("_type", "_category").parquet(config.outpath) 

Ich beantworte diese Frage in der Hoffnung, dass jemand eine bessere Antwort oder Erklärung, als hätte ich (wenn OP eine bessere Lösung gefunden hat), obwohl, da ich die gleiche Frage habe.

+1

Eigentlich sieht das nicht so lahm für mich aus. Scheint wie der beste Ansatz angesichts des Verhaltens von 'partitionBy()'. – Michael

1

Im Allgemeinen ist Ivans Antwort ein feiner Clugge. ABER ...

Wenn Sie strikt lesen und in Spark schreiben, können Sie einfach die basePath-Option verwenden, wenn Sie Ihre Daten lesen.

https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#partition-discovery

durch den Pfad vorbei/oder entweder SparkSession.read.parquet SparkSession.read.load zu/table, SQL Funke die Partitionierungsinformationen aus den Pfaden automatisch extrahieren.

Beispiel:

 val dataset = spark 
     .read 
     .format("parquet") 
     .option("basePath", hdfsInputBasePath) 
     .load(hdfsInputPath) 
Verwandte Themen