2016-04-26 7 views
1

Ich versuche, zwei Streams mit Apache Flink Streaming-API zu verbinden, aber nichts verbunden ist, und ich habe keine Ahnung, nach der Lektüre docs, was habe ich falsch gemachtJoin von zwei Strömen nicht funktioniert

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    DataStream<MyPojo2> source = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ola"), new MyPojo2(2, "Ola"))) 
      .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>()); 
    DataStream<MyPojo2> source2 = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ela"), new MyPojo2(2, "Ela"))) 
      .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>()); 
    DataStream<Tuple2<String, String>> joined = source.join(source2).where(keySelector).equalTo(keySelector). 
      window(GlobalWindows.create()).apply(joinFunction); 
    joined.print(); 
    env.execute("Window"); 

Key-Funktion ist einfach myPojo.getFirst()

Antwort

2

Das Fenster GlobalWindows wird nur ausgelöst, wenn Sie eine benutzerdefinierte Trigger angeben. In Ihrem Beispiel, wenn Sie etwas wie TumblingEventTimeWindows.of(Time.seconds(5)) verwenden, sollten Sie Ergebnisse sehen.

+0

Gibt es also eine Möglichkeit, zwei Streams in ihrer vollständigen Geschichte zu verbinden, mit Streaming-API? Ich möchte die Batch-API nicht verwenden, weil ich das erste Ergebnis so schnell wie möglich erhalten möchte, aber vielleicht gibt es einen Parameter zur Optimierung der Batch-API. – Artur

+0

Es ist nicht möglich, sie in ihrer vollen Geschichte zu verbinden, da es sich um (technisch) unendliche Ströme handelt. Sie können einen Trigger angeben, der regelmäßig mit einem bestimmten Intervall ausgelöst wird. Auf diese Weise würden Sie sich dem anschließen, was gerade dort ist. Dazu würden Sie beispielsweise 'ContinuousProcessingTimeTrigger.of (Time.minutes (40))' 'verwenden. Wenn Sie den Inhalt beim Auslösen entfernen möchten, können Sie ihn auch in einen 'PurgingTrigger' einbinden. – aljoscha