2015-07-18 11 views
5

Ich versuche, zu definieren, einen Akkumulator Variable vom Typ String in Scala Shell (Fahrer), aber ich erhalte den folgenden Fehler zu erklären: -Nicht in der Lage String Speicher

scala> val myacc = sc.accumulator("Test") 
<console>:21: error: could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[String] 
     val myacc = sc.accumulator("Test") 
           ^

Dies scheint zu sein, kein Problem für Int oder Double Typ von Akku.

Dank

Antwort

10

Das ist, weil Funken standardmäßig nur Akkus vom Typ bietet Long, Double und Float. Wenn Sie etwas anderes benötigen, müssen Sie AccumulatorParam erweitern.

import org.apache.spark.AccumulatorParam 

object StringAccumulatorParam extends AccumulatorParam[String] { 

    def zero(initialValue: String): String = { 
     "" 
    } 

    def addInPlace(s1: String, s2: String): String = { 
     s"$s1 $s2" 
    } 
} 

val stringAccum = sc.accumulator("")(StringAccumulatorParam) 

val rdd = sc.parallelize("foo" :: "bar" :: Nil, 2) 
rdd.foreach(s => stringAccum += s) 
stringAccum.value 

Hinweis:

Im Allgemeinen sollten Sie vermeiden, Akkumulatoren für Aufgaben mit denen Daten über die Zeit signifikant wachsen. Sein Verhalten ähnelt group einem collect und kann im schlimmsten Fall aufgrund fehlender Ressourcen scheitern. Akkumulatoren sind vor allem für einfache Diagnoseaufgaben wie die Verfolgung von Basisstatistiken nützlich.

+0

val Akkumulation = sc.accumulator (0) (SparkContext.IntAccumulatorParam) für Integer-Akkumulator. – Neethu