2017-11-28 3 views
1

Ich habe einen Job, der Flinks Side-Output-Fähigkeit nutzt, um Daten in verschiedene Kafka-Senken zu schreiben. Die Seitenausgabe ruft Daten ab, wenn sie in der IDE ausgeführt wird, aber nicht im Flink-Cluster. Irgendeine Idee warum? HierDaten, die auf IDE-Seite geschrieben werden, aber nicht auf Cluster

ist ein Beispiel für den Code:

final OutputTag<SideOutputObject> sideOutputTag = new OutputTag<SideOutputObject>("side-output-tag"){}; 

SingleOutputStreamOperator<String> processedDataStream = outputStream 
       .process(new ProcessAndSortBinaryData(sideOutputTag)) 
       .startNewChain() 
       .name("processedDataStream") 
       .uid("processedDataStream"); 

DataStream<String> sideOutputObjectStream = processedDataStream.getSideOutput(sideOutputTag) 
       .flatMap(new FlatMapFunction<SideOutputObject, String>() { 
        @Override 
        public void flatMap(SideOutputObject sideOutputObject, Collector<String> collector) throws Exception { 
         System.out.println("sideOutputObject in side output flat map!"); 
         collector.collect(sideOutputObject.toString()); 
        } 
       }) 
       .startNewChain() 
       .name("sideOutputStream") 
       .uid("sideOutputStream"); 


sideOutputObjectStream.addSink(new FlinkKafkaProducer010<>(
        "sideOutputKafkaTopic", 
        new SimpleStringSchema(), 
        kafkaSinkProperties) 
      ).name("sideOutput-KafkaSink") 
        .uid("sideOutput-KafkaSink"); 

Die flatmap nie zeigen, dass es Aufzeichnungen im Cluster GUI und die System.out.printin Nachricht wird nie geschrieben stdout entweder erhalten hat.

Jede Hilfe würde sehr geschätzt werden. Danke im Voraus!

Antwort

0

Ich habe gerade versucht, die Prozessfunktion ProcessAndSortBinaryData() - Logik zu einer anonymen inneren Funktion (nach dem erneuten Lesen der Dokumentation ...) zu verschieben und es funktioniert jetzt im Cluster.

Verwandte Themen