2016-05-10 5 views
2

Ich trainiere ein LogisticRegression-Modell in PYSPARK (ML-Lib) und das Ergebnis der Vorhersage ist ein DataFrame (predictions genannt) und ich wähle nur drei Spalten davon aus wie unten.Wie auf die Werte von dichteVector in PySpark

prunedPredictions = predictions.select(predictions["prediction"], predictions["probability"], predictions["label"]) 

Die Säule probability ist vom Typ DenseVector. Ich versuche, einen anderen Datenrahmen zu erstellen, in dem die Wahrscheinlichkeitsspalte durch eines ihrer Elemente (die ersten Elemente) ersetzt wird. Ich habe verschiedene Dinge ausprobiert, aber keiner funktioniert. Hier ist eine:

prunedPredictionsDF = prunedPredictions.map(lambda r: Row(prediction = r[0],probability = r[1].apply(1),label = r[2])) 

oder

prunedPredictionsDF = prunedPredictions.map(lambda r: Row(prediction = r[0],probability = r[1][1],label = r[2])) 

Wie kann ich das tun? Ich bekomme folgende Fehlermeldung.

Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
File "/usr/lib/spark/python/pyspark/rdd.py", line 1317, in first 
rs = self.take(1) 
File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take 
res = self.context.runJob(self, takeUpToNumLeft, p) 
File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob 
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco 
return f(*a, **kw) 
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 235.0 failed 4 times, most recent failure: Lost task 0.3 in stage 235.0 (TID 61305, anp-r03wn03.c03.hadoop.td.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/usr/lib/spark/python/pyspark/worker.py", line 111, in main 
process() 
File "/usr/lib/spark/python/pyspark/worker.py", line 106, in process 
serializer.dump_stream(func(split_index, iterator), outfile) 
File "/usr/lib/spark/python/pyspark/serializers.py", line 263, in dump_stream 
vs = list(itertools.islice(iterator, batch)) 
File "/usr/lib/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft 
yield next(iterator) 
File "/usr/lib/spark/python/pyspark/serializers.py", line 139, in load_stream 
yield self._read_with_length(stream) 
File "/usr/lib/spark/python/pyspark/serializers.py", line 164, in _read_with_length 
return self.loads(obj) 
File "/usr/lib/spark/python/pyspark/serializers.py", line 422, in loads 
return pickle.loads(obj) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 728, in _parse_datatype_json_string 
return _parse_datatype_json_value(json.loads(json_string)) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 748, in _parse_datatype_json_value 
return _all_complex_types[tpe].fromJson(json_value) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 525, in fromJson 
return StructType([StructField.fromJson(f) for f in json["fields"]]) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 425, in fromJson 
_parse_datatype_json_value(json["type"]), 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 750, in _parse_datatype_json_value 
return UserDefinedType.fromJson(json_value) 
File "/usr/lib/spark/python/pyspark/sql/types.py", line 663, in fromJson 
m = __import__(pyModule, globals(), locals(), [pyClass]) 
File "/usr/lib/spark/python/pyspark/mllib/__init__.py", line 25, in <module> 

import numpy 
ImportError: ('No module named numpy', <function _parse_datatype_json_string at 0xcd9d70>, (u'{"type":"struct","fields":[{"name":"prediction","type":"double","nullable":true,"metadata":{}},{"name":"probability","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"label","type":"double","nullable":true,"metadata":{"ml_attr":{"vals":["0","1"],"type":"nominal","name":"label"}}}]}',)) 

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    ... 1 more 
+0

Dies ist kein Duplikat! Was passiert in letzter Zeit in SO? –

+0

Es ist tatsächlich ... ist nicht einmal in der Nähe eines Duplikats. – chappers

+0

Weil ich keine Antwort hinzufügen kann; Wahrscheinlich ist der beste Weg, eine UDF zu definieren, so dass Sie in der DataFrame-Welt bleiben (und ** NICHT ** in RDD konvertieren, wie in der verknüpften Frage vorgeschlagen). 'vector_udf = udf (Lambdavektor: float (Vektor [0]), DoubleType())'. Dann können Sie Folgendes tun: 'df.withColumn ('Prob_0', vector_udf (df.features)). First()'. Hier ist die [gist] (https://gist.github.com/chappers/b8ae3a92f2a52453f1f1a3affe20c4a) – chappers

Antwort

4

In Pyspark können Sie die Elemente in einem DenseVector Zugriff nur über den Index.

Zum Beispiel:

from pyspark.mllib.linalg import DenseVector 

a = DenseVector([1.0,2.0,3.0,4.0,5.0]) 

print(a[1]) 
# 2.0 

Darüber hinaus können Sie iterate über die DenseVector:

for e in a: 
    print(e) 

# 1.0 
# 2.0 
# 3.0 
# 4.0 
# 5.0 

Ein anderer Weg durch ist DenseVector.values Eigenschaft verwenden, auf diese Weise können Sie alle Werte als Python erhalten list.

+0

Ich habe das in meinem Programm versucht, aber es gibt mir die Fehlermeldung, die ich zu meiner ursprünglichen Frage hinzugefügt habe. –

+0

Nun, anscheinend hat Ihr Interpreter keinen Zugriff auf numpy, Sie sollten ihn herunterladen, indem Sie anaconda oder pip verwenden. –

+0

wenn ich help ('modules') in pyspark shell ausführen, zeigt es, dass numpy da ist. –