2017-09-11 6 views
0

Ich möchte für jedes Wort die Anzahl der Wörter mit Fensterfunktion erhalten:Wie auf Flink im Fensterstromzähler jedes Wortes bekommen

Wenn ich diesen Code verwenden:

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 

I-Ausgang erhalten nach 5 seconde (gefensterten Zeit) wie folgt aus:

input:

first input : hello 
seconde input : hello 
third input : word 
fifth input : hello 
sixth input : word 

Ausgang

first output : hello : 3 | word : 2 

aber ich möchte für jedes Wort die Ausgabe mit Zahl haben.

wie folgt aus: Eingang:

first input: hello 
seconde input:hello 
third input:word 
fifth input:hello 
sixth input:word 

heraus gesetzt:

first output: hello : 1 
seconde output:hello : 2 
third output:word : 1 
fifth output:hello : 3 
sixth output:word : 2 

wie kann ich das tun?

Antwort

0

Wäre das Beispielprogramm für die Kafka Streaming API nicht genau das, wonach Sie suchen? https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#example-program

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 
+0

Das Beispiel Druck alle 5 Second Und ich möchte für jedes Ereignis drucken. – FlinkNoob

+0

Um ein wenig Klarheit zu schaffen, fragen Sie nach Verarbeitungszeitfenstern (im Gegensatz zu Ereigniszeitfenstern), und sind die Fenster ausgerichtet (dh wollen Sie die Zähler für alle Schlüssel auf Null gleichzeitig gelöscht werden, jeder fünf Sekunden)? –

+0

Ja, ich frage nach Verarbeitungszeit Windows, ich möchte nicht die Daten auf Null zu reinigen. Ich möchte nur die Anzahl der gleichen Wörter aus den letzten 5 Sekunden für jedes Wort zählen. – FlinkNoob

Verwandte Themen