2016-11-07 5 views
6

Ich habe eine Reihe von Dateien. Der Pfad zu den Dateien wird in einer Datei gespeichert, zB "all_files.txt". Mit apache spark muss ich alle Dateien bearbeiten und die Ergebnisse zusammenführen.PySpark Throwing error Methode __getnegargs __ ([]) existiert nicht

Die Schritte, die ich tun möchte, sind:

  • eine RDD Erstellen von „all_files.txt“
  • Für jede Zeile in „all_files.txt“ Lesen (Jede Zeile ist ein Pfad zu einem gewissen Datei), lesen den Inhalt jeder der Dateien in einer einzigen RDD
  • Dann werden alle Inhalte einer Operation tun

Dies ist der Code, den ich für den gleichen schrieb:

def return_contents_from_file (file_name): 
    return spark.read.text(file_name).rdd.map(lambda r: r[0]) 

def run_spark(): 
    file_name = 'path_to_file' 

    spark = SparkSession \ 
     .builder \ 
     .appName("PythonWordCount") \ 
     .getOrCreate() 

    counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file 
     .flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files 
     .flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files 

Dies wirft den Fehler:

line 323, in get_return_value py4j.protocol.Py4JError: An error occurred while calling o25.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)

Kann mir bitte jemand sagen, was ich falsch mache und wie ich sollte weiter gehen. Danke im Voraus.

Antwort

10

Verwenden spark innerhalb flatMap oder eine Umwandlung, die auf Executors auftritt, ist nicht zulässig (spark Sitzung ist nur für Treiber verfügbar). Es ist auch nicht möglich, RDD von RDDs zu erstellen (siehe: Is it possible to create nested RDDs in Apache Spark?)

Aber man kann diese Transformation auf eine andere Weise erreichen - alle Inhalte von all_files.txt in Datenrahmen lesen, verwenden lokalemap ihnen Datenrahmen zu machen und lokalenreduce zu Union alle, siehe Beispiel:

>>> filenames = spark.read.text('all_files.txt').collect() 
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames) 
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes) 
+0

Vielen Dank für Ihre Antwort. Aber wie parallelisiere ich den gesamten Prozess? würde nicht zuordnen (Lambda r: spark.read.text (r [0]), Dateinamen) den gesamten Prozess serialisieren? – UnderWood

+1

Der Prozess des Lesens von Dateien läuft parallel, der einzige serialisierte Teil erstellt einen Ausführungsplan. Versuch es! – Mariusz

Verwandte Themen