2017-10-04 3 views
0

Ich untersuche gerade ProcessWindowFunction in Flinks neuem Release. Es besagt, dass der ProcessWindowFunction globalen Status und Fensterstatus unterstützt. Ich benutze Scala API, um es zu versuchen. Ich kann den globalen Staat bisher arbeiten lassen, aber ich habe kein Glück, um den Fensterzustand zu erreichen. Ich bearbeite Systemprotokolle und zähle die Anzahl der Logs, die nach Hostname und Schweregrad verschlüsselt sind. Ich möchte den Unterschied in der Protokollzählung zwischen zwei benachbarten Fenstern berechnen. Hier ist mein Code implementiert ProcessWindowFunction.Flink: ProcessWindowFunction

class LogProcWindowFunction extends ProcessWindowFunction[LogEvent, LogEvent, Tuple, TimeWindow] { 
    // Create a descriptor for ValueState 
    private final val valueStateWindowDesc = new ValueStateDescriptor[Long](
    "windowCounters", 
    createTypeInformation[Long]) 

    private final val reducingStateGlobalDesc = new ReducingStateDescriptor[Long](
    "globalCounters", 
    new SumReduceFunction(), 
    createTypeInformation[Long]) 

    override def process(key: Tuple, context: Context, elements: Iterable[LogEvent], out: Collector[LogEvent]): Unit = { 
    // Initialize the per-key and per-window ValueState 
    val valueWindowState = context.windowState.getState(valueStateWindowDesc) 
    val reducingGlobalState = context.globalState.getReducingState(reducingStateGlobalDesc) 
    val latestWindowCount = valueWindowState.value() 
    println(s"lastWindowCount: $latestWindowCount ......") 
    val latestGlobalCount = if (reducingGlobalState.get() == null) 0L else reducingGlobalState.get() 
    // Compute the necessary statistics and determine if we should launch an alarm 
    val eventCount = elements.size 
    // Update the related state 
    valueWindowState.update(eventCount.toLong) 
    reducingGlobalState.add(eventCount.toLong) 
    for (elem <- elements) { 
     out.collect(elem) 
    } 
    } 
} 

Ich bekomme immer 0 Wert aus dem Fenster Zustand anstelle der bisherigen aktualisierten Zählung soll es sein. Ich habe seit Tagen mit solchen Problemen zu kämpfen. Kann mir bitte jemand helfen, es herauszufinden? Vielen Dank.

Antwort

1

Der Gültigkeitsbereich des Status pro Fenster ist eine einzelne Fensterinstanz. Im Fall Ihrer obigen Methode process ist jedes Mal, wenn es aufgerufen wird, ein neues Fenster im Gültigkeitsbereich, und daher ist der neusteWindowCount immer Null.

Für ein normales Vanilla-Fenster, das nur einmal ausgelöst wird, ist der Status pro Fenster nutzlos. Nur wenn ein Fenster irgendwie mehrere Auslösungen hat (z. B. späte Auslösungen), können Sie den Zustand pro Fenster gut ausnutzen. Wenn Sie versuchen, sich von einem Fenster zum nächsten zu erinnern, können Sie dies mit dem globalen Fensterstatus tun.

Ein Beispiel für die Verwendung des Status pro Fenster zur Erinnerung an Daten, die bei späten Zündungen verwendet werden, finden Sie in den Folien 13-19 in Flink's advanced window training.

+0

Vielen Dank für Ihre Antwort. Tatsächlich wurde der Code, den ich gepostet habe, von der angegebenen URL nachgeahmt. Was ich dachte, ist, dass wir Zustände pro Fenster initiieren, Berechnungen mit diesen Zuständen durchführen und diese schließlich aktualisieren können, damit wir sie im nächsten Fenster abrufen können. Daher würde ich erwarten, dass der Wert des Fensterstatus im letzten Aufruf von 'ProcessWindowFunction' beibehalten wird. Ich bin verwirrt, warum Sie erwarten würden, dass '' nextWindowCount' 'null ist, auch wenn ich den entsprechenden Fensterzustand aktualisiere, wenn 'ProcessWindowFunction' aufgerufen wird. Missverstehe ich die Verwendung von pro-Fenster-Zustand? –

+0

Ich habe meine Antwort erweitert, um hoffentlich klarer zu sein. Lass es mich wissen, wenn du noch Fragen hast. Grundsätzlich läuft es darauf hinaus, dass das "nächste Fenster" nicht das gleiche Fenster ist und keinen Zugriff auf den Fensterstatus des vorherigen Fensters hat. –

+0

Vielen Dank für Ihr Update auf die Antwort. Tatsächlich habe ich die Definition des per-window-Zustands falsch verstanden. Jetzt kann ich es komplett herausfinden. –