Ich möchte eine Matrix mit Werten aus einer rdd
mit einer pyspark accumulator additively füllen; Ich fand die Dokumente etwas unklar. Hinzufügen eines Hintergrunds, nur für den Fall, dass es relevant ist.
Meine rddData
enthält Listen von Indizes, für die eine Zählung zur Matrix hinzugefügt werden muss. Zum Beispiel diese Liste zuordnet Indizes:
[1,3,4] -> (11), (13), (14), (33), (34), (44)
pyspark matrix accumulator
Hier ist mein Akku:
from pyspark.accumulators import AccumulatorParam
class MatrixAccumulatorParam(AccumulatorParam):
def zero(self, mInitial):
import numpy as np
aaZeros = np.zeros(mInitial.shape)
return aaZeros
def addInPlace(self, mAdd, lIndex):
mAdd[lIndex[0], lIndex[1]] += 1
return mAdd
Also das ist meine Mapper Funktion ist:
def populate_sparse(lIndices):
for i1 in lIndices:
for i2 in lIndices:
oAccumilatorMatrix.add([i1, i2])
und dann die Daten aus:
oAccumilatorMatrix = oSc.accumulator(aaZeros, MatrixAccumulatorParam())
rddData.map(populate_sparse).collect()
Jetzt, wenn ich auf meine da schaue ta:
sum(sum(oAccumilatorMatrix.value))
#= 0.0
Was es nicht sein sollte. Was vermisse ich?
BEARBEITEN Versucht dies zuerst mit einer dünn besetzten Matrix, bekam diese Rückverfolgung, dass dünn besetzte Matrizen nicht unterstützt werden. Frage für dichte numpige Matrix geändert:
...
raise IndexError("Indexing with sparse matrices is not supported"
IndexError: Indexing with sparse matrices is not supported except boolean indexing where matrix and index are equal shapes.