2017-02-03 2 views
3

Ich bin ein Funke auf Garn Cluster mit Pyspark. Ich habe ein Dataset, das das Laden mehrerer Binärdateien pro Schlüssel und das Ausführen einiger Berechnungen erfordert, die schwer in Teile zu zerlegen sind. Daher muss es im Allgemeinen über alle Daten für einen einzelnen Schlüssel arbeiten.Spark + Garn - Skala Speicher mit Eingangsgröße

Momentan setze ich spark.executor.memory und spark.yarn.executor.memoryOverhead auf "gesunde" Werte, die die meiste Zeit arbeiten, aber bestimmte Schlüssel am Ende haben eine viel größere Menge an Daten als der Durchschnitt, und In diesen Fällen reicht der Speicher nicht aus und der Executor wird getötet.

ich momentan tut Sie eine der folgenden: 1) Führen Sie Aufträge mit der Standardspeichereinstellung und erneut ausführen, nur, wenn bestimmte Tasten mit mehr Speichern nicht 2) Wenn ich weiß, eine meines Schlüssel hat viel mehr Daten, kann ich maßstab Speicher für den Job als Ganzes, aber dies hat den Nachteil, dass die Anzahl laufender Container, die ich bekomme/die Anzahl der Jobs, die parallel laufen, drastisch reduziert wird.

Idealerweise hätte ich ein System, bei dem ich einen Job abschicken könnte und den Speicher in einer Executor-Skala mit Eingabegröße hätte, aber ich weiß, das ist kein Funkenmodell. Gibt es irgendwelche zusätzlichen Einstellungen, die mir hier helfen können oder irgendwelche Tricks, um mit diesem Problem umzugehen? Alles offensichtliche, was ich als Fix fehlt?

Antwort

0

Sie können den folgenden Ansatz testen: Setzen Sie Executor Memory und Executor Garnoverhead auf Ihre Maximalwerte und fügen Sie spark.executor.cores mit der Nummer größer als 1 hinzu (beginnend mit 2). Setzen Sie zusätzlich spark.task.maxFailures auf eine große Zahl (sagen wir 10).

Dann auf normalen Schlüsseln wird Funke wahrscheinlich Aufgaben wie üblich beenden, aber einige Partitionen mit größeren Schlüsseln mit Fehler. Sie werden zur Wiederholungsstufe hinzugefügt, und da die Anzahl der zu wiederholenden Partitionen viel niedriger ist als die der anfänglichen Partitionen, verteilt Spark sie gleichmäßig an die Executoren. Wenn die Anzahl der Partitionen niedriger oder die gleiche Anzahl an Executoren ist, hat jede Partition doppelt so viel Speicher wie die ursprüngliche Ausführung und kann erfolgreich sein.

Lassen Sie mich wissen, ob es für Sie funktioniert.

+0

Das funktioniert zwar, aber es ist immer noch halb flockig auf lange laufende Jobs und einen Teil der Zeit habe ich immer noch mit vollem Job-Fehler (auch mit einem sehr hohen maxFailures [> 30-50]). Aber immer noch viel besser als vorher. Vielen Dank – user2721897

Verwandte Themen