2016-03-25 14 views
1

Ich bin wirklich neu auf Spark und Scala, und ich verwende ReduceByKeyAndWindows, um Wörter in Kafka-Nachrichten zu zählen, weil ich Fensterfunktion verwenden muss.Neustart ReduceByKeyAndWindows

der Zweck meiner Anwendung wird eine Warnung gesendet, wenn "x" mal Nachrichten von Kafka entdeckt, die ein bestimmtes Wort in einer bestimmten Zeit enthält. Starten Sie dann von Anfang an neu.

Der Code unten erkennt das Wort, aber ich kann nicht machen, dass meine Anwendung neu gestartet wird. Ich denke, wenn möglich, starten Sie die Akkumulation von ReduceByKeyAndWindows oder eine andere Möglichkeit, dies zu tun.

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 

object KafKaWordCount { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafKaWordCount") 
    val ssc = new StreamingContext(conf, Seconds(2)) 

    ssc.checkpoint("checkpoint") 

    val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test 
    val wordCounts = 
     lines.map(x => (x, 1)) 
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2) 

     //if the value from the key (word) exceeds 10 , sent alert and Restart the values 
     wordCounts.print() 
     ssc.start() 
     ssc.awaitTermination() 
    } 
} 

das zweite Beispiel von Yuval Itzchakov Verwendung und die Anzahl der Reichweite von 10 bis 3 und Senden 7 Meldungen zu reduzieren. Wenn Sie mit Funken 1.6.0 und höher, können Sie die experimentellen DStream.mapWithState verwenden zu halten

Die Ausgabe aus dem zweiten asnwer ist

Word: hello reached count: 1 
Word: hello reached count: 2 
//No print this message, its OK but the next word not start with 1 
Word: hello reached count: 4 
Word: hello reached count: 5 
Word: hello reached count: 6 
Word: hello reached count: 7 

Die Ausgabe, die ich

Word: hello reached count: 1 
Word: hello reached count: 2 

Word: hello reached count: 1 
Word: hello reached count: 2 

Word: hello reached count: 1 
+1

Es ist ein bisschen schwer zu verstehen, was Sie wollen, aber ich habe den Eindruck, dass Sie tatsächlich eine allgemeinere Funktion wie 'updateStateByKey' benötigen. – zero323

+0

danke, ich werde darüber lesen, Sie können mir ein Beispiel zeigen, wie ich die Funktion nutzen kann? nehme an, dass ich recibe dies: ** Hallo ** i dies akkumulieren ** (Hallo, 1) ** ** (hallo, 2) ** ** (hallo, 3) ** Und wenn , ** hallo ** übertrifft ** 5 ** ich werde den alarm senden und ** hallo ** muss ** 0 ** sein und neu beginnen .. Entschuldigung, ich bin auch neu auf dieser Seite. Danke noch einmal! –

+0

Nichtsdestotrotz dieses hübsche Beispiel: http://StackOverflow.com/a/35565682/1560062 – zero323

Antwort

0

erwarten Der aktualisierte Status Ihrer Wortzählung. Sobald das Limit erreicht, können Sie den Zustand entfernen und lassen Sie ihn an der Pipeline, und drucken Sie es aus mit DStream.foreach:

object KafKaWordCount { 
    def main(args: Array[String]) { 
    val conf = new SparkConf() 
    .setMaster("local[2]") 
    .setAppName("KafKaWordCount") 

    val ssc = new StreamingContext(conf, Seconds(2)) 
    ssc.checkpoint("checkpoint") 

    val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test 
    val stateSpec = StateSpec.function(updateWordCount _) 

    lines.map(x => (x, 1)) 
     .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2) 
     .mapWithState(stateSpec) 
     .filter(_.isDefined) 
     .foreachRDD(rdd => 
        rdd.foreach { case (word, count) => 
        println(s"Word: $word reached count: $count") }) 
    ssc.start() 
    ssc.awaitTermination() 
    } 

    def updateWordCount(key: String, 
         value: Option[Int], 
         state: State[(String, Int)]): Option[(String, Int)] = { 
    def updateCountState(count: Int): Option[(String, Int)] = { 
     if (count == 10) { 
     if (state.exists()) state.remove() 
     Some((key, count)) 
     } 
     else { 
     state.update((key, count)) 
     None 
     } 
    } 

    value match { 
     case Some(count) => updateCountState(count) 
     case _ => None 
    } 
    } 
} 

Wenn nicht, können Sie auf die langsamere DStream.updateStateByKey verschieben:

object KafKaWordCount { 
    def main(args: Array[String]) { 
    val conf = new SparkConf() 
    .setMaster("local[2]") 
    .setAppName("KafKaWordCount") 
    val ssc = new StreamingContext(conf, Seconds(2)) 

    ssc.checkpoint("checkpoint") 

    val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test 

    lines.map(x => (x, (x, 1))) 
     .reduceByKeyAndWindow((first: (String, Int), second: (String, Int)) => 
           (first._1, first._2 + second._2), Seconds(60), Seconds(60), 2) 
     .updateStateByKey(updateSeqCount _) 
     .print(1) 

    ssc.start() 
    ssc.awaitTermination() 
    } 

    def updateSeqCount(values: Seq[(String, Int)], 
        state: Option[(String, Int]): Option[(String, Int)] = { 
     if (values.isEmpty) state 
     else { 
      val (word, count) = values.head 
      if (count == 10) { 
      println(s"Key: $word reached count $count!") 
      None 
      } 
      else Some((word, count)) 
     } 
    } 
} 
+0

Vielen Dank, ich werde es versuchen –

+0

Wie StateSpec verwenden? Ich möchte reproduzieren Ihr Beispiel, ich vermisse einige Import? Wirklich Entschuldigung für meine Fragen! Ich bin wirklich neu mit Funken. –

+0

Siehe das Buttom Beispiel mit 'updateStateByKey', nicht die obere.' StateSpec' ist für 1.6.0 –