2016-04-12 4 views
1

ich Ihre Hilfe und Beratung benötigen für Apache Spark KafkaWordCount Beispiel auf einem eigenständigen Funken Cluster ausgeführt:Spark-Streaming - KafkaWordCount nicht auf einem Spark-Standalone-Cluster laufen kann

I Beispiel Zündkerzen laufen kann, KafkaWordCount, im lokalen Modus durch

und ich kann die Nachricht von Kafka Server erhalten, die in einem anderen Knoten (Virtual Machine) ist und die Ergebnisse auf Terminal-Konsole gedruckt bekommen.

jedoch, wenn die antragstell eigenständigen Cluster zu entfachen (via

spark-submit .... --master spark://master:7077 .... 

), fand ich Ausnahmen in einzelnen Verzeichnissen Arbeiter Knoten $ SPARK_HOME/Arbeit /../../ Stderr Verzeichnis. Und das Ergebnis jedes Wortzählungsstapels ist NICHT gedruckt auf $ SPARK_HOME/work/../ .. stdout in jedem Worker-Knoten.

Hier ist meine jeder Funke Arbeiter Knoten Einstellungen von Ressource in $ SPARK_HOME/conf/spark-env.sh:

export SPARK_MASTER_IP=master 
export SPARK_WORKER_CORES=4 
export SPARK_WORKER_MEMORY=3g 
export SPARK_WORKER_INSTANCES=2 

Ich habe 5 virtuelle Maschine Knoten (in Host-Namen hier): mykafka, Meister, data1 , Daten2 und Daten3.

Vielen Dank für Ihre Hilfe und Beratung im Voraus.

Die folgenden sind die Ausnahmen von RpcTimeoutException in jedem Arbeiter gefunden:

16/04/11 23:07:30 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(5,[Lscala.Tuple2;@2628a359,BlockManagerId(5, data3, 34838))] in 1 attempts 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval 
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) 
    at scala.util.Try$.apply(Try.scala:161) 
    at scala.util.Failure.recover(Try.scala:185) 
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
    .... 
    .... 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds 
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242) 
    ... 7 more 
16/04/11 23:07:31 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM 
beat(5,[Lscala.Tuple2;@2628a359,BlockManagerId(5, data3, 34838))] in 1 attempts 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval 
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
    .... 
    .... 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds 
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242) 
    ... 7 more 
+0

Sie gesagt, dass Sie 5 virtuelle Maschinen sind in der Lage sie zu jedem verbinden andere? Gibt es eine Firewall, die den Zugriff auf bestimmte Ports verhindert? Sind Server an eine öffentliche Schnittstelle gebunden? – Vishnu667

+0

Ja, sie können sich über SSH-Passwort-Einstellungen miteinander verbinden. Und ich kann auch andere Funke-Jobs im Garn-Client-Modus erfolgreich einreichen. Auch vom Master-Knoten aus kann ich mich mit dem Knoten "mykafka" verbinden. – Alan

+0

Ich habe es im Garn-Client-Modus eingereicht und festgestellt, dass einer der Executoren nicht gestartet werden kann: {{JAVA_HOME}}/bin/java-server -XX: OnOutOfMemoryError = 'kill% p' -Xms4096m -Xmx4096m -Djava.io .tmpdir = {{PWD}}/tmp '-Dspark.driver.port = 44618' -Dspark.yarn.app.container.log.dir = -XX: MaxPermSize = 256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url funke: // CoarseGrainedScheduler – Alan

Antwort

3

So hatte ich genau das gleiche Problem mit diesem Beispiel, und es scheint zu diesem Fehler verwandt zu sein https://issues.apache.org/jira/browse/SPARK-13906

Nicht sicher, wie dies für das Beispiel eingestellt wird, aber ich experimentierte mit dem Code, baute eine kleine Scala-App und hatte einen zusätzlichen Config-Parameter zum SparkCo hinzugefügt (Nf)

val conf = new SparkConf() 
.setAppName('name') 
.set("spark.rpc.netty.dispatcher.numThreads","2") 

Credit David Gomez und der Funken Mailer wo nach eine Menge googleing ich die Lösung gefunden

https://mail-archives.apache.org/mod_mbox/spark-user/201603.mbox/%[email protected].com%3E

+0

Kein Problem Ich stieß gestern auf diesen Beitrag mit dem gleichen Problem gefunden, die richtige google-Satz, um diese Mailer zu finden – Robrotheram

+0

Robrotheram, ich danke Ihnen noch einmal. Ihre Hilfe ist mir sehr wichtig. Davon profitieren auch meine anderen laufenden Projekte und Pläne. Ich habe versucht, alle Parameter oder Konfigurationen für das Senden von Funken zu optimieren, aber keiner von ihnen funktioniert im Entwicklungscluster, und ich kann nicht sehen, ob dies auf der Registerkarte "Streaming" von Spark UI gut funktioniert. Diese Information ist hilfreich und ein großer Schritt für mich. Alan – Alan

Verwandte Themen