2017-01-28 5 views
0

Ich bekomme einen Fehler beim Implementieren von AggregatByKey in Spark-Scala-Shell.Wert _2 ist kein Mitglied von Double Spark-Shell

Das Stück Code, das ich auf Scala-Shell ausführen bin versucht, ist dies,

val orderItemsMapJoinOrdersMapMapAgg = orderItemsMapJoinOrdersMapMap 
    .aggregateByKey(0.0,0)(
     (a,b) => (a._1 + b , a._2 + 1), 
     (a,b) => (a._1 + b._1 , a._2 + b._2) 
) 

Aber ich die folgende Fehlermeldung erhalten,

<console>:39: error: value _1 is not a member of Double 
     val orderItemsMapJoinOrdersMapMapAgg = orderItemsMapJoinOrdersMapMap.aggregateByKey(0.0,0)((a,b) => (a._1 + b , a._2 +1), (a,b) => (a._1 + b._1 , a._2 + b._2)) 


scala> orderItemsMapJoinOrdersMapMap 
res8: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[16] at map at <console>:37 

mir jemand doppelt helfen kann verstehen und Float-Wert-Logik und wie man es repariert

Antwort

1

Das Problem ist, dass Sie das erste Curry-Argument den falschen Weg bieten. Es sollte so etwas sein,

val orderItemsMapJoinOrdersMapMap: RDD[(String, Float)] = ... 

// so elems of your orderItemsMapJoinOrdersMapMap are (String, Float) 

// And your accumulator looks like (Double, Int) 

// thus I believe that you just want to accumulate total number of elements and sum of the floats in them 

val orderItemsMapJoinOrdersMapMapAgg = orderItemsMapJoinOrdersMapMap 
    .aggregateByKey((0.0,0))(
     (acc, elem) => (acc._1 + elem._2 , acc._2 + 1), 
     (acc1, acc2) => (acc1._1 + acc2._1 , acc1._2 + acc._2) 
) 
+0

Vielen Dank @Sarvesh Kumar Singh für schnelle und nette Antwort. Ich möchte wissen, warum ich Ausgabe Rdd des Typs RDD [(String, (Double, Int))] anstelle von RDD [(String, (Float, Int))] wie ich Flot in früheren Kartenoperation –

+0

' 0.0 ist kein "Float". Wenn Sie 'Float' möchten, können Sie' 0.0f' verwenden. –

+0

Dank @Sarvesh Kumar Singh –

Verwandte Themen