0

Ich habe ein Thema mit dem Namen test erstellt und einige Zeichenfolgen in der Konsole mit Konsole Producer geschrieben.Lesen von Daten in Flink-Programm von Kafka-Konsolenhersteller

./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092 

Zum Glück, ich bin der Lage, Daten in der Konsole unter Verwendung console-consumer hergestellt zu lesen. Nun wollte ich die Ausgabe von console-consumer in Flink Programm mit folgendem Code gemacht verbrauchen

public class ReadFromKafka { 


    public static void main(String[] args) throws Exception { 
     // create execution environment 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "test"); 


     DataStream<String> message = env.addSource(new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties)); 

     message.map(new MapFunction<String, String>() { 
      private static final long serialVersionUID = -6867736771747690202L; 

      @Override 
      public String map(String value) throws Exception { 
       return " Value: " + value; 
      } 
     }).print(); 

     env.execute(); 


    } //main 
} //ReadFromKafka 

Inhalt des pom.xml wird wie folgt

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 

    <modelVersion>4.0.0</modelVersion> 

    <groupId>org.stsffap</groupId> 
    <artifactId>cep-monitoring</artifactId> 
    <name>cep-monitoring</name> 
    <version>1.0</version> 

    <packaging>jar</packaging> 

    <properties> 
     <flink.version>1.0.1</flink.version> 

     <slf4j.version>1.7.7</slf4j.version> 
     <log4j.version>1.2.17</log4j.version> 
    </properties> 

    <dependencies> 

     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
      <version>${slf4j.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>${log4j.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-streaming-java_2.10</artifactId> 
      <version>${flink.version}</version> 
     </dependency> 
<dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-java</artifactId> 
     <version>1.3.2</version> 
    </dependency> 

     <dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-clients_2.10</artifactId> 
    <version>1.3.2</version> 
    </dependency> 


<dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-cep_2.10</artifactId> 
      <version>${flink.version}</version> 
     </dependency> 


     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-connector-kafka-0.8_2.10</artifactId> 
      <version>1.3.2</version> 

     </dependency> 

    </dependencies> 

    <build> 
     <plugins> 

      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.1</version> 

       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
        <compilerId>jdt</compilerId> 
       </configuration> 
       <dependencies> 
        <dependency> 
         <groupId>org.eclipse.tycho</groupId> 
         <artifactId>tycho-compiler-jdt</artifactId> 
         <version>0.21.0</version> 
        </dependency> 
       </dependencies> 
      </plugin> 




     </plugins> 
    </build> 
</project> 

Jedes Mal, wenn ich diesen Code ausführen ich Fehler Nach bekam

objc[892]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java (0x109f654c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10afd44e0). One of the two will be used. Which one is undefined. 
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointedFunction 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at org.stsffap.cep.monitoring.ReadFromKafka.main(ReadFromKafka.java:24) 
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointedFunction 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    ... 25 more 

auch Version von Kafka, ich von unten gefunden verwende Befehl

find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*' 

ist

kafka_2.11-0.9.0.0-javadoc.jar 

Muss ich Version von Kafka verwenden .8.x müssen, meinem Beispiel zu laufen?

enter image description here

Kommentare und Anregungen sehr appriciated sind. Danke im Voraus. Einen guten haben!

Antwort

0

Mein Programm, indem sie folgende Änderungen zu arbeiten begann, ich habe die Kafka-Version aktualisiert

<dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-connector-kafka-0.9_2.10</artifactId> 
      <version>${flink.version}</version> 

     </dependency> 

ich die Flink Version von 1.0.1 auf 1.1.2 aktualisiert .9.x, wie unten gezeigt

<properties> 
     <!-- <flink.version>1.0.1</flink.version>--> 

     <flink.version>1.1.2</flink.version> 
     <slf4j.version>1.7.7</slf4j.version> 
     <log4j.version>1.2.17</log4j.version> 
    </properties> 

    <dependencies> 

     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
      <version>${slf4j.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>${log4j.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-streaming-java_2.10</artifactId> 
      <version>${flink.version}</version> 
     </dependency> 
<dependency> 
     <groupId>org.apache.flink</groupId> 
     <artifactId>flink-java</artifactId> 
     <version>${flink.version}</version> 
    </dependency> 

     <dependency> 

    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-clients_2.10</artifactId> 
    <version>${flink.version}</version> 
    </dependency> 
+1

Wenn dies Ihr Problem gelöst hat, können Sie es als die lösende Antwort markieren? – twalthr