Ich verwende Flink 1.2-Snapshot. Meine Daten sieht wie folgt aus:Flink Streaming Windowing - Das letzte Ereignis jedes Fensters gehört zum nächsten Fenster
- id = 25398102, sourceId = 1, ts = 2016.10.15 00.00.56, user = 14, Wert = 919
- id = 25398185, sourceId = 1, ts = 2016-10-15 00:01:06, user = 14, Wert = 920
- id = 25398210, sourceId = 1, ts = 2016-10-15 00:01:16, user = 14, value = 944
- id = 25.398.235, sourceId = 1, ts = 2016.10.15 00.01.24, user = 3149, Wert = 944
- id = 25.398.236, sourceId = 1, ts = 2016-10 -15 00:01:25, user = 71, Wert = 955
- id = 25398239, sauer ceId = 1, ts = 2016-10-15 00:01:26, Benutzer = 71, Wert = 955
- ID = 25398265, sourceId = 1, ts = 2016-10-15 00:01:36, user = 71, Wert = 955
- id = 25.398.310, sourceId = 1, ts = 2016.10.15 00.02.16, user = 14, Wert = 960
- id = 25.398.320, sourceId = 1, TS = 2016 -10-15 00.02.26, user = 14, Wert = 1000
ich den folgenden Code bin mit Windows basierten Benutzer-IDs zu erstellen:
stream.flatMap(new LogsParser())
.assignTimestampsAndWatermarks(new MessageTimestampExtractor())
.keyBy("sourceId")
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(new MySessionTrigger()))
.apply(new SessionWindowFunction())
.print();
MySession Trigger sieht in die rece ieved-Ereignis und überprüfen Sie die Benutzer-ID, um das Fenster für Benutzer-ID-Änderungen auszulösen. Die SessionWindowFunction erstellt einfach eine Sitzung außerhalb des Fensters.
Hier sind die Sitzungen erstellt:
Session:
- id = 25398102, sourceId = 1, ts = 2016.10.15 00.00.56, user = 14, value = 919
- id = 25.398.185, sourceId = 1, ts = 2016.10.15 00.01.06, user = 14, Wert = 920
- id = 25.398.210, sourceId = 1, ts = 2016-10 -15 00:01:16, Benutzer = 14, Wert = 944
- id = 25398235, sourceId = 1, ts = 2016.10.15 00.01.24, user = 3149, Wert = 944
Session:
- id = 25.398.236, sourceId = 1, ts = 2016-10-15 00:01:25, Benutzer = 71, Wert = 955
- ID = 25398239, sourceId = 1, ts = 2016-10-15 00:01:26, user = 71, value = 955
- id = 25.398.265, sourceId = 1, ts = 2016.10.15 00.01.36, user = 71, Wert = 955
- id = 25.398.310, sourceId = 1, ts = 2016-10 -15 00:02:16, Benutzer = 14, Wert = 960
Session:
- id = 25.398.320, sourceId = 1, ts = 2016.10.15 00.02.26, user = 14, Wert = 1000
Das Problem, wie Sie sehen können, ist, dass in jeder Sitzung das letzte Ereignis tatsächlich zum nächsten Fenster gehört. Die Entscheidung, das Fenster zu starten, ist irgendwie zu spät, da das letzte Ereignis bereits im Fenster ist.
Wie kann ich das Fenster auslösen, ohne das letzte Ereignis in diesem Fenster zu berücksichtigen?