2017-02-02 2 views
0

Ich führe 3 Spark-Streaming-Prozesse zur gleichen Zeit auf EMR-Cluster von Amazon. Das Problem ist, dass eine dieser drei Spark-Streaming Jobs die Verarbeitung auf toLocalIterator basiert tut:Ausführung von drei parallelen Spark-Streaming-Jobs

dstreamdata.foreachRDD(entry => { 
     entry.toLocalIterator.foreach 

Ich habe bemerkt, dass es klemmt (sieht aus wie es Ressourcen oder so fehlt), aber es gibt nicht einen Fehler, macht einfach keine Datenverarbeitung.

Ich verwende die folloiwng Parameter spark-submit für jeden Job:

spark-submit --deploy-mode cluster --executor-cores 6 --executor-memory 10g --num-executors 2 --conf spark.yarn.submit.waitAppCompletion=false --queue queue_name_of_spark_job 

Jede Idee, wie dieses Problem zu beheben, ohne den Code zu ändern?

Antwort

1

1.1) Wenn Sie Kinesis als Warteschlange verwenden, stellen Sie sicher, dass Sie doppelt so viele Executor-Cores wie Kinesis-Shards haben. Das mag auf Kafka zutreffen, ich vergesse, wie der Kafka-Konnektor funktioniert. Dies liegt daran, dass der Connector pro Shard einen Core verbraucht. Sie müssen also sicherstellen, dass Sie über Executor-Kerne verfügen, um die Daten tatsächlich zu verarbeiten.

In der Vergangenheit habe ich einen Executor pro Kinesis-Shard verwendet, wobei jeder Executor zwei oder mehr Kerne hatte, was in meinen Anwendungsfällen gut funktionierte.

1.2) Im Moment holt Ihr Code alle Daten zurück zum Treiber als Iterator. Wenn Sie viele Daten haben, müssen Sie möglicherweise dem Treiber mehr Ressourcen zuweisen, damit er alle Daten in der RDD verarbeiten kann. Das fühlt sich ein bisschen falsch an: - Wenn Sie alle Daten in einer Instanz anpassen können, brauchen Sie nicht wirklich die Komplexität von Spark!

Spark 2.0.x Configuration enthält die Details der verfügbaren Konfiguration.

Ich empfehle am Anfang driver.cores und/oder driver.memory zu betrachten. Ich vermute, du brauchst mehr Kerne, aber du musst experimentieren.

2) Ich schätze, dass Sie den Code nicht ändern möchten, aber ... Wenn möglich, können Sie entry.foreachPartition() verwenden.

Dieser Ansatz vermeidet die Leistungsprobleme bei der Verarbeitung aller Daten im Treiberprozess. Es oder eine Variation der Logik sollte Ihnen helfen, Ihr Problem zu lösen, abhängig von Ihrem genauen Anwendungsfall.

Hier ist ein Beispielcode mit einem Link für weitere Informationen:

dstream.foreachRDD { rdd => 
    // code here is executed by the driver 
    rdd.foreachPartition { partitionOfRecords => 
    // code here is executed by the workers per partition 
    } 
} 

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

+0

ich Kafka bin mit und jeder der drei Funken liest Streaming Aufträge aus verschiedenen Kafka Warteschlange. Kann das ein Grund sein? – Dinosaurius

+0

Es ist möglich, aber ich weiß nicht, wie der Kafka-Connector Ressourcen verbraucht. Aber Sie müssen die Protokolle untersuchen. Es könnte sein, dass die Testamentsvollstrecker von Ressourcen oder dem Fahrer ausgehungert werden. Versuchen Sie herauszufinden, was - es könnte entweder sein. – ImDarrenG

Verwandte Themen