Arbeiten mit Kafka Spark-Streaming. Kann die vom Producer gesendeten Daten lesen und verarbeiten. Ich habe hier ein Szenario, nehmen wir an, Producer produziert Nachrichten und Consumer wird für eine Weile abgestellt und eingeschaltet. Jetzt liest der Conumser nur Live-Daten. Stattdessen sollten die Daten auch dort gespeichert worden sein, wo sie nicht mehr gelesen werden konnten. Hier ist die pom.xml, die ich benutzt habe.Kafka Spark-Streaming Offset Ausgabe
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.0.1</spark.version>
<kafka.version>0.8.2.2</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 -->
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-ast_2.11</artifactId>
<version>3.2.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.2.0</version>
</dependency>
Ich habe versucht, mit Kafka-v0.10.1.0 Producer und Conumser zu arbeiten. Das Verhalten ist wie erwartet (Verbraucher liest Daten von dort, wo sie geblieben sind). In dieser Version wird also der Offset korrekt aufgenommen.
Haben Sie versucht, die gleiche Version in oben pom.xml zu verwenden, aber mit java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
fehlgeschlagen.
Ich verstehe die Kompatibilität von Versionen, aber ich bin auch auf der Suche nach kontinuierlichen Stream.
Haben Sie sich meine Antwort angesehen? Wurden die Probleme gelöst? – oh54