2017-01-09 5 views
1

Ich versuche, die Daten von Kafka Thema in Flink Streaming zu lesen. Ich versuche, den folgenden Beispiel-Code auszuführen, die dort auf Seite APACHE Flink 1.1.3 Dokumentation als ein Beispiel ist: Apache kafka-Anschluss,Flink mit Kafka als Quelle

import java.util.Properties; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.util.serialization.DeserializationSchema; 
import org.apache.flink.streaming.util.serialization.SerializationSchema; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 

public class stock_streaming_kafka { 

    public static void main(String[] args) throws Exception 
    { 
     StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); 
     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties); 

    DataStream<String> stream = env 
     .addSource(myConsumer) 
     .print(); 
} 

}

Ich habe folgende Fehlermeldung:

Exception in thread "main" java.lang.Error: Unresolved compilation problems: 
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files 
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>) 

at stock_streaming_kafka.main(stock_streaming_kafka.java:25) 

Können Sie mir bitte helfen, dies zu beheben? Gibt es ein Abhängigkeitsproblem mit dem Kafka-Connector? Meine Versionen sind:

  1. Flink 1.1.3
  2. Kafka 2.10
  3. flink-connector-kafka-0.9_2.11-1.0.0.jar

Antwort

1

Bitte verwenden Sie die folgenden Versionen. Es wird mit Ihrer Kafka-Version funktionieren.

<dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-streaming-java_2.11</artifactId> 
      <version>1.1.4</version> 
      <scope>provided</scope> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-connector-kafka-0.9_2.10</artifactId> 
      <version>1.1.3</version> 
     </dependency> 

Ich sehe ein Kompilierungsproblem im Code. diese

Wechsel:

DataStream<String> stream = env 
     .addSource(myConsumer) 
     .print(); 

zu:

DataStream<String> stream = env 
     .addSource(myConsumer); 
stream.print(); 

Wenn es immer noch nicht für Sie arbeiten dann lass es mich wissen und ich werde den Arbeits Code teilen.

0

Die Versionen von Flink und der Flink-Konnektor muss übereinstimmen. Aktualisieren Sie die flink-connector Abhängigkeit zu 1.1.3.

+0

Danke Fabian. Ich habe die jar-Version in flink-connector-kafka-0.9_2.11-1.1.3.jar geändert. Aber ich habe den Fehler als _Typ org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema kann nicht aufgelöst werden. Es wird indirekt von erforderlichen .class files_ referenziert – kadsank

0

Da die Antwort noch nicht akzeptiert ist, hier eine complete Maven code example, um Daten von Kafka mit Flink zu lesen.

Möglicherweise müssen Sie die pom.xml anpassen, um Ihre Einstellungen von Kafka und Scala anzupassen.

Hoffe, das hilft.