2016-06-03 11 views
1

Ich habe versucht, einen Funke-Job zu bekommen, um für mehrere Tage zum Abschluss zu laufen, und ich konnte es endlich abschließen, aber es gab immer noch eine große Anzahl von fehlgeschlagenen Aufgaben, wo Executors mit getötet wurden die folgende Nachricht:Spark ExecutorLostFailure Speicher überschritten

ExecutorLostFailure (Executor 77 beendet aufgrund einer der ausgeführten Aufgaben) Grund: Container von YARN für das Überschreiten von Speicherlimits getötet. 45,1 GB von 44,9 GB physikalischem Speicher verwendet. Betrachten spark.yarn.executor.memoryOverhead Steigerung

Dies sind die Eigenschaften ich vorbei bin zum Cluster:

[ 
    { 
     "classification": "spark-defaults", 
     "properties": { 
      "spark.executor.memory": "41000m", 
      "spark.driver.memory": "8000m", 
      "spark.executor.cores": "6", 
      "spark.shuffle.service.enabled": "true", 
      "spark.executor.instances": "98", 
      "spark.yarn.executor.memoryOverhead": "5000" 
     } 
    } 
] 

Der Cluster besteht aus 20 Maschinen mit jeweils 32 Kernen und 240G Arbeitsspeicher. Sollte ich einfach weiter die Speicherkapazität erhöhen oder gibt es einen Punkt, an dem es ein tieferes Problem anzeigt. Der Fehler dieses Mal schien während einer Verschmelzung von 5000 Partitionen auf 500 vor dem Schreiben der resultierenden Daten auf S3 aufzutreten. Ich vermute, dass die Koaleszenz einen Shuffle verursacht hat und da der Cluster bereits zu wenig Speicher hatte, hat er ihn zu weit getrieben.

Der Arbeitsablauf ist wie folgt:

  1. Last Parkett- Dateien von s3 in Datenrahmen
  2. Extract Satz von eindeutigen Schlüsseln, die Gruppe, die die Daten unter Verwendung von SQL-Abfrage für Datenrahmen
  3. die Datenrahmen zu einem JavaRDD Transformation und Wenden Sie mehrere Kartenfunktionen an
  4. MapToPair die Daten
  5. combineByKey, das das folgende verwendet, verschmilzt einzelne Objekte im Wesentlichen in Arrays von Objekten nach Schlüssel

    combineByKey (neue Funktion, Funktion hinzufügen, Merge-Funktion, neue HashPartitioner (5000), false, null);

  6. Weitere Karten

  7. Für jede mehrere eindeutige Schlüssel, filtern die rdd nur um Tupel mit diesem Schlüssel dann jede dieser Teilmengen auf der Festplatte bleiben auch nach

Eine andere Frage koaleszierende ist, wie die 44,9 Nummer von oben wird abgeleitet. Ich dachte, der maximale Speicher wäre Executor-Speicher + MemoryOverhead, die 46G nicht 44.9G wäre.

Jede Hilfe wäre sehr dankbar, Nathan

+0

Nathan, du hast eine Antwort geschrieben, hast du nicht gesehen? :) – gsamaras

Antwort

6

Aus meiner Erfahrung zeigt dies af tiefer liegendes Problem und von dem, was Sie gebucht haben sehe ich ein paar Tücken.

Zunächst sollten Sie sich die Partitionsgrößen ansehen, da das OOM leicht durch Datenschräglagen verursacht werden kann, die während der Operationen combineByKey erstellt wurden. Vielleicht sind einige Schlüssel sehr häufig?

Wenn nicht, würde ich auf die coalesce Funktionsaufruf aussehen. Sie haben den Code nicht gepostet, daher kann ich nur vermuten, dass die DAG generiert wird, aber ich würde die Funktion coalesce und die anderen Operationen, die in der gleichen Schreibphase ausgeführt werden, kennen.

Spark führt in Stufen und aus was ich von Ihrer Erklärung erzählen kann, rufen Sie coalesce kurz vor write und so abhängig davon, wie viele Partition Sie in diese letzte Stufe gehen und abhängig von den Transformationen in dieser Phase durchgeführt werden können tatsächlich auf weniger Partitionen als benötigt arbeiten, was zu der OOM-Ausnahme führt.

Es ist ein wenig kompliziert, in Worten zu erklären, aber ich werde versuchen, ein einfaches Beispiel zu geben, was passieren könnte.

Stellen Sie sich das einfache Szenario vor, wo Sie in einer Datei Schlüssel-Wert-Paare von sagen (Int, Double) lesen, wenden Sie dann eine Funktion auf alle Werte wie sagen round. Sie möchten dann die Ausgabe in eine einzelne Datei zurückschreiben, also rufen Sie coalesce(1) gefolgt von write. Der Code würde wie folgt aussehen:

val df = sqlContext.read.parquet("/path/to/my/file/") 
df.map{case(key: Int, value: Double) => (key, round(value)} 
    .toDF() 
    .coalesce(1) 
    .write 
    .parquet("/my/output/path/") 

Nun könnte man denken, dass die map Operation parallel auf dem gesamten Cluster ausgeführt wird, aber wenn man die Aufmerksamkeit auf die Funken ui bezahlen, werden Sie feststellen, dass diese Aufgabe nicht über Ihren Cluster verteilt. Aufgrund der coalesce(1), Spark weiß, dass alles in einer einzigen Partition enden muss, so dass es einfach beginnt, alle Daten in einer Partition zu sammeln, die die map Funktion anwendet, während es geht. Wie Sie sich vorstellen können, kann dies leicht in OOM-Ausnahmen mit einer komplizierteren Transformation enden.

Ich hoffe, das gibt Ihnen ein paar Hinweise, wo Sie suchen. Viel Glück :)

+0

Bravo für die Erwähnung der Partitionen! – gsamaras

Verwandte Themen