2016-10-22 1 views
1

Von einem zeitgefensterten, getasteten Stream möchte ich einen Stream des größten bisher gesehenen Fensters erhalten (in Bezug auf die Anzahl der Elemente am größten).Stream von Updates für das größte Zeitfenster in Flink

Zur Zeit habe ich den folgenden Code:

source 
    .keyBy(...) 
    .timeWindow(...) 
    .fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) } 
    .keyBy(_ =>()) 
    .maxBy(1) 

Das Ergebnis des fold ist ein Strom von (key, count) Elemente - so aus diesem Strom, ich einen Strom von Updates der „Taste mit der höchsten Zählung erhalten möchten ".

I Schlüssel dann mit einer Konstanten (keyBy(_ =>()) - wie dies eine globale Operation ist), und verwenden Sie maxBy - und die fast Werke: Ich bin immer einen Strom von höchsten zählt, aber die aktuelle höchste Zählung emittiert für jedes Element.

Ich denke, was ich suche ist eine Art Filter-mit-vorherigen Wert, der nur Elemente emittieren würde, wenn der neue Wert von der vorherigen abweicht.

Ist das in Flink derzeit möglich?

Antwort

2

Flink hat standardmäßig keinen solchen Filter, aber es sollte ziemlich einfach sein, einen selbst zu implementieren.

Sie können dies tun mit einer Stateful FlatMap ähnlich wie diese:

val source: DataStream[Int] = ??? 

source 
    .keyBy(_: Int => _) 
    .timeWindow(Time.minutes(10)) 
    .fold((1, 0)) { case ((_, current), key) => (key, current + 1) } 
    // move everything to the same key 
    .keyBy(_ => 0) 
    // use stateful flatmap to remember highest count and filter by that 
    .flatMapWithState((in, state: Option[Int]) => 
    // filter condition 
    if (in._2 > state.getOrElse(-1)) 
     // emit new value and update max count 
     (Seq(in), Some(in._2)) 
    else 
     // emit nothing (empty Seq()) and keep count 
     (Seq(), state) 
).setParallelism(1) 

Wenn die nicht-parallel (single threaded) Filteroperator einen Engpass wird, können Sie parallel Vorfilter hinzufügen, indem Sie einen keyBy Hinzufügen mit zufälligen Tasten und einem Stateful-Filter FlatMap mit höherer Parallelität.

+0

Ah! Ich hatte flink 1.0 im classpath, das die 'flatMapWithState' Operation nicht miteinschloß :) Danke! – adamw

+0

Wird 'setParallelism (1)' benötigt? Wird die Parallelität trotzdem nicht 1 sein, weil man mit dem gleichen Schlüssel gruppiert? – adamw

+0

Die Gruppierung erfolgt mit einer Tastenauswahlfunktion. Um diese Funktion zu flinken, ist ein schwarzes Feld, d.h., weiß nicht, dass die Funktion einen konstanten Wert zurückgibt. Flink würde mehrere parallele Instanzen starten, aber alle Daten würden auf nur eine gehen. Im Prinzip könnte die Funktion analysiert werden, um auf ihr Verhalten zu schließen, aber dies wird (noch) nicht gemacht. –

Verwandte Themen