2017-08-17 1 views
1

nimmt wir die folgende Stufe DAG laufen und lange Shuffle Lesezeit für relativ kleine Shuffle Datengrößen (ca. 19MB pro Aufgabe)Funken Shuffle Lese viel Zeit für kleine Daten

enter image description here

Ein interessanter Aspekt erleben, ist Die wartenden Aufgaben in jedem Executor/Server haben die gleiche Shuffle-Lesezeit. Hier ist ein Beispiel, was es bedeutet: Für den folgenden Server wartet eine Gruppe von Aufgaben etwa 7,7 Minuten und eine andere wartet etwa 26 Sekunden.

enter image description here

Hier ist ein weiteres Beispiel aus der gleichen Stufe laufen. Die Abbildung zeigt 3 Executoren/Server mit jeweils einheitlichen Aufgabengruppen mit gleicher Shuffle-Read-Zeit. Die blaue Gruppe repräsentiert Killed Aufgaben aufgrund der spekulativen Ausführung:

enter image description here

Nicht alle Vollstrecker sind so. Es gibt einige, die alle ihre Aufgaben innerhalb von Sekunden ziemlich gleichmäßig abschließen, und die Größe der Fernlesedaten für diese Aufgaben ist die gleiche wie für diejenigen, die lange Zeit auf anderen Servern warten. Außerdem läuft diese Art von Stufe 2 Mal innerhalb unserer Anwendungslaufzeit. Die Server/Executoren, die diese Gruppen von Tasks mit großer Shuffle-Lesezeit erzeugen, unterscheiden sich in jedem Ausführungsschritt. Hier

ist ein Beispiel für Aufgabe der Statistiktabelle für eine der severs/hosts:

enter image description here

Es sieht aus wie der Code für dieses DAG ist folgende:

output.write.parquet("output.parquet") 
comparison.write.parquet("comparison.parquet") 
output.union(comparison).write.parquet("output_comparison.parquet") 
val comparison = data.union(output).except(data.intersect(output)).cache() 
comparison.filter(_.abc != "M").count() 

Wir Ich würde deine Gedanken dazu sehr schätzen.

+1

Seltsam. Code und Datenproben würden geschätzt werden. Ich sehe, dass jeder Schritt dieser DAG einen Cache-Aufruf hat, speicherst du alles? – Garren

+0

Hallo. Danke, für ihre Frage. Ich habe den Code in der obigen Beschreibung veröffentlicht. Wir cachen nur, wenn wir es für nötig halten. – Dimon

+0

Die Ausnahmen und Intersect-Anrufe sind auf meinem Radar für Bedenken. Deine DAG verweist auf einen Sortierergejoin; Weißt du schon, welche Linie (n) den Ärger verursacht? – Garren

Antwort

0

Anscheinend war das Problem JVM Garbage Collection (GC). Die Aufgaben mussten warten, bis GC auf den Remote-Executoren ausgeführt wurde. Die äquivalente Shuffle-Lesezeit ergab sich aus der Tatsache, dass auf einem einzelnen Remote-Host, der GC ausführte, mehrere Aufgaben warten mussten. Wir folgten dem Hinweis here und das Problem verringerte sich um eine Größenordnung. Es gibt immer noch eine kleine Korrelation zwischen der GC-Zeit auf Remote-Hosts und der lokalen Shuffle-Lesezeit. In Zukunft werden wir Shuffle Service ausprobieren.