2017-01-09 4 views
0

Ich bin neu in der Spark-Framework und arbeiten an einigen (~) kleinen Aufgaben auf meinem lokalen Computer zu üben. Meine Aufgabe war folgende: Ich habe 365 gezippte CSV-Dateien in S3 gespeichert, die tägliche Protokolle enthalten. Ich nehme an, einen Datenrahmen des ganzen Jahres zu konstruieren. Meine Methode bestand darin, die Schlüssel aus dem Bucket abzurufen, tägliche Datenrahmen zu erstellen, sie zu Monatsdatenrahmen zu vereinheitlichen, dasselbe für sie zu tun und dafür einen Datenrahmen für ein ganzes Jahr zu erhalten.Fehler beim Anwenden von Aktionen auf Spark Dataframe

Das funktionierte bei einigen Beispieldaten, die ich zum Testen abgerufen habe. Bevor ich die DataFrames erstellte, entpackte ich die Dateien, schrieb die unkomprimierte CSV-Datei auf die Festplatte und benutzte sie zum Erstellen des DataFrames.

Das Problem: Wenn ich die csv-Datei von der Festplatte löschen (make das temporäre), nachdem ich den Datenrahmen erstellt habe, kann ich keine Aktionen auf dem Dataframe durchführen (year_df.count() zum Beispiel). die Spark.exception wirft:

"Job aufgrund Stufe Ausfall abgebrochen: .... java.io.FileNotFoundException: Datei .csv existiert nicht"

Nach einiger Suche über SO, Ich habe herausgefunden, dass der Grund MetaData sein kann, die Spark verwendet, wenn SQL-Abfragen über die DataFrames (External Table not getting updated from parquet files written by spark streaming) angewendet werden. Ich habe die

spark.sql.parquet.cacheMetadata

von spark = SparkSession.builder.config("spark.sql.parquet.cacheMetadata", "false").getOrCreate() läuft. Sichergestellt, dass die spark.conf.get("spark.sql.parquet.cacheMetadata") falsch zurückgegeben wurde.

Konnte keine Lösung finden. Natürlich, das Entpacken aller Dateien in S3 wird funktionieren, aber das ist nicht nützlich für mich ..

Vielen Dank!

Antwort

0

Spark führt Aktionen auf faule Weise aus.

Das bedeutet, wenn Sie einige Transformationen durchführen können, aber Datei wird nur gelesen, wenn Sie eine Aktion aufrufen.

Es ist in der gleichen Weise wie in RDDs auf Datasets arbeiten, wie Datensätze von RDDs

Erwägen Code gesichert werden:

val df = sqlContext.read // read file 
val query = df.groupBy('x).count() 

query.show() // here the data will be read 

Also, wenn Sie Datei löschen, bevor sie gelesen werden würde, dann funkt eine Ausnahme auslösen Sie können das Lesen erzwingen, indem Sie eine Aktion ausführen, d. H. take oder show. Es wäre im Cache gespeichert werden, wenn Sie cache() tun:

val df = sqlContext.read // read file 
val query = df.groupBy('x).count().cache() 

query.show() // here the data will be read and cached 
+0

Hallo, vielen Dank für Ihre Antwort. Wenn ich also richtig verstanden habe, kann ich die Quelldatei sicher entfernen (my.csv in diesem Fall) nur, nachdem ich die Abfragen, die mich interessieren, zwischengespeichert und eine Aktion aufgerufen habe, um sie zu canculieren. Also, ich habe 2 Möglichkeiten: Entweder speichern Sie die unkomprimierten CSV-Dateien in meine S3 für den konstanten Zugriff, oder lesen Sie direkt die gezippten Dateien zu funken, und erstellen Sie DataFrames nach einigen Transformationen. Richtig? – Mike

+0

@Mike Flow: lesen -> Cache -> etwas Aktion -> Datei löschen sollte in Ordnung sein - es sei denn, Sie entdecken ungewöhnliches Spark-Verhalten;) Sie sind richtig, ich würde Dateien in S3 behalten, bis die Verarbeitung abgeschlossen ist - im Falle eines Fehlers Sie haben die Möglichkeit, Dateien erneut zu verarbeiten –

0

Cache() ist nach wie vor nur ein Hauch; Spark muss die Werte möglicherweise neu berechnen, wenn ein Fehler auftritt oder nur die zwischengespeicherten Daten aufgrund des Cachedrucks verworfen werden. Wenn Sie die Quelldaten löschen möchten, stellen Sie sicher, dass Sie Ihre Ergebnisse geschrieben haben und die Daten nicht mehr benötigt werden.

Ich würde empfehlen, CSV in ein beliebiges Spaltenformat (ORC, Parkett) & mit Snappy komprimieren. Viel effizienter für die Verarbeitung, vor allem mit Prädikat drücken

Verwandte Themen