2016-10-11 2 views
1

Ich möchte etwas tun, ich fühle mich sehr seltsam in Spark Streaming und ich möchte ein Feedback dazu bekommen.Berechnen Sie einen Durchschnitt in einer RDD und filtern Sie dann diese RDD basierend auf dem Durchschnitt in Spark Streaming

Ich habe einen DStream eines Tupel (String, Int). Nehmen wir an, die Zeichenfolge ist eine ID und die Ganzzahl ein Wert.

Also für ein Microbatch, ich möchte den Durchschnitt des Feldes Int berechnen, und basierend auf diesem Durchschnitt möchte ich das gleiche Microbatch filtern, zum Beispiel Feld2> Durchschnitt. Also schrieb ich diesen Code:

lineStreams 
    .foreachRDD(
    rdd => { 
     val totalElement = rdd.count() 
     if(totalElement > 0) { 
     val totalSum = rdd.map(elem => elem.apply(1).toInt).reduce(_ + _) 
     val average = totalSum/totalElement 
     rdd.foreach(
      elem => { 
      if(elem.apply(1).toInt > average){ 
       println("Element is higher than average") 
      } 
      } 
     ) 
     } 
    }) 

Aber eigentlich dieser Code wird nicht ausgeführt, der erste Teil der Berechnung sieht ok, aber nicht den Test. Ich weiß, dass es in diesem Code einige schmutzige Dinge gibt, aber ich möchte nur wissen, ob die Logik gut ist.

Danke für Ihre Ratschläge!

Antwort

0

Versuchen:

lineStreams.transform { rdd => { 
    val mean = rdd.values.map(_.toDouble).mean 
    rdd.filter(_._2.toDouble > mean) 
}} 
Verwandte Themen