2017-11-19 2 views
0

Ich weiß nicht, warum die vom Hersteller gesendeten Daten nicht den Verbraucher erreichen. Ich arbeite an Cloudera virtuellen Maschine. Ich versuche, einfache Produzent Verbraucher zu schreiben, wo der Produzent Kafka verwendet und Verbraucher Sparks-Streaming verwendet.Kafka und Spark Streaming Simple Producer Verbraucher

Der Hersteller Kode in scala:

import java.util.Properties 
import org.apache.kafka.clients.producer._ 

object kafkaProducer { 

    def main(args: Array[String]) { 
    val props = new Properties() 
    props.put("bootstrap.servers", "localhost:9092") 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 

    val producer = new KafkaProducer[String, String](props) 

    val TOPIC = "test" 

    for (i <- 1 to 50) { 
     Thread.sleep(1000) //every 1 second 
     val record = new ProducerRecord(TOPIC, generator.getID().toString(),generator.getRandomValue().toString()) 
     producer.send(record) 
    } 

    producer.close() 
    } 
} 

Der Consumer-Code in scala:

import java.util 

import org.apache.kafka.clients.consumer.KafkaConsumer 

import scala.collection.JavaConverters._ 
import java.util.Properties 

import kafka.producer._ 

import org.apache.spark.rdd.RDD 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.kafka._ 

object kafkaConsumer { 
     def main(args: Array[String]) { 


     var totalCount = 0L 
     val sparkConf = new SparkConf().setMaster("local[1]").setAppName("AnyName").set("spark.driver.host", "localhost") 
     val ssc = new StreamingContext(sparkConf, Seconds(2)) 
     ssc.checkpoint("checkpoint") 
     val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1)) 

     stream.foreachRDD((rdd: RDD[_], time: Time) => { 
      val count = rdd.count() 
      println("\n-------------------") 
      println("Time: " + time) 
      println("-------------------") 
      println("Received " + count + " events\n") 
      totalCount += count 
     }) 
     ssc.start() 
     Thread.sleep(20 * 1000) 
     ssc.stop() 

     if (totalCount > 0) { 
      println("PASSED") 
     } else { 
      println("FAILED") 
     } 
     } 
} 
+0

Ich schätze, dass Sie Hersteller und Verbraucher nacheinander starten ??? – nabongs

+0

Ja, ich starte den Verbraucher dann den Produzenten, während der Verbraucher läuft. – MennatAllahHany

+0

Haben Sie Ihren Herstellercode von einem Konsolenhersteller und Ihren Verbrauchercode von einem Konsolenhersteller getestet? Kafka - Spark-Integration kann knifflig werden ... –

Antwort

0

Das Problem durch eine Änderung im Consumer-Code behoben wird die Zeile:

 val stream = KafkaUtils.createStream(ssc, "localhost:9092", "spark-streaming-consumer-group", Map("test" -> 1)) 

Der zweite Parameter sollte der Zoowärter-Port sein, den 2181 nicht 9092 und der Tierpfleger eine Verbindung herstellen kann der Kafka-Port 9092 automatisch.

Hinweis: Kafka sollte vom Terminal gestartet werden, bevor sowohl der Hersteller als auch der Verbraucher ausgeführt wird.

Verwandte Themen