3

Ich versuche, meine spark Anwendung im lokalen Modus auszuführen. Um alles einzurichten, folgte ich diesem Tutorial: http://blog.d2-si.fr/2015/11/05/apache-kafka-3/, (in Französisch) jeden Schritt zum Aufbau der lokalen kafka/zookeeper Umgebung.Lokale Kafka-Anwendung fehlgeschlagen mit: NoSuchMethodError: createEphemeral

Außerdem verwende ich IntelliJ mit folgenden Konfiguration:

val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]") 

Und meine config, für den Verbraucher:

"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1" 

Und für den Hersteller:

"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300 

Vorweg Ich habe überprüft, ob der Verbraucher die Eingaben vom Hersteller mit der Standardeinstellung erhalten hat console-producer und console-consumer von kafka_2.10-0.9.0.1; es tut.

Aber ich bin vor den folgenden Fehler:

java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V 
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921) 
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348) 
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:839) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:833) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) 
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) 
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636) 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626) 
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967) 
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:254) 
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:156) 
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111) 
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575) 
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565) 
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

ich das nicht bei der Lösung nicht gelungen. Ich dachte, es wäre ein zookeeper -config-Fehler, aber nach dem Vergleich mit einer funktionierenden Version der Anwendung auf einem anderen Rechner mit den gleichen Konfigurationsdateien, scheint es nicht mehr.

Antwort

4

Sieht aus, als ob Sie hier ein Abhängigkeitsproblem haben.

Überprüfen Sie die Version der Bibliothek com.101tec.zkclient in unserem Klassenpfad. Kafka benötigt die Version 0.7

Darüber hinaus seit kafka_2.10-0.9.0.1 die APIs für Hersteller und Verbraucher nicht mehr zookee verwenden. Es scheint, dass Spark-Streaming in Ihrem Fall eine 0.8 Version von Kafka verwendet.

+0

Die Antwort war einfach, als ich dachte, aber schwer zu verstehen. Das hat mein Problem gelöst. Vielen Dank. – wipman

Verwandte Themen