2017-05-11 1 views
2

Ich habe einen Anwendungsfall, wo wir Muster in Daten innerhalb eines Fensters finden müssen. Wir experimentieren mit Structured Streaming. Wir haben einen kontinuierlichen Strom von Ereignissen und suchen nach Mustern wie Ereignis A (Gerätetrennung) gefolgt von Ereignis B (Gerätewiederverbindung) innerhalb von 10 Sekunden. oder Ereignis A (Disconnect) wird nicht von Ereignis B (Reconnect) innerhalb von 10 Sekunden gefolgt.Erhalten Sie alle Zeilen eines Fensters in Spark strukturiertem Streaming

Ich dachte über die Verwendung eines Fensterfunktionsgruppierungs-Datasets in 10-Sekunden-Fenster-Buckets nach und suchte jedes Mal nach dem Muster, wenn die Fensterwerte aktualisiert wurden. Es sieht so aus, als ob die Fensterfunktion wirklich als groupBy im strukturierten Streaming verwendet wird, was mich zwingt, Aggregatfunktionen zu verwenden, um hohe agg-Werte für Spalten zu erhalten.

Ich frage mich, ob es eine Möglichkeit gibt, alle Werte der Spalte zu durchlaufen, wenn die Fensterfunktion im strukturierten Streaming verwendet wird.

+0

Haben Sie einen Weg finden, das zu tun, ohne die Gruppierung? Ich habe einen ähnlichen Anwendungsfall, bei dem ich keine Aggregation auf hoher Ebene durchführen möchte, sondern die Ereignisse in einem Windows-Fenster abrufen und CEP wie oben erwähnt ausführen soll. –

+0

@BiplobBishwas nein. Am Ende habe ich stattdessen dstream verwendet und nach Schlüssel gruppiert. –

+0

Danke für die Antwort, genau das wollen wir jetzt machen. Wenn das nicht so funktioniert, wie wir es erwarten, könnten wir bald zu Flink CEP wechseln. Wie auch immer, danke für die Antwort. –

Antwort

0

Sie könnten versuchen, mit mapGroupsWithState (structured Streaming) oder mapWithState (DStreams), es klingt wie es könnte gut für Ihren Fall funktionieren.

Sie können einen beliebigen Status für einen beliebigen Schlüssel beibehalten und den Status jedes Mal aktualisieren, wenn ein Update kommt. Sie können für jede Taste auch eine Zeitüberschreitung festlegen, nach deren Ablauf der Status gelöscht wird. Für Ihren Anwendungsfall könnten Sie den Anfangszustand für Ereignis A als den Zeitstempel von A speichern, und wenn Ereignis B kommt, können Sie überprüfen, ob der Zeitstempel innerhalb von 10 Sekunden von A liegt. Wenn dies der Fall ist, generieren Sie ein Ereignis.

Sie können möglicherweise auch Zeitlimits dafür verwenden, z. setze den Anfangszustand, wenn A kommt, setze das Timeout auf 10s, und wenn A noch da ist, wenn B kommt, dann erzeuge ein Ereignis.

Good blog post auf die Unterschiede b/w mapGroupsWithState und mapWithState