Ich bin in einigen Schwierigkeiten zu verstehen die Semantik rund um Event Time Windowing. Das folgende Programm generiert einige Tupel mit Zeitstempeln, die als Ereigniszeit verwendet werden und eine einfache Fensteraggregation durchführen. Ich würde erwarten, dass die Ausgabe in der gleichen Reihenfolge wie die Eingabe ist, aber die Ausgabe ist anders angeordnet. Warum ist die Ausgabe in Bezug auf die Ereigniszeit nicht in Ordnung?Flink Streaming-Ereignis Zeitfenster Bestellung
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
object WindowExample extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.enableTimestamps()
env.setParallelism(1)
val start = 1449597577379L
val tuples = (1 to 10).map(t => (start + t * 1000, t))
env.fromCollection(tuples)
.assignAscendingTimestamps(_._1)
.timeWindowAll(Time.of(1, TimeUnit.SECONDS))
.sum(1)
.print()
env.execute()
}
Der Eingang:
(1449597578379,1)
(1449597579379,2)
(1449597580379,3)
(1449597581379,4)
(1449597582379,5)
(1449597583379,6)
(1449597584379,7)
(1449597585379,8)
(1449597586379,9)
(1449597587379,10)
Ergebnis:
[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
Können Sie auf ein Beispiel einer nachgeordneten Operation verweisen oder verweisen, die Elemente basierend auf der Ereigniszeit neu anordnen könnte, nachdem sie ein Wasserzeichen erhalten hat? Vielen Dank! –
@ MaximKolchin solche Neuordnung geschieht z. in der CEP-Bibliothek. Sie können hier nachsehen: https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java –