2017-03-28 7 views
0

I flink Job wie unten zu lesen Daten von Apache Kafka & Druck zu laufen versuchen:Apache Flink - mehrere Ausgangsleitungen

Java-Programm

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    Properties properties = new Properties(); 
    properties.setProperty("bootstrap.servers", "test.net:9092"); 
    properties.setProperty("group.id", "flink_consumer"); 
    properties.setProperty("zookeeper.connect", "dev.com:2181,dev2.com:2181,dev.com:2181/dev2"); 
    properties.setProperty("topic", "topic_name"); 

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>("topic_name", new SimpleStringSchema(), properties)); 

      messageStream.rebalance().map(new MapFunction<String, String>() { 
       private static final long serialVersionUID = -6867736771747690202L; 

       public String map(String value) throws Exception { 
        return "Kafka and Flink says: " + value; 
       } 
      }).print(); 

      env.execute(); 

Scala-Code

var properties = new Properties(); 
    properties.setProperty("bootstrap.servers", "msg01.staging.bigdata.sv2.247-inc.net:9092"); 
    properties.setProperty("group.id", "flink_consumer"); 
    properties.setProperty("zookeeper.connect", "host33.dev.swamp.sv2.tellme.com:2181,host37.dev.swamp.sv2.tellme.com:2181,host38.dev.swamp.sv2.tellme.com:2181/staging_sv2"); 
    properties.setProperty("topic", "sv2.staging.rtdp.idm.events.omnichannel"); 
    var env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    var stream:DataStream[(String)] = env 
.addSource(new FlinkKafkaConsumer082[String]("sv2.staging.rtdp.idm.events.omnichannel", new SimpleStringSchema(), properties)); 
    stream.print(); 
    env.execute(); 

Immer wenn ich dies in App in Eclipse ausführen, sehe ich unten zu Beginn mit:

03/27/2017 20:06:19 Die Jobausführung wechselt in den Status RUNNING.

03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (1/4) zu SCHEDULED 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink : Unbenannt (1/4) wechselte zu DEPLOYING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Waschbecken: Unbenannt (2/4) wechselte zu SCHEDULED 03/27/2017 20:06:19 Quelle : Benutzerdefinierte Quelle -> Sink: Unbenannt (2/4) wechselte zu DEPLOYING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (3/4) wechselte zu SCHEDULED 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Senke: Unbenannt (3/4) wechselte zu DEPLOYING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Senke: Unbenannt (4/4) geschaltet zu geplant 03/27/2017 20:06:19 Sauer ce: Benutzerdefinierte Quelle -> Sink: Unbenannt (4/4) wechselte zu DEPLOYING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (4/4) wechselte zu RUNNING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (2/4) wechselte zu RUNNING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (1/4) wechselte zu RUNNING 2017.03.27 20.06.19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (3/4) geschaltet

Frage an RUNNING ich habe ist:

1) Warum sehe ich 4 Instanz der Senke in allen Fällen (geplant, bereitgestellt und ausgeführt).

2) Für jede in Apache Kafka empfangene Zeile sehe ich, dass sie hier mehrmals gedruckt wird, meistens 4 Mal. Was ist ein Grund?

Idealerweise möchte ich jede Zeile nur einmal lesen und damit weiter bearbeiten. Jede Eingabe/Hilfe wird spürbar sein!

+0

Wie viele Partitionen hat Ihr Thema? –

+0

Thema, das ich verwende, habe PartitionCount: 6 & ReplicationFactor: 2 –

Antwort

2

Wenn Sie das Programm in der LocalStreamEnvironment ausführen (die Sie erhalten, wenn Sie StreamExecutionEnvironment.getExecutionEnvironment() in einer IDE aufrufen) ist die Standardparallelität aller Operatoren gleich der Anzahl der CPU-Kerne.

Also in Ihrem Beispiel ist jeder Operator in vier Teilaufgaben parallelisiert. Im Protokoll sehen Sie eine Nachricht für jeden dieser vier Teilaufgaben (3/4 zeigt an, dass dies die dritte von insgesamt vier Aufgaben ist).

Sie können die Anzahl der Teilaufgaben steuern, indem Sie StreamExecutionEnvironment.setParallelism(int) aufrufen oder setParallelism(int) für jeden einzelnen Operator aufrufen.

In Anbetracht Ihres Programms sollten die Kafka-Datensätze nicht repliziert werden. Jeder Datensatz sollte nur einmal gedruckt werden. Da die Datensätze jedoch parallel geschrieben werden, wird der Ausgabezeile das Präfix x> vorangestellt, wobei x die ID der parallelen Teilaufgabe angibt, die die Zeile ausgegeben hat.

+0

Vielen Dank für Ihre Eingabe! Laut Ihrem Kommentar habe ich Folgendes versucht: 1) StreamExecutionEnvironment.setDefaultLocalParallelism (1); Ich möchte nur jeden Datensatz einmal drucken oder verarbeiten. Dies funktioniert besser als zuvor. Ich sehe, dass einige Zeilen nur einmal gedruckt werden, aber nach einer Weile beginnen sie wieder, sich ähnlich zu verhalten und mehrere Zeilen zu drucken. 2) env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelismus (1); Immer noch Druckausgabe Linie mehrmals. Irgendwann muss ich diese Ausgabe verarbeiten und die Daten in Cassandra ablegen, also ist es meine Hauptsorge, dass sie mehrere Zeilen verarbeiten und speichern wird. Bitte teilen Sie Ihre Eingaben! –

+0

Die Programme, die Sie gepostet haben, um die Daten nicht zu replizieren. Sind Sie sicher, dass die Daten nicht innerhalb des Kafka-Themas repliziert werden? –

+0

Geprüft, es scheint, als gäbe es auch Probleme mit Ereignissen, die in Kafka gedruckt wurden. Vielen Dank Fabian! –