2017-08-31 4 views
0

Ich versuche, Funf und Kafka zu integrieren, um die Nachrichten von Kafka zu konsumieren. Ich habe Producer-Code auch, um Nachrichten über das Thema "temp" zu senden. Außerdem verwende ich Kafkas Console Producer, um die Nachrichten im Thema "temp" zu produzieren.Kafka Spark Streaming Consumer erhält keine Nachrichten vom Kafka Console Producer?

Ich habe unten Code erstellt, um die Nachrichten aus dem gleichen "Temp" Thema zu konsumieren, aber es wird auch keine einzelne Nachricht erhalten.

Programm:

import java.util.Arrays; 
import java.util.Map; 
import java.util.HashMap; 
import static org.apache.commons.lang3.StringUtils.SPACE; 

import org.apache.spark.SparkConf; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import scala.Tuple2; 
import org.apache.log4j.Logger; 
import org.apache.spark.api.java.JavaSparkContext; 
import scala.collection.immutable.ListSet; 
import scala.collection.immutable.Set; 

public class ConsumerDemo { 

    public void main() { 
     String zkGroup = "localhost:2181"; 
     String group = "test"; 
     String[] topics = {"temp"}; 
     int numThreads = 1; 

     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[4]").set("spark.ui.port‌​", "7077").set("spark.executor.memory", "1g"); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
     Map<String, Integer> topicMap = new HashMap<>(); 
     for (String topic : topics) { 
      topicMap.put(topic, numThreads); 
     } 
     System.out.println("topics : " + Arrays.toString(topics)); 
     JavaPairReceiverInputDStream<String, String> messages 
       = KafkaUtils.createStream(jssc, zkGroup, group, topicMap); 

     messages.print(); 

     JavaDStream<String> lines = messages.map(Tuple2::_2); 

     //lines.print(); 
     JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); 

     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) 
       .reduceByKey((i1, i2) -> i1 + i2); 

     //wordCounts.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 

    public static void main(String[] args) { 
     System.out.println("Started..."); 
     new ConsumerDemo().main(); 
     System.out.println("Ended..."); 
    } 
} 

Ich habe folgende Abhängigkeiten in der pom.xml-Datei:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.9.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.11.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.2.0</version> 
     <scope>provided</scope> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>0.9.0-incubating</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.6.3</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.3</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 

    <dependency> 
     <groupId>org.anarres.lzo</groupId> 
     <artifactId>lzo-core</artifactId> 
     <version>1.0.5</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-databind</artifactId> 
     <version>2.8.2</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.module</groupId> 
     <artifactId>jackson-module-scala_2.10</artifactId> 
     <version>2.8.2</version> 
    </dependency> 
    <dependency> 
     <groupId>com.msiops.footing</groupId> 
     <artifactId>footing-tuple</artifactId> 
     <version>0.2</version> 
    </dependency> 

Ist mir fehlt etwas Abhängigkeit oder Ausgabe ist im Code? Warum erhält dieser Code keine Nachrichten?

+0

Sind Sie in der Lage Nachrichten zu konsumieren Konsolen Verbraucher mit? Wenn nicht, könnte es ein Problem mit dem Produzenten geben. Überprüfen Sie auch, ob Ihre Portnummer korrekt ist oder nicht. Ich glaube nicht, dass es ein Problem in POM geben sollte, wenn es eines gibt, sollte es Ihnen nicht erlauben, das Projekt zu erstellen/zu kompilieren. –

+0

@ NileshPharate- Ja Ich bin in der Lage, Nachrichten mit Console Consumer von Kafka zu konsumieren, so dass wir sagen können, dass das Problem nicht mit Kafka oder Tierpfleger zusammenhängt, d. – kit

Antwort

0

Sie rufen nicht die Methode auf, über die Sie Code zum Herstellen einer Verbindung und zum Empfangen von Nachrichten von Kafka haben. Schreiben Sie diese Logik entweder in public static void main() oder rufen Sie die Methode auf, in der Sie diese Logik geschrieben haben.

0

Bei der Verwendung von Kafka Consumer, und insbesondere beim Testen und Debuggen in der Entwicklungsumgebung, darf der Hersteller Kafka Nachrichten nicht kontinuierlich senden. In diesem Szenario müssen wir uns um diesen Kafka-Consumer-Parameter kümmern auto.offset.reset, der bestimmt, ob nur neue Nachrichten gelesen werden sollen, die in Thema geschrieben werden, nachdem der Verbraucher gestartet wird? oder von Anfang des Themas

hier ist die offizielle Erklärung in Kafka documentation zu lesen gegeben:

auto.offset.reset
Was tun, wenn es keine anfänglichen Versatz in Kafka oder wenn die aktuelle Offset existiert nicht mehr auf dem Server (zB weil die Daten gelöscht wurden):

  1. frühestens: automatisch das die frühesten Offset zurückgesetzt Offset
  2. zuletzt: automatisch auf die neueste der Offset zurücksetzen Offset
  3. none: für den Verbraucher Exception werfen, wenn keine vorherige für die Verbrauchergruppe
  4. etwas anderes gefunden versetzt ist: throw Ausnahme für den Verbraucher.

ein Beispielcode-Schnipsel wie KafkaDStream mit kafkaParams wie unten zu erstellen:

Map<String,String> kafkaParams = new HashMap<>(); 
    kafkaParams.put("zookeeper.connect", "localhost:2181"); 
    kafkaParams.put("group.id", "test02"); //While you are testing the codein develeopment system, change this groupid each time you run the consumer 
    kafkaParams.put("auto.offset.reset", "earliest"); 
    kafkaParams.put("metadata.broker.list", "localhost:9092"); 
    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    Map<String, Integer> topics = new HashMap<String, Integer>(); 
    topics.put("temp", 1); 
    StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER(); 

    JavaPairDStream<String, String> messages = 
     KafkaUtils.createStream(jssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topics, 
       storageLevel)  
     ; 
    messages.print(); 
+0

@ remisharoon- Ich bekomme nur Nachrichten mit Zeitstempel in Millisekunden. Was bedeutet das?Unten ist die Beispielausgabe ------------------------------------------- Zeit: 1504785338000 ms ------------------------------------------- ------------------------------------------- Zeit: 1504785340000 ms - ------------------------------------------ – kit

+0

@kit, was das bedeutet ein "leerer DStream". dh. es liest keine Aufzeichnungen von Kafka. Bitte versuche das Kafka-Thema zu schreiben, nachdem du den SparkStreming-Job gestartet hast –

+0

@ remisharoon- Ich sende Nachrichten an dasselbe kafka-Thema von kafkas Konsolenhersteller ... es wird trotzdem leeres DStream gedruckt ... Was wird der Grund dafür sein? ? – kit

Verwandte Themen