2016-03-19 15 views
2

Ich habe einige unerwartete Verhalten bei der Verwendung von Spark reduce Funktion mit java.lang.Math.max aufgetreten. Hier ist Beispielcode:Apache Spark Reduzieren mit java.lang.Math.max unerwartetes Verhalten

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator)); 
System.out.println(populationWithFitness.values().collect().toString()); 
long currentMaxFitness = populationWithFitness.values().reduce(Math::max); 
System.out.println("After Reduce: " + currentMaxFitness); 

-Code oben mehrmals aufgerufen wird und es die meiste Zeit zu unerwarteten Ergebnis wie folgt aus:

[-2754285, -2535458, -2626449, -3182283] //printed RDD after collect 
After Reduce: -2392513 //value produced by reducer 

Wie Sie sehen können Minderer Wert erzeugt -2392513 jedoch ist dieser Wert nicht einmal in der RDD beim Vergleich mit gedruckten Werten von RDD. Warum ist es? Beeinflusst collect()reduce()? Ich habe es auch anders herum versucht, zuerst RDD zu reduzieren und dann zu sammeln und ich beobachte immer noch dieses seltsame Verhalten. Ich dachte, dass die Weitergabe der statischen Methode aus java.Math-Bibliothek Probleme beim Serialisieren verursachen könnte, aber unter Bezugnahme auf diese Spark Quick Start Tutorial verwenden sie auch Math.max in reducer und anscheinend soll es funktionieren.

Irgendwelche Ideen?

Danke

EDIT:

Zusätzliche Informationen: Dieser Code-Schnipsel Teil eines größeren Programms ist es, die in jeder Iteration mehrere Iterationen und es heißt hat. Erste Iteration erzeugt korrekte Ergebnis, wo maxValue von reducer produziert richtigen Wert ist, aber alle anderen Iterationen produzieren seltsame Ergebnisse

EDIT2:

Wenn ich drucken populationWithFitness.values().collect().toString() dreimal in einer Reihe wie folgt aus:

JavaPairRDD<Island, Long> populationWithFitness = parallelizedIslandPop.mapToPair(isl -> evaluateFitness(isl, fitnessCalculator)); 
System.out.println(populationWithFitness.values().collect().toString()); 
System.out.println(populationWithFitness.values().collect().toString()); 
System.out.println(populationWithFitness.values().collect().toString()); 
long currentMaxFitness = populationWithFitness.values().reduce(Math::max); 
System.out.println("After Reduce: " + currentMaxFitness); 

Ich bekomme Ausgabe, die so aussieht:

Generation 1 
[-3187591, -3984035, -3508984, -3054649] 
[-3187591, -3984035, -3508984, -3054649] 
[-3187591, -3984035, -3508984, -3054649] 
After Reduce: -3054649 
Generation 2 
[-3084310, -3931687, -3508984, -3054649] 
[-3084310, -3847178, -3508984, -2701881] 
[-3148206, -3984035, -2806859, -2989184] 
After Reduce: -2949478 
Generation 3 
[-3187591, -3984035, -3696853, -3054649] 
[-3187591, -3984035, -3178920, -3015411] 
[-3148206, -3804759, -3657984, -2701881] 
After Reduce: -2710313 
Generation 4 
[-3187591, -2982220, -3310753, -3054649] 
[-3148206, -2985628, -3657984, -2701881] 
[-3148206, -2706580, -3451228, -2989184] 
After Reduce: -2692651 
. 
. 
. 

Wie Sie in fi sehen können Erste Iteration alles funktioniert gut, aber in allen folgenden Iterationen produziert es seltsame Ausgabe. Ich denke, das Problem ist, dass es etwas mit faul Bewertung zu tun hat und wenn ich collect es ist nicht Transformation angewendet hat, aber ich bin mir nicht sicher.

Ich habe auch versucht reduce(Math::max) mit JavaDoubleRDD und max auf dieser JavaDoubleRDD aber Ergebnis war das gleiche genannt zu ersetzen:

JavaDoubleRDD stats = populationWithFitness.mapToDouble(tup -> tup._2()); 
long currentMaxFitness = stats.max().longValue(); 

Ein weiterer wichtiger Punkt, den ich testen werde diesen Code in lokalen Modus mit Parametern ausgeführt wird:

spark --class "main.TravellingSalesmanMain" --master local[4] exampletravellingsalesman-1.0-SNAPSHOT.jar > sparkoutput.txt 
+0

Ist das wirklich der eigentliche Code? –

+0

Ja, außer dass 'sampleRdd', auf das ich' map() 'anwende, eine Sammlung von Objekten ist, die nach dem Mapping' Long'-Werte erzeugen. Andere Teile sind Schnipsel aus meinem Code mit ersetzten Variablennamen zur leichteren Bezugnahme. Bereitgestellte Beispielausgabe ist auch die tatsächliche Ausgabe, die ich bekomme. – MichaelDD

+0

Was ist die Quelle Ihrer RDD? –

Antwort

1

Dies ist höchstwahrscheinlich (99%) irgendwo innerhalb von evaluateFitness(isl, fitnessCalculator) passiert. Es scheint, dass es eine Art nicht reproduzierbare Quelle verwendet und daher Ergebnisse zurücksendet, die unterschiedliche Läufe sind. Denken Sie daran, dass Spark faul ist und die Ausführung bei jeder folgenden Aktion erneut ausgeführt wird. Sie können Caching verwenden, um dies zu unterstützen, selbst das kann jedoch fehlschlagen (Knoten fehlschlägt/Daten fallen nicht mehr aus dem Cache). Sie sollten hier am besten Checkpointing verwenden, aber Sie sollten auch die Ausführung selbst so ändern, dass sie idempotent ist.

+0

Vielen Dank. 'Cache' funktioniert für mich. Sie haben Recht mit der nichtdeterministischen 'map'-Funktion, aber es war nicht' evaluateFitness', sondern die nachfolgende 'selection'- und' crossover'-Funktion (ich vermute, Sie schlossen aus der Variablenbenennung Ich spreche von genetischen Algorithmen), die einen Zufallsfaktor benötigen um zu arbeiten, kann ich das Implementierungsbit nicht ändern. Ich kette "Map" -Phasen und rufe 'reduce' nur für maximale Fitness und für die endgültige Population auf, um die beste Lösung zu erhalten. Ich habe einige Forschungsarbeiten verfolgt, bei denen sie es auf Hadoop implementiert haben, aber Spark lazy eval macht es etwas komplizierter. – MichaelDD