2017-02-21 2 views
1

Ich habe folgendes Funke einfaches Beispiel:auf Funken, einige Operationen ausgeführt werden, bevor eine Aktion definiert ist?

#1 val lines: RDD[String] = sc.textFile("/data/non_existing_file.txt") 
#2 val words: RDD[String] = lines.flatMap(line => line.split(" ")) 
#3 val pairs: RDD[(String, Int)] = words.map(word => (word, 1)) 
#4 val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _) 
#5 counts.saveAsTextFile("/tmp/result") 

Wenn ich das Programm laufen lasse, erhalte ich eine Ausnahme Input path does not exist: file:/data/non_existing_file.txt", wie erwartet.

Was ist Ackward ist, dass ich diese Ausnahme auf Linie # 4 bekomme. Ich verstehe, dass ich diesen Fehler in Zeile # 1, # 2 und # 3 nicht bekomme, weil die Berechnung noch nicht ausgeführt wird. Die Berechnung wird nur in Zeile 5 ausgeführt, wenn ich die Aktion habe, das Ergebnis in eine Datei zu schreiben. Also, warum bekomme ich eine Ausnahme in Zeile 4 statt Zeile 5?

Antwort

1

Dies geschieht unter zwei Bedingungen:

  • spark.default.parallelism nicht gesetzt ist.
  • Sie stellen weder Partitioner noch die Anzahl der Partitionen für reduceByKey

In diesem Fall reduceByKey eifrig neue HashPartitioner mit der Anzahl der Partitionen gleich die Anzahl der Partitionen der Mutter RDD erzeugt. Um die Nummer der Partition zu erhalten, muss sie Input-Splits berechnen. Dies erfordert das Vorhandensein einer Datei im Eingabedateipfad, die anscheinend fehlt, daher der Fehler.

Der tatsächliche reduceByKey Vorgang wird nur nach einem Aktionsaufruf durchgeführt.

Dies ist ein sehr ähnliches Problem zu Why does sortBy transformation trigger a Spark job?

Verwandte Themen