2015-11-20 7 views
6

Ich habe eine DataFrame, die ich nach einer bestimmten Partitionierung in S3 schreiben muss. Der Code sieht wie folgt aus:Wie man die Anzahl der Parkettdateien steuert, die bei der Verwendung von partitionBy generiert wurden

dataframe 
    .write 
    .mode(SaveMode.Append) 
    .partitionBy("year", "month", "date", "country", "predicate") 
    .parquet(outputPath) 

Die partitionBy die Daten in eine ziemlich große Anzahl von Ordnern aufgeteilt (~ 400) mit nur ein wenig von Daten (~ 1 GB) in jedem. Und hier kommt das Problem - weil der Standardwert von spark.sql.shuffle.partitions 200 ist, wird die 1 GB Daten in jedem Ordner in 200 kleine Parkett-Dateien aufgeteilt, woraus insgesamt etwa 80000 Parkett-Dateien geschrieben werden. Dies ist aus mehreren Gründen nicht optimal und ich möchte dies vermeiden.

Ich kann natürlich die spark.sql.shuffle.partitions auf eine viel kleinere Zahl, sagen wir 10, aber wie ich verstehe, steuert diese Einstellung auch die Anzahl der Partitionen für Shuffle in Joins und Aggregation, also will ich nicht wirklich das ändern.

Weiß jemand, ob es eine andere Möglichkeit gibt zu steuern, wie viele Dateien geschrieben werden?

+1

Haben Sie versucht, den Datenrahmen vor '.write' erneut zu partitionieren? Auf den ersten Blick scheint 'spark.sql.shuffle.partitions' nur in Shuffles und Joins verwendet zu werden, aber nirgendwo sonst. Andernfalls sollten Sie ein Ticket für einen zusätzlichen Parameter 'numParameter' in partitionBy öffnen. –

+0

@MariusSoutier Hmmm ... Ich würde denken, dass das Aufrufen von 'repartition' _before_' write' dazu führen würde, dass mein ursprünglicher 'Datenrahmen' neupartitioniert wird, bevor er dann von der 'partitionBy'-Funktion neupartitioniert wird. Die Neupartitionierung des ursprünglichen Datenrahmens in nur 10 Partitionen würde definitiv zu einer OOM-Ausnahme führen. Allerdings habe ich gerade angefangen, es zu testen. Ich werde mit einem Update zurück, sobald es abgeschlossen ist. –

+0

@MariusSoutier es funktioniert! Fantastisch. Vielen Dank! Willst du es als Antwort posten - dann werde ich es als beantwortet markieren :-) –

Antwort

6

Wie Sie richtig bemerkt haben, gilt spark.sql.shuffle.partitions nur für Shuffle und Joins in SparkSQL.

partitionBy in DataFrameWriter (Sie bewegen von DateFrame zu DateFrameWriter, sobald Sie write nennen) arbeitet einfach auf die vorherige Anzahl der Partitionen. (Die Partition des Writers weist der Tabelle/Parkett-Datei nur Spalten zu, die ausgeschrieben werden, hat also nichts mit der Anzahl der Partitionen zu tun. Das ist etwas verwirrend.)

Lange Rede, kurzer Sinn, einfach den DataFrame neu partitionieren bevor Sie es in einen Schriftsteller verwandeln.

Verwandte Themen