Ich möchte für jedes Wort die Anzahl der Wörter mit Fensterfunktion erhalten:Wie auf Flink im Fensterstromzähler jedes Wortes bekommen
Wenn ich diesen Code verwenden:
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print
env.execute("Window Stream WordCount")
}
}
I-Ausgang erhalten nach 5 seconde (gefensterten Zeit) wie folgt aus:
input:
first input : hello
seconde input : hello
third input : word
fifth input : hello
sixth input : word
Ausgang
first output : hello : 3 | word : 2
aber ich möchte für jedes Wort die Ausgabe mit Zahl haben.
wie folgt aus: Eingang:
first input: hello
seconde input:hello
third input:word
fifth input:hello
sixth input:word
heraus gesetzt:
first output: hello : 1
seconde output:hello : 2
third output:word : 1
fifth output:hello : 3
sixth output:word : 2
wie kann ich das tun?
Das Beispiel Druck alle 5 Second Und ich möchte für jedes Ereignis drucken. – FlinkNoob
Um ein wenig Klarheit zu schaffen, fragen Sie nach Verarbeitungszeitfenstern (im Gegensatz zu Ereigniszeitfenstern), und sind die Fenster ausgerichtet (dh wollen Sie die Zähler für alle Schlüssel auf Null gleichzeitig gelöscht werden, jeder fünf Sekunden)? –
Ja, ich frage nach Verarbeitungszeit Windows, ich möchte nicht die Daten auf Null zu reinigen. Ich möchte nur die Anzahl der gleichen Wörter aus den letzten 5 Sekunden für jedes Wort zählen. – FlinkNoob