2016-10-12 4 views
1

Ich führe eine Spark-Submit-Anwendung auf einem AWS EMR-Cluster (EMR 5.0.0, Spark 2.0.0, 30 r3.4xlarge). Um das Skript zu starten, ich in den Master-Knoten SSH, dann führen Sie den folgenden Befehl ein:Verhindern von SparkListenerBus-Fehlern

time spark-submit --conf spark.sql.shuffle.partitions=5000 \ 
--conf spark.memory.storageFraction=0.3 --conf spark.memory.fraction=0.95 \ 
--executor-memory 8G --driver-memory 10G dataframe_script.py 

Die Anwendung verwendet die Standard AWS Funkenkonfiguration, die spark.master hat = Garn und bereitstellen-mode = Client.

Die Anwendung lädt ~ 220 GB Daten, macht SQL-ähnliche Aggregationen, schreibt dann auf s3. Die geschriebenen Daten sehen so aus, als ob sie korrekt verarbeitet wurden. Während der Code ausgeführt wird, sehe ich einen Fehler messaage, aber der Code wird weiterhin ausgeführt:

ERROR LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. 

Nachdem die Anwendung Schreiben erfolgt ist, wird die Anwendung nicht für> 10 Minuten, um die Befehlszeile zurückzukehren, aa aussendet Warnung:

WARN ExecutorAllocationManager: No stages are running, but numRunningTasks != 0 

dann Zehntausende von Zeilen mit der Fehlermeldung:

16/10/12 00:40:03 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(176,WrappedArray()) 

der Fortschrittsbalken auch weiterhin zwischen den Fehlermeldungen zu bewegen, zum Beispiel:

[Stage 17:=================================================> (465 + 35)/500] 

My-Code für den Schreib- und Ende des Hauptschritt:

def main(): 
    # some processing 
    df.select(selection_list).write.json('s3path', compression=codec) 
    print 'Done saving, shutting down' 
    sc.stop() 

Es gibt eine previous StackOverflow question, die this JIRA bezieht. Es sieht so aus, als ob es eine Reparatur für ältere Versionen von Spark gab, aber ich verstehe nicht ganz, was das Problem war.

Wie vermeide ich diese Fehlermeldungen?

Antwort

0

Ich glaube, ich habe das Problem gefunden. In meinem Spark-Skript initiiere ich den SparkContext außerhalb der main() - Funktion, stoppe ihn jedoch innerhalb der Hauptfunktion. Dies führt zu Problemen, wenn das Skript beendet wird und versucht, den SparkContext ein zweites Mal zu schließen. Durch das Verschieben der SparkContext-Initialisierung in die Hauptfunktion gingen die meisten dieser Fehler verloren.