2016-11-06 3 views
0

Ich verwende Flink 1.2-Snapshot. Meine Daten sieht wie folgt aus:Flink Streaming Windowing - Das letzte Ereignis jedes Fensters gehört zum nächsten Fenster

  • id = 25398102, sourceId = 1, ts = 2016.10.15 00.00.56, user = 14, Wert = 919
  • id = 25398185, sourceId = 1, ts = 2016-10-15 00:01:06, user = 14, Wert = 920
  • id = 25398210, sourceId = 1, ts = 2016-10-15 00:01:16, user = 14, value = 944
  • id = 25.398.235, sourceId = 1, ts = 2016.10.15 00.01.24, user = 3149, Wert = 944
  • id = 25.398.236, sourceId = 1, ts = 2016-10 -15 00:01:25, user = 71, Wert = 955
  • id = 25398239, sauer ceId = 1, ts = 2016-10-15 00:01:26, Benutzer = 71, Wert = 955
  • ID = 25398265, sourceId = 1, ts = 2016-10-15 00:01:36, user = 71, Wert = 955
  • id = 25.398.310, sourceId = 1, ts = 2016.10.15 00.02.16, user = 14, Wert = 960
  • id = 25.398.320, sourceId = 1, TS = 2016 -10-15 00.02.26, user = 14, Wert = 1000

ich den folgenden Code bin mit Windows basierten Benutzer-IDs zu erstellen:

stream.flatMap(new LogsParser()) 
      .assignTimestampsAndWatermarks(new MessageTimestampExtractor()) 
      .keyBy("sourceId") 
      .window(GlobalWindows.create()) 
      .trigger(PurgingTrigger.of(new MySessionTrigger())) 
      .apply(new SessionWindowFunction()) 
      .print(); 

MySession Trigger sieht in die rece ieved-Ereignis und überprüfen Sie die Benutzer-ID, um das Fenster für Benutzer-ID-Änderungen auszulösen. Die SessionWindowFunction erstellt einfach eine Sitzung außerhalb des Fensters.

Hier sind die Sitzungen erstellt:

  1. Session:

    • id = 25398102, sourceId = 1, ts = 2016.10.15 00.00.56, user = 14, value = 919
    • id = 25.398.185, sourceId = 1, ts = 2016.10.15 00.01.06, user = 14, Wert = 920
    • id = 25.398.210, sourceId = 1, ts = 2016-10 -15 00:01:16, Benutzer = 14, Wert = 944
    • id = 25398235, sourceId = 1, ts = 2016.10.15 00.01.24, user = 3149, Wert = 944
  2. Session:

    • id = 25.398.236, sourceId = 1, ts = 2016-10-15 00:01:25, Benutzer = 71, Wert = 955
    • ID = 25398239, sourceId = 1, ts = 2016-10-15 00:01:26, user = 71, value = 955
    • id = 25.398.265, sourceId = 1, ts = 2016.10.15 00.01.36, user = 71, Wert = 955
    • id = 25.398.310, sourceId = 1, ts = 2016-10 -15 00:02:16, Benutzer = 14, Wert = 960
  3. Session:

    • id = 25.398.320, sourceId = 1, ts = 2016.10.15 00.02.26, user = 14, Wert = 1000

Das Problem, wie Sie sehen können, ist, dass in jeder Sitzung das letzte Ereignis tatsächlich zum nächsten Fenster gehört. Die Entscheidung, das Fenster zu starten, ist irgendwie zu spät, da das letzte Ereignis bereits im Fenster ist.

Wie kann ich das Fenster auslösen, ohne das letzte Ereignis in diesem Fenster zu berücksichtigen?

Antwort

0

Eine Idee wäre, eine Flatmap zu verwenden, um Marker in den Stream einzufügen, wenn sich die Benutzer-ID ändert. Dann kann Ihre Trigger-Funktion ausgelöst werden, wenn eine dieser Markierungen angezeigt wird, und Ihre Sitzungsfensterfunktion kann die Markierungen herausfiltern.

Verwandte Themen