2017-05-11 4 views
1

Ich bin es, in Python zu programmieren. Meine Firma hat jetzt einen Hadoop Cluster mit Jupyter installiert. Bisher habe ich Spark/Pyspark nie benutzt.Speichern mehrerer Elemente in HDFS mit (Spark, Python, Pyspark, Jupiter)

Ich bin in der Lage Dateien von HDFS so einfach wie diese zu laden:

text_file = sc.textFile("/user/myname/student_grades.txt") 

Und ich bin der Lage, Ausgabe wie folgt zu schreiben:

text_file.saveAsTextFile("/user/myname/student_grades2.txt") 

Die Sache Ich versuche zu erreichen, Verwenden Sie eine einfache "for-Schleife", um Textdateien einzeln zu lesen und den Inhalt in eine HDFS-Datei zu schreiben. Also habe ich versucht, dies:

list = ['text1.txt', 'text2.txt', 'text3.txt', 'text4.txt'] 

for i in list: 
    text_file = sc.textFile("/user/myname/" + i) 
    text_file.saveAsTextFile("/user/myname/all.txt") 

So funktioniert dies für das erste Element der Liste, aber dann gibt mir diese Fehlermeldung:

Py4JJavaError: An error occurred while calling o714.saveAsTextFile. 
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory 
XXXXXXXX/user/myname/all.txt already exists 

Um Verwirrung zu vermeiden I "verwischten" -out IP Adresse mit XXXXXXXX.


Was ist der richtige Weg? Ich werde Tonnen von Datensätzen haben (wie 'text1', 'text2' ...) und möchte eine Python-Funktion mit jedem von ihnen durchführen, bevor Sie sie in HDFS speichern. Aber ich möchte die Ergebnisse alle zusammen in "einer" Ausgabedatei haben.

Vielen Dank!
MG

EDIT: Es scheint so, dass mein endgültiges Ziel nicht wirklich klar war. Ich muss eine Funktion für jede Textdatei separat anwenden und dann möchte ich die Ausgabe an das vorhandene Ausgabeverzeichnis anhängen. Etwas wie folgt aus:

for i in list: 
    text_file = sc.textFile("/user/myname/" + i) 
    text_file = really_cool_python_function(text_file) 
    text_file.saveAsTextFile("/user/myname/all.txt") 

Antwort

0

Sie mehrere Dateien lesen und speichern Sie sie durch

textfile = sc.textFile(','.join(['/user/myname/'+f for f in list])) 
textfile.saveAsTextFile('/user/myname/all') 

Sie alle Teildateien innerhalb Ausgabeverzeichnis erhalten.

+0

Es scheint so, dass mein Endziel nicht wirklich klar war. Ich muss eine Funktion für jede Textdatei separat anwenden und dann möchte ich die Ausgabe an das vorhandene Ausgabeverzeichnis anhängen. Siehe EDIT – mgruber

+0

gleiche Funktion für alle Textdateien? – Suresh

+0

Ja die gleiche Funktion für alle Dateien, aber ich kann nicht die Textdateien zuvor, da jede Datei separat behandelt werden muss – mgruber

1

Ich wollte dies als Kommentar posten, konnte es aber nicht tun, da ich nicht genug Ansehen habe.

Sie müssen Ihre RDD in Datenframe konvertieren und dann im Append-Modus schreiben. So konvertieren RDD zu Datenrahmen schauen Sie bitte in dieser Antwort:
https://stackoverflow.com/a/39705464/3287419
oder diesen Link http://spark.apache.org/docs/latest/sql-programming-guide.html
zu Datenrahmen in Zufügen-Modus speichern unter Link nützlich sein können:
http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes

Fast gleiche Frage ist hier auch Spark: Saving RDD in an already existing path in HDFS. Aber die Antwort ist für scala. Ich hoffe, dass etwas ähnliches in Python auch getan werden kann.

Es gibt noch einen anderen (aber hässlichen) Ansatz. Konvertieren Sie Ihre RDD in eine Zeichenfolge. Lassen Sie die resultierende Zeichenfolge resultString sein. Verwenden Sie den Subprozess, um diese Zeichenfolge an die Zieldatei anzuhängen, d. H.

subprocess.call("echo "+resultString+" | hdfs dfs -appendToFile - <destination>", shell=True) 
0

Wenn der Text-Dateien alle das gleiche Schema haben, könnten Sie Hive verwenden den gesamten Ordner als eine einzige Tabelle zu lesen und direkt, dass die Ausgabe schreiben.