Ich schreibe einen ETL-Prozess, wo ich stündliche Protokolldateien lesen, die Daten partitionieren und speichern muss. Ich benutze Spark (in Databricks). Die Protokolldateien sind CSV, also lese ich sie und wenden ein Schema an, dann führe meine Transformationen aus.Neue Daten an partitionierte Parkettdateien anhängen
Mein Problem ist, wie kann ich jede Stunde Daten als Parkett-Format speichern, sondern an den vorhandenen Datensatz anhängen? Beim Speichern muss ich durch 4 Spalten im Datenrahmen partitionieren.
Hier ist meine speichern Linie:
data
.filter(validPartnerIds($"partnerID"))
.write
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)
Das Problem ist, dass, wenn der Zielordner das Speichern wirft einen Fehler vorhanden ist. Wenn das Ziel nicht existiert, werde ich meine Dateien nicht anhängen.
Ich habe versucht, .mode("append")
verwenden, aber ich finde, dass Funken manchmal auf halbem Weg nicht durch so dass ich verliere am Ende, wie viel von meinen Daten geschrieben werden und wie viel ich muß noch schreiben.
Ich benutze Parkett, weil die Partitionierung meine Abfrage in Zukunft wesentlich erhöht. Außerdem muss ich die Daten als Dateiformat auf Platte schreiben und kann keine Datenbank wie Druid oder Cassandra verwenden.
Irgendwelche Vorschläge, wie ich meinen Dataframe partitionieren und die Dateien speichern kann (entweder bei Parkett oder einem anderen Format bleiben) wird sehr geschätzt.
Können Sie den Fehler gemeinsam haben, erhalten Sie, wenn Sie Verwenden Sie '.mode (append)'. –
Der Fehler, den ich bekomme, ist dies: verursacht durch: java.io.IOException: Datei existiert bereits:/tracking/v4/010316/gif = a/partnerID = 111/Jahr = 2016/Monat = 1/Tag = 3/Teil -r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet Ich denke, dass dieser Fehler aufgrund einer Nichtübereinstimmung der Aufgabenplanung ausgelöst wird, wenn einige der Schreibvorgänge eine lange Zeit benötigen. – Saman