2016-04-29 6 views
0

Ich analysiere Streaming Spark, und es funktioniert gut außer Drucken RMSE. Es gibt also die Vorhersagen output.print() aus, es zeigt jedoch nicht RMSE. Irgendeine Idee?Spark-Streaming druckt einige Daten nicht

def calculateRMSE(output: DStream[(Double, Double)], n: DStream[Long]): Double = { 
    var summse = 0.0 
    output.foreachRDD { rdd => 
     rdd.map { 
      case pair: (Double, Double) => 
      val err = math.abs(pair._1 - pair._2) 
      err*err 
     }.foreach(summse += _) 
    } 
    math.sqrt(summse) 
    } 

//.. 
val trainingData = ssc.textFileStream("file:///home/gosper/Desktop/data/streaming/train").map(LabeledPoint.parse).cache() 
val testData = ssc.textFileStream("file:///home/gosper/Desktop/data/streaming/test").map(LabeledPoint.parse) 

model.trainOn(trainingData) 
val output = model.predictOnValues(testData.map(lp => (lp.label, lp.features))) 

output.print() 

val rmse = calculateRMSE(output,testData.count()) 
println(s"RMSE = $rmse") 

ssc.start() 
ssc.awaitTermination() 

Antwort

0

Erstens würde ich äußere var summse Aktualisierung zu vermeiden, legen nahe, und verwenden .sum (was eine Operation in der Karte zu reduzieren, ist/reduzieren Rahmen)

UPD ich über die Lösung zunächst etwas falsch war. Hier ist noch eine Idee. So drucken Sie den Wert, nachdem der Stream beendet wurde.

var rmse : Double = 0 
output.map { 
    case pair: (Double, Double) ⇒ 
    val err = math.abs(pair._1 - pair._2) 
    err*err 
}.reduce(_ + _).map(math.sqrt).foreachRDD(rdd ⇒ rdd map { rmse = _ }) 

ssc.start() 
ssc.awaitTermination() 

println(s"RMSE = $rmse") 
+0

Danke. Ich habe das versucht, aber es sieht so aus, als wäre 'summse'' Unit' anstatt 'Double':' val summse = output.foreachRDD {rdd => rdd.map { Fallpaar: (Double, Double) => val err = Math.abs (pair._1 - pair._2) err * err } .sum() } math.sqrt (summse) ' – Klue

+0

erwägen sie es wie folgt aus: ' val summse = output.map {Fallpaar: (Double, Double) => val err = math.abs (Paar._1 - Paar._2); err * err}} .reduce (_ + _) val res = math.sqrt (summse) ' – akivanov

+0

Es besagt, dass' math.sqrt' 'Double' statt' DStream [Double] 'erwartet, das ist jetzt der Typ von 'summse'. Außerdem sollte wahrscheinlich '} .reduce' statt'}} .reduce' sein. – Klue

Verwandte Themen