Ich versuche, von einem Kafka Thema mit Spark-Streaming direkt Strom zu lesen, aber ich erhalte die folgenden Fehler:Konnte nicht von Funken Verbindung zu Kafka Streaming: org.apache.spark.SparkException: java.net.SocketTimeoutException
INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
Ich habe Kafka 0.7.1 und Spark 1.5.2.
Ich verwende den folgenden Code:
val ssc : StreamingContext = new StreamingContext(sparkContext, Seconds(60))
val topicsSet = Set("myTopic")
val kafkaParams = Map[String, String]
("metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092")
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
Ich bin sicher, dass das Thema bereits vorhanden ist, weil andere Anwendungen richtig von ihm zu lesen sind.
Sind Sie sicher, dass die Netzwerkkommunikation von dem Ort, Sie versuchen, Kafka zuzugreifen richtig konfiguriert? –