Ich versuche, Funf und Kafka zu integrieren, um die Nachrichten von Kafka zu konsumieren. Ich habe Producer-Code auch, um Nachrichten über das Thema "temp" zu senden. Außerdem verwende ich Kafkas Console Producer, um die Nachrichten im Thema "temp" zu produzieren.Kafka Spark Streaming Consumer erhält keine Nachrichten vom Kafka Console Producer?
Ich habe unten Code erstellt, um die Nachrichten aus dem gleichen "Temp" Thema zu konsumieren, aber es wird auch keine einzelne Nachricht erhalten.
Programm:
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import static org.apache.commons.lang3.StringUtils.SPACE;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import scala.collection.immutable.ListSet;
import scala.collection.immutable.Set;
public class ConsumerDemo {
public void main() {
String zkGroup = "localhost:2181";
String group = "test";
String[] topics = {"temp"};
int numThreads = 1;
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[4]").set("spark.ui.port", "7077").set("spark.executor.memory", "1g");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<>();
for (String topic : topics) {
topicMap.put(topic, numThreads);
}
System.out.println("topics : " + Arrays.toString(topics));
JavaPairReceiverInputDStream<String, String> messages
= KafkaUtils.createStream(jssc, zkGroup, group, topicMap);
messages.print();
JavaDStream<String> lines = messages.map(Tuple2::_2);
//lines.print();
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
//wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
public static void main(String[] args) {
System.out.println("Started...");
new ConsumerDemo().main();
System.out.println("Ended...");
}
}
Ich habe folgende Abhängigkeiten in der pom.xml-Datei:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>0.9.0-incubating</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.3</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.anarres.lzo</groupId>
<artifactId>lzo-core</artifactId>
<version>1.0.5</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.10</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.msiops.footing</groupId>
<artifactId>footing-tuple</artifactId>
<version>0.2</version>
</dependency>
Ist mir fehlt etwas Abhängigkeit oder Ausgabe ist im Code? Warum erhält dieser Code keine Nachrichten?
Sind Sie in der Lage Nachrichten zu konsumieren Konsolen Verbraucher mit? Wenn nicht, könnte es ein Problem mit dem Produzenten geben. Überprüfen Sie auch, ob Ihre Portnummer korrekt ist oder nicht. Ich glaube nicht, dass es ein Problem in POM geben sollte, wenn es eines gibt, sollte es Ihnen nicht erlauben, das Projekt zu erstellen/zu kompilieren. –
@ NileshPharate- Ja Ich bin in der Lage, Nachrichten mit Console Consumer von Kafka zu konsumieren, so dass wir sagen können, dass das Problem nicht mit Kafka oder Tierpfleger zusammenhängt, d. – kit