2016-04-22 13 views
1

Ich versuche, von einem Kafka Thema mit Spark-Streaming direkt Strom zu lesen, aber ich erhalte die folgenden Fehler:Konnte nicht von Funken Verbindung zu Kafka Streaming: org.apache.spark.SparkException: java.net.SocketTimeoutException

INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException 
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException 
java.net.SocketTimeoutException 
org.apache.spark.SparkException: java.net.SocketTimeoutException 
java.net.SocketTimeoutException 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
    at scala.util.Either.fold(Either.scala:97) 
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422) 

Ich habe Kafka 0.7.1 und Spark 1.5.2.

Ich verwende den folgenden Code:

val ssc : StreamingContext = new StreamingContext(sparkContext, Seconds(60)) 
    val topicsSet = Set("myTopic") 
    val kafkaParams = Map[String, String] 
      ("metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092") 

    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet) 

Ich bin sicher, dass das Thema bereits vorhanden ist, weil andere Anwendungen richtig von ihm zu lesen sind.

+0

Sind Sie sicher, dass die Netzwerkkommunikation von dem Ort, Sie versuchen, Kafka zuzugreifen richtig konfiguriert? –

Antwort

0

Versuchen Sie nicht, ältere Version von Kafka zu verwenden, in Ihrem Fall ist es (0.7.1). Wenn Sie einen starken Grund haben, 0.7.1 zu verwenden, lassen Sie es mich wissen. Wenn Sie Ihre Ausnahme betrachten, sieht es so aus, als ob die Anwendung keine Verbindung zu Kafka-Brokern herstellen kann.

Ich habe diesen direkten Strom api verwendet, um von Kafka 0.8.2 zu lesen. https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala

Hoffe, das wird Ihr Problem lösen.

Dank & Grüße, Vikas Gite

+0

Sie haben Recht. Ich habe es mit Kafka 0.8.2 versucht und es funktioniert reibungslos. Leider laufen unsere Anwendungen derzeit auf Kafka 0.7.1, so dass wir es auf 0.8.2 migrieren müssen, es sei denn, irgendjemand kennt eine Problemumgehung. Danke für Ihre Hilfe! – nicola

Verwandte Themen