2016-07-05 5 views
0

Ich würde gerne einen Akkumulator in PYSPARW, die vom Typ List ist, und akkumulieren String-Werte für Worker-Knoten. Hier ist der Code I haben:benutzerdefinierte Akkumulator Klasse in Spark

class ListParam(AccumulatorParam): 
def zero(self, v): 
    return [] 
def addInPlace(self, acc1, acc2): 
    acc1.extend(acc2) 
    return acc1 

I dann einen Akkumulator dieser Art zu definieren, wie unten

accu = sc.accumulator([], ListParam()) 

und dann verschiedene Werte, um es in den Vollstrecker wie folgt hinzufügen

accu.add("abc") 

Ich möchte den Wert abc als nur einen Wert im Akkumulator erscheinen, aber der Akkumulator fügt drei verschiedene Werte (ein PR-Zeichen) und wenn ich mir die accu Wert im Treiber sieht aus wie ['a','b','c']. Wie kann ich es so ändern, dass es nicht jedes Zeichen als separaten Eintrag im Akkumulator hinzufügt?

-------------- bearbeiten ----------------

definiert ich eine andere benutzerdefinierte Klasse für meine Akkumulator wie folgt

class VectorAccumulatorParam(AccumulatorParam): 
def zero(self, value): 
    return [0.0] * len(value) 
def addInPlace(self, val1, val2): 
    for i in range(len(val1)): 
     val1[i] += val2[i] 
    return val1  

und innerhalb des Arbeiters habe ich den folgenden Code

global accu 
accu += [accuracy] 

aber wenn ich die accu im Treiber zu drucken, ist es leer. Irgendwas falsch?

+0

Wie wird es verwendet? –

Antwort

0

Haben Sie versucht, Funke explizit zu informieren, um Ihre Operationen auszuführen, die sich mit dem Akkumulator befassen? Wie Sie wissen sollten, spark's operations are lazy, und viele Male müssen Sie rdd.collect() aufrufen, um Ihre Zuordnungen tatsächlich durchzuführen

Verwandte Themen