Ich habe einen Job, der Flinks Side-Output-Fähigkeit nutzt, um Daten in verschiedene Kafka-Senken zu schreiben. Die Seitenausgabe ruft Daten ab, wenn sie in der IDE ausgeführt wird, aber nicht im Flink-Cluster. Irgendeine Idee warum? HierDaten, die auf IDE-Seite geschrieben werden, aber nicht auf Cluster
ist ein Beispiel für den Code:
final OutputTag<SideOutputObject> sideOutputTag = new OutputTag<SideOutputObject>("side-output-tag"){};
SingleOutputStreamOperator<String> processedDataStream = outputStream
.process(new ProcessAndSortBinaryData(sideOutputTag))
.startNewChain()
.name("processedDataStream")
.uid("processedDataStream");
DataStream<String> sideOutputObjectStream = processedDataStream.getSideOutput(sideOutputTag)
.flatMap(new FlatMapFunction<SideOutputObject, String>() {
@Override
public void flatMap(SideOutputObject sideOutputObject, Collector<String> collector) throws Exception {
System.out.println("sideOutputObject in side output flat map!");
collector.collect(sideOutputObject.toString());
}
})
.startNewChain()
.name("sideOutputStream")
.uid("sideOutputStream");
sideOutputObjectStream.addSink(new FlinkKafkaProducer010<>(
"sideOutputKafkaTopic",
new SimpleStringSchema(),
kafkaSinkProperties)
).name("sideOutput-KafkaSink")
.uid("sideOutput-KafkaSink");
Die flatmap nie zeigen, dass es Aufzeichnungen im Cluster GUI und die System.out.printin Nachricht wird nie geschrieben stdout entweder erhalten hat.
Jede Hilfe würde sehr geschätzt werden. Danke im Voraus!