2015-03-13 4 views
6

ich den nächsten Code verwenden:Wie kann ich Spark Ergebnisdateien ohne Repartition und CopyMerge zusammenführen?

csv.saveAsTextFile(pathToResults, classOf[GzipCodec]) 

pathToResults Verzeichnis viele Dateien wie Teil 0000, Teil-0001 usw. I() kann verwendet werden FileUtil.copyMerge hat, aber es ist wirklich langsam, dann ist es herunterladen alle auf Dateien Treiberprogramm und lade sie dann in hadoop hoch. Aber FileUtil.copyMerge() schneller als:

csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec]) 

Wie kann ich fusionieren Funkenergebnisdateien ohne repartition und FileUtil.copyMerge()?

Antwort

8

Leider gibt es keine andere Option, um eine einzelne Ausgabedatei in Spark zu erhalten. Anstelle repartition(1) können Sie coalesce(1) verwenden, aber mit Parameter 1 würde ihr Verhalten das gleiche sein. Spark würde Ihre Daten in einer einzigen Partition im Speicher sammeln, was zu einem OOM-Fehler führen kann, wenn Ihre Daten zu groß sind. Eine andere Option zum Zusammenführen von Dateien in HDFS könnte darin bestehen, einen einfachen MapReduce-Job (oder Pig Job- oder Hadoop Streaming-Job) zu schreiben, der das gesamte Verzeichnis als Eingabe erhält und mit einem einzelnen Reducer eine einzige Ausgabedatei generiert. Beachten Sie jedoch, dass bei der MapReduce-Methode alle Daten zuerst in das lokale Dateisystem des Reducers kopiert werden, was zu einem "out of space" -Fehler führen kann.

Hier sind einige nützliche Links zum gleichen Thema:

0

hatte genau die gleiche Frage und hatte pySpark Code (mit Aufrufen schreiben Hadoop API), die copyMerge implementiert:

https://github.com/Tagar/stuff/blob/master/copyMerge.py

Leider wird copyMerge als eigenständiger Hadoop-API-Aufruf veraltet und in Hadoop 3.0 entfernt. Diese Implementierung hängt also nicht von Hadoop copyMerge ab (es wird erneut implementiert).

Verwandte Themen