2016-03-28 12 views
3

Ich habe einige Code, der einige Schritte ausführt, und ich weiß, wie lange der gesamte Prozess dauert. Ich möchte jedoch gerne berechnen können, wie lange jede einzelne Transformation dauert. Hier sind einige einfache Beispiele für die Schritte:Wie kann man eine Transformation in Spark zeitlich verzögern?

rdd1 = sc.textFile("my/filepath/*").map(lambda x: x.split(",")) 
rdd2 = sc.textFile("other/filepath/*").map(lambda x: x.split(",")) 
to_kv1 = rdd1.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric 
to_kv2 = rdd2.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric 
reduced1 = to_kv1.reduceByKey(lambda a, b: a+b) 
reduced2 = to_kv1.reduceByKey(lambda a, b: a+b) 
outer_joined = reduced1.fullOuterJoin(reduced2) # let's just assume there is key overlap 
outer_joined.saveAsTextFile("my_output") 

Jetzt: Wie bench ich einen bestimmten Teil dieses Codes? Ich weiß, dass das Ausführen von Ende zu Ende eine gewisse Zeit in Anspruch nimmt (die saveAsTextFile wird es zur Ausführung zwingen), aber wie benchmarkiere ich nur den reduceByKey oder Teil des Codes? Ich weiß, dass ich count() nach jeder Operation ausführen konnte, um Ausführung zu erzwingen, aber das würde die Operation nicht ordnungsgemäß benchmarken, da es die Zeit hinzufügt, die benötigt wird, um die count sowie die Zeit auszuführen, die Transformation auszuführen.

Was ist der beste Weg, um Spark-Transformationen zu bewerten, angesichts ihrer faulen Ausführung?

Bitte beachten Sie, dass ich nicht frage, wie man Zeit misst. Ich weiß über das time Modul, start = time.time(), etc. Ich frage, wie Benchmark angesichts der faulen Ausführung Stil von Spark-Transformationen, die nicht ausgeführt werden, bis Sie eine Aktion aufrufen, die Informationen an den Treiber zurückgegeben werden benötigt.

+0

Sie schreiben "Ich weiß über das Zeitmodul, start = time.time(), etc." was genau meinst du damit? Ich suche nach einer Möglichkeit, eine einzelne Transformation/eine Reihe von Transformationen zeitlich festzulegen. Vielen Dank! –

+0

Wenn Sie eine Reihe von Ausführungen zeitlich abmessen wollen, die mit einer Aktion enden, die die Ausführung erzwingt, können Sie 'start = time.time()' zu Beginn, 'elapsed = time.time() - start' am Ende danach einfügen der Aktionsaufruf, und dann schauen, was "verstrichen" enthält. –

Antwort

4

Am besten verwenden Sie die Spark UI, um diese Informationen zu lesen. Das Problem ist zweifach:

  • Die Berechnungen so auch verteilt werden, wenn Sie einen Zeitmechanismus innerhalb jedem der Transformationen hinzugefügt, wäre es etwas schwierig sein, zu sagen, wenn die Aufgabe wirklich könnte getan wird, wie es in getan werden eine Maschine, aber keine andere. Das heißt, Sie könnten die Protokollierung hinzufügen und die erste Instanz der Ausführung finden und dann die endgültige Ausführung finden. Beachten Sie jedoch den nächsten Punkt
  • Die Transformationen sind so weit wie möglich in Pipelines. Daher wird Spark mehrere Transformationen zur gleichen Zeit zur gleichen Zeit ausführen, sodass Sie JUST genau diese eine Aktion explizit testen müssen.
+0

Ich denke, der zweite Punkt ist hier sehr wertvoll. Ich hatte nicht bemerkt, dass Spark mehrere Transformationen gleichzeitig ausführt. Mein Ziel ist relativ: Welche Operationen dauern am längsten? Selbst wenn ich sie nicht genau zeitlich abmessen kann (meine ideale Situation), würde es mir eine relative Vorstellung davon geben, welche lange dauern, dann kann ich mich darauf konzentrieren, diese Aktionen effizienter zu machen (wenn möglich). –

Verwandte Themen