2017-12-10 1 views
0

Beginnend mit pyspark und laufen in einen Engpass arbeiten ich mit meinem Code erstellt haben:parallelisieren pyspark 2.2.0 Datenrahmen partitioniert Schreib bis S3

I „Gruppierung von“ pyspark 2.2.0 Datenrahmen in Partitionen von drive_id bin und jede Partition (Gruppe) in eine eigene Position auf S3 schreiben.

Ich brauche es, Athena Tabelle auf S3-Speicherort von drive_id partitioniert zu definieren - dies ermöglicht mir, Daten sehr effizient zu lesen, wenn von drive_id abgefragt wird.

Das Problem ist, dass die Schleife Serielle Verarbeitung macht und Laufwerk Partitionen nur eins nach dem anderen schreibt.

Offensichtlich skaliert das nicht gut, weil einzelne Partition Schreibaufgabe ziemlich klein ist und parallelisieren es nicht viel gibt.

Wie ersetze ich die Schleife mit einem einzigen Schreibbefehl, der alle Partitionen in einer einzigen Operation an verschiedenen Orten schreibt?

Dieser Vorgang sollte parallelisiert werden, damit er auf Funkenarbeitern und nicht auf den Fahrer läuft.

Antwort

1

Ich fand die Antwort - überraschend einfach.

dataframe.write.parquet hat den optionalen Parameter partitionBy (names_of_partitioning_columns).

Also keine Notwendigkeit, in der "Gruppe von" und keine Notwendigkeit, in der Schleife: die einzige Zeile mit:

df.write.partitionBy(drive_id).parquet("s3n://s3bucket/dir") 

erstellt Partitionen in Standard-hive-Format „S3N: // s3Bucket/dir/drive_id = 123 "