2016-01-21 13 views
12

Ich schreibe einen ETL-Prozess, wo ich stündliche Protokolldateien lesen, die Daten partitionieren und speichern muss. Ich benutze Spark (in Databricks). Die Protokolldateien sind CSV, also lese ich sie und wenden ein Schema an, dann führe meine Transformationen aus.Neue Daten an partitionierte Parkettdateien anhängen

Mein Problem ist, wie kann ich jede Stunde Daten als Parkett-Format speichern, sondern an den vorhandenen Datensatz anhängen? Beim Speichern muss ich durch 4 Spalten im Datenrahmen partitionieren.

Hier ist meine speichern Linie:

data 
    .filter(validPartnerIds($"partnerID")) 
    .write 
    .partitionBy("partnerID","year","month","day") 
    .parquet(saveDestination) 

Das Problem ist, dass, wenn der Zielordner das Speichern wirft einen Fehler vorhanden ist. Wenn das Ziel nicht existiert, werde ich meine Dateien nicht anhängen.

Ich habe versucht, .mode("append") verwenden, aber ich finde, dass Funken manchmal auf halbem Weg nicht durch so dass ich verliere am Ende, wie viel von meinen Daten geschrieben werden und wie viel ich muß noch schreiben.

Ich benutze Parkett, weil die Partitionierung meine Abfrage in Zukunft wesentlich erhöht. Außerdem muss ich die Daten als Dateiformat auf Platte schreiben und kann keine Datenbank wie Druid oder Cassandra verwenden.

Irgendwelche Vorschläge, wie ich meinen Dataframe partitionieren und die Dateien speichern kann (entweder bei Parkett oder einem anderen Format bleiben) wird sehr geschätzt.

+1

Können Sie den Fehler gemeinsam haben, erhalten Sie, wenn Sie Verwenden Sie '.mode (append)'. –

+0

Der Fehler, den ich bekomme, ist dies: verursacht durch: java.io.IOException: Datei existiert bereits:/tracking/v4/010316/gif = a/partnerID = 111/Jahr = 2016/Monat = 1/Tag = 3/Teil -r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet Ich denke, dass dieser Fehler aufgrund einer Nichtübereinstimmung der Aufgabenplanung ausgelöst wird, wenn einige der Schreibvorgänge eine lange Zeit benötigen. – Saman

Antwort

9

Wenn Sie die Dateien anhängen müssen, müssen Sie auf jeden Fall den Append-Modus verwenden. Ich weiß nicht, wie viele Partitionen Sie es erwarten zu generieren, aber ich finde, dass, wenn Sie viele Partitionen haben, partitionBy eine Reihe von Problemen verursachen (Gedächtnis- und IO-Ausgaben wie).

Wenn Sie denken, dass Ihr Problem durch Schreiboperationen verursacht wird, zu lange dauert, empfehle ich Ihnen, diese beiden Dinge versuchen:

1) bissig Verwenden von der Konfiguration hinzufügen:

conf.set("spark.sql.parquet.compression.codec", "snappy") 

2 Deaktivieren) Generierung der Metadaten-Dateien im hadoopConfiguration auf dem SparkContext wie folgt aus:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") 

die Metadaten-Dateien werden etwas Zeit consumi ng zu erzeugen (siehe this blog post), aber nach this sind sie eigentlich nicht wichtig. Persönlich deaktiviere ich sie immer und habe keine Probleme.

Wenn Sie viele Partitionen (> 500) zu erzeugen, ich fürchte, das Beste, was ich tun kann, um Sie wird vorschlagen, dass Sie eine Lösung suchen in nicht mit Anfügen-Modus - ich einfach nie partitionBy zu bekommen geschaffen zu arbeiten mit so vielen Partitionen.

+0

Danke Glennie. Ich deaktiviere immer die Metadatendateien, die genau wegen dieses Blogposts erzeugt werden: D. Ich erstelle definitiv mehr als 500 Partitionen. Ich glaube, dass die meisten meiner Probleme auf die Tatsache zurückzuführen sind, dass das Parkettformat nicht als aktualisierbares Format verwendet werden sollte, und ich behandle es wie eine Datenbanktabelle. Haben Sie Vorschläge für eine andere Möglichkeit, meine täglichen Daten zu speichern? – Saman

+0

Ich habe ähnliches Problem, ich bin auf der Grundlage der aktuellen Zeitstempel partitionieren, mit jeder neuen Partition angehängt erstellt es insgesamt Aufgabe entspricht Partitionen so weit. Wenn es 1000 Partitionen und 1 neue hinzuzufügen gibt, wird 1001 Task ausgeführt und die Gesamtarbeitszeit wird erhöht. Mache ich hier etwas falsch? –

0

Wenn Sie unsortiert verwenden Partitionierung Ihrer Daten über alle Partitionen aufgeteilt werden soll. Das bedeutet, dass jede Aufgabe Daten für jede Ihrer Ausgabedateien generiert und schreibt.

Betrachten Sie Ihre Daten nach Ihrer Partition Spalten vor dem Schreiben repartitioning die Daten alle auf den gleichen Partitionen pro Ausgabedatei haben:

data 
.filter(validPartnerIds($"partnerID")) 
.repartition([optional integer,] "partnerID","year","month","day") 
.write 
.partitionBy("partnerID","year","month","day") 
.parquet(saveDestination) 

See: DataFrame.repartition