2017-08-02 7 views
1

Ich versuche, die Ausführungszeit jeder mapPartition Operation an einem RDD mit einem Code wie folgt (in Scala) zu protokollieren:Apache Funken mapPartition seltsame Verhalten (lazy evaluation?)

rdd.mapPartitions{partition => 
    val startTime = Calendar.getInstance().getTimeInMillis 
    result = partition.map{element => 
     [...] 
    } 
    val endTime = Calendar.getInstance().getTimeInMillis 
    logger.info("Partition time "+(startTime-endTime)+ "ms") 
    result 
} 

Das Problem ist, dass es protokolliert die "Partitionszeit" sofort, bevor sie mit der Ausführung der Map-Operation beginnt, so dass ich immer eine Zeit von 2 ms bekomme.

Ich bemerkte es durch das Spark Web UI, in der Protokolldatei erscheint die Zeile bezüglich der Ausführungszeit unmittelbar nach dem Start der Aufgabe, nicht am Ende wie erwartet.

Jemand ist in der Lage, mir zu erklären, warum? Innerhalb der mapPartitions sollte der Code linear ausgeführt werden, oder ich falsch liegen?

Dank

Grüße Luca

+0

Transformationen werden langsam ausgewertet. – philantrovert

+0

Ok, danke! Ich löste das Setzen einer "result.size" vor der endTime. Ich dachte, dass die Karte innerhalb der mapPartitions, die eine Scala-Operation ist, standardmäßig nicht faul war. – Gaglia88

+0

@philantrovert Nein, das ist nicht der Grund, Karte in mapPartitions ist keine Spark-Transformation, das ist reine scala-bezogene –

Antwort

3

partitions innerhalb von mapPartitions ist ein Iterator[Row] und ein Iterator wird träge in Scala ausgewertet (das heißt, wenn der Iterator verbraucht wird). Das hat nichts mit Sparks fauler Bewertung zu tun!

Der Aufruf partitions.size löst die Auswertung Ihres Mappings aus, verbraucht jedoch den Iterator (weil er nur einmal iteriert werden kann). Ein Beispiel

val it = Iterator(1,2,3) 
it.size // 3 
it.isEmpty // true 

Was Sie tun können, ist den Iterator auf einen nicht-faul Sammlung Typen zu konvertieren:

rdd.mapPartitions{partition => 
    val startTime = Calendar.getInstance().getTimeInMillis 
    result = partition.map{element => 
     [...] 
    }.toVector // now the statements are evaluated 
    val endTime = Calendar.getInstance().getTimeInMillis 
    logger.info("Partition time "+(startTime-endTime)+ "ms") 
    result.toIterator 
} 

EDIT: Beachten Sie, dass Sie System.currentTimeMillis() (oder sogar System.nanoTime()) anstelle von Calendar verwenden können .