2017-03-11 6 views
0

Ich bin neu bei Apache Flink und versuche, Daten von IoT-Geräten zu organisieren, die Geräte überwachen, die nach Wifi scannen. Ein typisches Ereignis wird wie folgt aussehen:Apache Flink und Ereignissequenzierung

{NodeId, Geräte-MAC-Adresse, Zeit, Typ}

Ich mag bei der Vorveranstaltung für eine MAC-Adresse suchen zu können und wenn die aktuelle Ereignis Zeit weniger als ein spezifiziertes Intervall vom letzten Ereignis für diese MAC-Adresse, zB 60 Sekunden, Ich möchte eine laufende Summe von Ereignissen für diesen MAC aktualisieren. Wenn jedoch die Intervallzeit abgelaufen ist, möchte ich das aggregierte Ereignis in eine Datenbank schreiben. Das aggregierte Ereignis würde in etwa so aussehen:

{NodeId, MAC-Adresse Letzte Veranstaltung, Veranstaltungen insgesamt, Datum, Stunde}

Um die Dinge zu komplizieren, ich stündlich und täglich laufenden Summen für jede NodeId behalten möchten Halten einer laufenden Summe für alle MAC-Adressenereignisse pro Knoten und nach Beendigung des relevanten Zeitraums, um sie an eine Datenbank auszugeben.

Ich habe die Dokumentation durchgesehen, habe aber ein wenig Mühe, die Teile zu verstehen, die ich für diese Aufgabe brauche.

Vielen Dank im Voraus

Antwort

0

Das klingt wie ein Problem von Aggregaten für session Fenster zu erzeugen.

Sie können wie etw tun:

stream.keyBy(new KeySelector<Event, Integer>() { 
    @Override 
    public Integer getKey(Event value) throws Exception { 
     return value.nodeId; 
    } 
}).window(EventTimeSessionWindows.withGap(Time.seconds(60))) 
    .apply(new ReduceFunction<LogRow>(), new WindowFunction<LogRow, Object, Integer, TimeWindow>())