0

Arbeiten mit Kafka Spark-Streaming. Kann die vom Producer gesendeten Daten lesen und verarbeiten. Ich habe hier ein Szenario, nehmen wir an, Producer produziert Nachrichten und Consumer wird für eine Weile abgestellt und eingeschaltet. Jetzt liest der Conumser nur Live-Daten. Stattdessen sollten die Daten auch dort gespeichert worden sein, wo sie nicht mehr gelesen werden konnten. Hier ist die pom.xml, die ich benutzt habe.Kafka Spark-Streaming Offset Ausgabe

<properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <spark.version>2.0.1</spark.version> 
     <kafka.version>0.8.2.2</kafka.version> 
    </properties> 


    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka_2.11</artifactId> 
      <version>1.6.2</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 --> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>${kafka.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>${kafka.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.11.1</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.json/json --> 
     <dependency> 
      <groupId>org.json</groupId> 
      <artifactId>json</artifactId> 
      <version>20160810</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 --> 
     <dependency> 
      <groupId>org.json4s</groupId> 
      <artifactId>json4s-ast_2.11</artifactId> 
      <version>3.2.11</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-common</artifactId> 
      <version>2.2.0</version> 
     </dependency> 

Ich habe versucht, mit Kafka-v0.10.1.0 Producer und Conumser zu arbeiten. Das Verhalten ist wie erwartet (Verbraucher liest Daten von dort, wo sie geblieben sind). In dieser Version wird also der Offset korrekt aufgenommen.

Haben Sie versucht, die gleiche Version in oben pom.xml zu verwenden, aber mit java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker fehlgeschlagen.

Ich verstehe die Kompatibilität von Versionen, aber ich bin auch auf der Suche nach kontinuierlichen Stream.

+0

Haben Sie sich meine Antwort angesehen? Wurden die Probleme gelöst? – oh54

Antwort

0

Das unterschiedliche Verhalten ergibt sich wahrscheinlich aus der Tatsache, dass Kafka einige ziemlich große Änderungen zwischen Versionen 0.8 und 0.10 erfahren hat.

Sofern Sie nicht unbedingt die alte Version verwenden müssen, schlage ich vor, zu neueren zu wechseln.

einen Blick auf diesen Link nehmen:

https://spark.apache.org/docs/latest/streaming-kafka-integration.html

Das Kafka-Projekt einen neuen Verbraucher api zwischen den Versionen 0.8 und 0.10 eingeführt, so gibt es 2 separate entsprechenden Spark-Streaming-Pakete zur Verfügung.

Wenn Sie Kafka v0.10.1.0 verwenden möchten, müssen Sie also etwas kafka Funken geben Integration Abhängigkeit https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 bei Streaming.

So etwas wie dies zum Beispiel:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

Zusätzliche Anmerkung: Sie verwenden hadoop 2.2.0, die im Oktober veröffentlicht wurde, 2013 und ist damit alte in Hadoop Bedingungen, sollten Sie es auf ein sich änderndes neuere Version.

Lassen Sie mich wissen, ob dies hilft.

Verwandte Themen