2015-12-08 7 views
9

Ich bin in einigen Schwierigkeiten zu verstehen die Semantik rund um Event Time Windowing. Das folgende Programm generiert einige Tupel mit Zeitstempeln, die als Ereigniszeit verwendet werden und eine einfache Fensteraggregation durchführen. Ich würde erwarten, dass die Ausgabe in der gleichen Reihenfolge wie die Eingabe ist, aber die Ausgabe ist anders angeordnet. Warum ist die Ausgabe in Bezug auf die Ereigniszeit nicht in Ordnung?Flink Streaming-Ereignis Zeitfenster Bestellung

import java.util.concurrent.TimeUnit 
import org.apache.flink.streaming.api.TimeCharacteristic 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.streaming.api.scala._ 

object WindowExample extends App { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
    env.getConfig.enableTimestamps() 
    env.setParallelism(1) 

    val start = 1449597577379L 
    val tuples = (1 to 10).map(t => (start + t * 1000, t)) 

    env.fromCollection(tuples) 
     .assignAscendingTimestamps(_._1) 
     .timeWindowAll(Time.of(1, TimeUnit.SECONDS)) 
     .sum(1) 
     .print() 

    env.execute() 
} 

Der Eingang:

(1449597578379,1) 
(1449597579379,2) 
(1449597580379,3) 
(1449597581379,4) 
(1449597582379,5) 
(1449597583379,6) 
(1449597584379,7) 
(1449597585379,8) 
(1449597586379,9) 
(1449597587379,10) 

Ergebnis:

[info] (1449597579379,2) 
[info] (1449597581379,4) 
[info] (1449597583379,6) 
[info] (1449597585379,8) 
[info] (1449597587379,10) 
[info] (1449597578379,1) 
[info] (1449597580379,3) 
[info] (1449597582379,5) 
[info] (1449597584379,7) 
[info] (1449597586379,9) 

Antwort

10

Der Grund für dieses Verhalten ist, dass die Reihenfolge der Elemente in Flink (in Bezug auf den Zeitstempel) in nicht getroffen Konto. Nur die Korrektheit von Wasserzeichen und ihre Beziehung zu den Zeitstempeln von Elementen ist wichtig für Operationen, die Zeit berücksichtigen, da die Wasserzeichen normalerweise die Berechnung in zeitbasierten Operationen auslösen.

In Ihrem Beispiel speichert der Fensteroperator alle Elemente aus der Quelle in internen Fensterpuffern. Dann gibt die Quelle ein Wasserzeichen aus, das besagt, dass in der Zukunft keine Elemente mit einem kleineren Zeitstempel ankommen werden. Dies weist den Fensterbediener wiederum an, alle Fenster mit Endzeitstempeln zu verarbeiten, die unter den Wasserzeichen liegen (was für alle Fenster gilt). Sie sendet also alle Fenster (mit beliebiger Reihenfolge) aus und gibt danach selbst ein Wasserzeichen aus. Die dahinter liegenden Operationen erhalten selbst die Elemente und können die Verarbeitung durchführen, sobald sie Wasserzeichen erhalten haben.

Das Intervall, in dem Wasserzeichen von Quellen ausgegeben werden, beträgt standardmäßig 200 ms. Mit der kleinen Menge an Elementen, die Ihre Quelle emittiert, werden alle emittiert, bevor das erste Wasserzeichen emittiert wird. In einem realen Anwendungsfall, in dem die Wasserzeichen-Emissionsintervalle viel kleiner als die Fenstergröße sind, würden Sie das erwartete Verhalten von Fenstern in der Reihenfolge ihres Zeitstempels erhalten. Zum Beispiel, wenn Sie alle 500 ms 1 Stunde Windows und Wasserzeichen haben.

+1

Können Sie auf ein Beispiel einer nachgeordneten Operation verweisen oder verweisen, die Elemente basierend auf der Ereigniszeit neu anordnen könnte, nachdem sie ein Wasserzeichen erhalten hat? Vielen Dank! –

+1

@ MaximKolchin solche Neuordnung geschieht z. in der CEP-Bibliothek. Sie können hier nachsehen: https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java –