Ich bin neu in der Spark-Framework und arbeiten an einigen (~) kleinen Aufgaben auf meinem lokalen Computer zu üben. Meine Aufgabe war folgende: Ich habe 365 gezippte CSV-Dateien in S3 gespeichert, die tägliche Protokolle enthalten. Ich nehme an, einen Datenrahmen des ganzen Jahres zu konstruieren. Meine Methode bestand darin, die Schlüssel aus dem Bucket abzurufen, tägliche Datenrahmen zu erstellen, sie zu Monatsdatenrahmen zu vereinheitlichen, dasselbe für sie zu tun und dafür einen Datenrahmen für ein ganzes Jahr zu erhalten.Fehler beim Anwenden von Aktionen auf Spark Dataframe
Das funktionierte bei einigen Beispieldaten, die ich zum Testen abgerufen habe. Bevor ich die DataFrames erstellte, entpackte ich die Dateien, schrieb die unkomprimierte CSV-Datei auf die Festplatte und benutzte sie zum Erstellen des DataFrames.
Das Problem: Wenn ich die csv-Datei von der Festplatte löschen (make das temporäre), nachdem ich den Datenrahmen erstellt habe, kann ich keine Aktionen auf dem Dataframe durchführen (year_df.count() zum Beispiel). die Spark.exception wirft:
"Job aufgrund Stufe Ausfall abgebrochen: .... java.io.FileNotFoundException: Datei .csv existiert nicht"
Nach einiger Suche über SO, Ich habe herausgefunden, dass der Grund MetaData sein kann, die Spark verwendet, wenn SQL-Abfragen über die DataFrames (External Table not getting updated from parquet files written by spark streaming) angewendet werden. Ich habe die
spark.sql.parquet.cacheMetadata
von spark = SparkSession.builder.config("spark.sql.parquet.cacheMetadata", "false").getOrCreate()
läuft. Sichergestellt, dass die spark.conf.get("spark.sql.parquet.cacheMetadata")
falsch zurückgegeben wurde.
Konnte keine Lösung finden. Natürlich, das Entpacken aller Dateien in S3 wird funktionieren, aber das ist nicht nützlich für mich ..
Vielen Dank!
Hallo, vielen Dank für Ihre Antwort. Wenn ich also richtig verstanden habe, kann ich die Quelldatei sicher entfernen (my.csv in diesem Fall) nur, nachdem ich die Abfragen, die mich interessieren, zwischengespeichert und eine Aktion aufgerufen habe, um sie zu canculieren. Also, ich habe 2 Möglichkeiten: Entweder speichern Sie die unkomprimierten CSV-Dateien in meine S3 für den konstanten Zugriff, oder lesen Sie direkt die gezippten Dateien zu funken, und erstellen Sie DataFrames nach einigen Transformationen. Richtig? – Mike
@Mike Flow: lesen -> Cache -> etwas Aktion -> Datei löschen sollte in Ordnung sein - es sei denn, Sie entdecken ungewöhnliches Spark-Verhalten;) Sie sind richtig, ich würde Dateien in S3 behalten, bis die Verarbeitung abgeschlossen ist - im Falle eines Fehlers Sie haben die Möglichkeit, Dateien erneut zu verarbeiten –