Ich untersuche gerade ProcessWindowFunction
in Flinks neuem Release. Es besagt, dass der ProcessWindowFunction
globalen Status und Fensterstatus unterstützt. Ich benutze Scala API, um es zu versuchen. Ich kann den globalen Staat bisher arbeiten lassen, aber ich habe kein Glück, um den Fensterzustand zu erreichen. Ich bearbeite Systemprotokolle und zähle die Anzahl der Logs, die nach Hostname und Schweregrad verschlüsselt sind. Ich möchte den Unterschied in der Protokollzählung zwischen zwei benachbarten Fenstern berechnen. Hier ist mein Code implementiert ProcessWindowFunction
.Flink: ProcessWindowFunction
class LogProcWindowFunction extends ProcessWindowFunction[LogEvent, LogEvent, Tuple, TimeWindow] {
// Create a descriptor for ValueState
private final val valueStateWindowDesc = new ValueStateDescriptor[Long](
"windowCounters",
createTypeInformation[Long])
private final val reducingStateGlobalDesc = new ReducingStateDescriptor[Long](
"globalCounters",
new SumReduceFunction(),
createTypeInformation[Long])
override def process(key: Tuple, context: Context, elements: Iterable[LogEvent], out: Collector[LogEvent]): Unit = {
// Initialize the per-key and per-window ValueState
val valueWindowState = context.windowState.getState(valueStateWindowDesc)
val reducingGlobalState = context.globalState.getReducingState(reducingStateGlobalDesc)
val latestWindowCount = valueWindowState.value()
println(s"lastWindowCount: $latestWindowCount ......")
val latestGlobalCount = if (reducingGlobalState.get() == null) 0L else reducingGlobalState.get()
// Compute the necessary statistics and determine if we should launch an alarm
val eventCount = elements.size
// Update the related state
valueWindowState.update(eventCount.toLong)
reducingGlobalState.add(eventCount.toLong)
for (elem <- elements) {
out.collect(elem)
}
}
}
Ich bekomme immer 0
Wert aus dem Fenster Zustand anstelle der bisherigen aktualisierten Zählung soll es sein. Ich habe seit Tagen mit solchen Problemen zu kämpfen. Kann mir bitte jemand helfen, es herauszufinden? Vielen Dank.
Vielen Dank für Ihre Antwort. Tatsächlich wurde der Code, den ich gepostet habe, von der angegebenen URL nachgeahmt. Was ich dachte, ist, dass wir Zustände pro Fenster initiieren, Berechnungen mit diesen Zuständen durchführen und diese schließlich aktualisieren können, damit wir sie im nächsten Fenster abrufen können. Daher würde ich erwarten, dass der Wert des Fensterstatus im letzten Aufruf von 'ProcessWindowFunction' beibehalten wird. Ich bin verwirrt, warum Sie erwarten würden, dass '' nextWindowCount' 'null ist, auch wenn ich den entsprechenden Fensterzustand aktualisiere, wenn 'ProcessWindowFunction' aufgerufen wird. Missverstehe ich die Verwendung von pro-Fenster-Zustand? –
Ich habe meine Antwort erweitert, um hoffentlich klarer zu sein. Lass es mich wissen, wenn du noch Fragen hast. Grundsätzlich läuft es darauf hinaus, dass das "nächste Fenster" nicht das gleiche Fenster ist und keinen Zugriff auf den Fensterstatus des vorherigen Fensters hat. –
Vielen Dank für Ihr Update auf die Antwort. Tatsächlich habe ich die Definition des per-window-Zustands falsch verstanden. Jetzt kann ich es komplett herausfinden. –