2015-06-29 20 views
9

Beginnend mit einem Spark DataFrame, um eine Vektormatrix für die weitere Analyseverarbeitung zu erstellen.Iterieren durch eine Spark-RDD

feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache() 
feature_matrix_vectors.first() 

Die Ausgabe ist ein Array von Vektoren. Einige dieser Vektor haben eine Null in ihnen

>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]) 
... 
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null]) 

Daraus i durch die Vektor-Matrix zu durchlaufen möchten, und erstellen Sie eine LabeledPoint Array mit 0 (Null), wenn der Vektor eine Null enthält, ansonsten mit einem 1.

mit

feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) # create a generator of row sums 
next(feature_matrix_labeledPoint) # Run the iteration protocol 

def f(row): 
    if row.contain(None): 
     LabeledPoint(1.0,row) 
    else: 
     LabeledPoint(0.0,row) 

ich habe versucht, durch die Vektor-Matrix zu durchlaufen, aber das funktioniert nicht.

TypeError: 'PipelinedRDD' object is not iterable 

Jede Hilfe wäre für eine Python-Listen

+0

Diese SO Antwort hat die Details http://stackoverflow.com/a/25296061/429476 –

Antwort

7

RDDs sind nicht ein direkter Ersatz groß sein. Sie müssen entweder Aktionen oder Transformationen verwenden, die auf einer gegebenen verfügbar sind. Hier können Sie einfach map verwenden:

from pyspark.mllib.linalg import DenseVector 
from pyspark.mllib.regression import LabeledPoint 


feature_matrix_vectors = sc.parallelize([ 
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]), 
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None]) 
]) 

(feature_matrix_vectors 
    .map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v)) 
    .collect())