I flink Job wie unten zu lesen Daten von Apache Kafka & Druck zu laufen versuchen:Apache Flink - mehrere Ausgangsleitungen
Java-Programm
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "test.net:9092");
properties.setProperty("group.id", "flink_consumer");
properties.setProperty("zookeeper.connect", "dev.com:2181,dev2.com:2181,dev.com:2181/dev2");
properties.setProperty("topic", "topic_name");
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>("topic_name", new SimpleStringSchema(), properties));
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
return "Kafka and Flink says: " + value;
}
}).print();
env.execute();
Scala-Code
var properties = new Properties();
properties.setProperty("bootstrap.servers", "msg01.staging.bigdata.sv2.247-inc.net:9092");
properties.setProperty("group.id", "flink_consumer");
properties.setProperty("zookeeper.connect", "host33.dev.swamp.sv2.tellme.com:2181,host37.dev.swamp.sv2.tellme.com:2181,host38.dev.swamp.sv2.tellme.com:2181/staging_sv2");
properties.setProperty("topic", "sv2.staging.rtdp.idm.events.omnichannel");
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var stream:DataStream[(String)] = env
.addSource(new FlinkKafkaConsumer082[String]("sv2.staging.rtdp.idm.events.omnichannel", new SimpleStringSchema(), properties));
stream.print();
env.execute();
Immer wenn ich dies in App in Eclipse ausführen, sehe ich unten zu Beginn mit:
03/27/2017 20:06:19 Die Jobausführung wechselt in den Status RUNNING.
03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (1/4) zu SCHEDULED 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink : Unbenannt (1/4) wechselte zu DEPLOYING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Waschbecken: Unbenannt (2/4) wechselte zu SCHEDULED 03/27/2017 20:06:19 Quelle : Benutzerdefinierte Quelle -> Sink: Unbenannt (2/4) wechselte zu DEPLOYING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (3/4) wechselte zu SCHEDULED 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Senke: Unbenannt (3/4) wechselte zu DEPLOYING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Senke: Unbenannt (4/4) geschaltet zu geplant 03/27/2017 20:06:19 Sauer ce: Benutzerdefinierte Quelle -> Sink: Unbenannt (4/4) wechselte zu DEPLOYING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (4/4) wechselte zu RUNNING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (2/4) wechselte zu RUNNING 03/27/2017 20:06:19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (1/4) wechselte zu RUNNING 2017.03.27 20.06.19 Quelle: Benutzerdefinierte Quelle -> Sink: Unbenannt (3/4) geschaltet
Frage an RUNNING ich habe ist:
1) Warum sehe ich 4 Instanz der Senke in allen Fällen (geplant, bereitgestellt und ausgeführt).
2) Für jede in Apache Kafka empfangene Zeile sehe ich, dass sie hier mehrmals gedruckt wird, meistens 4 Mal. Was ist ein Grund?
Idealerweise möchte ich jede Zeile nur einmal lesen und damit weiter bearbeiten. Jede Eingabe/Hilfe wird spürbar sein!
Wie viele Partitionen hat Ihr Thema? –
Thema, das ich verwende, habe PartitionCount: 6 & ReplicationFactor: 2 –