How to access the values of a DenseVector in PySpark

I am training a LogisticRegression model in PySpark (MLlib), and the prediction result is a DataFrame (called predictions). I select only three columns from it, as shown below:
prunedPredictions = predictions.select(predictions["prediction"], predictions["probability"], predictions["label"])
The probability column is of type DenseVector. I am trying to build another DataFrame in which the probability column is replaced by one of its elements (the first element). I have tried several things, but none of them work. Here is one:
prunedPredictionsDF = prunedPredictions.map(lambda r: Row(prediction=r[0], probability=r[1].apply(1), label=r[2]))
or
prunedPredictionsDF = prunedPredictions.map(lambda r: Row(prediction=r[0], probability=r[1][1], label=r[2]))
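For reference, plain element access on a DenseVector works in isolation on the driver, so the indexing itself does not seem to be the issue. A minimal standalone check (assuming pyspark.mllib.linalg and a local numpy install):

from pyspark.mllib.linalg import DenseVector

# Element access on a DenseVector, executed on the driver:
v = DenseVector([0.25, 0.75])
print(v[0])  # 0.25
print(v[1])  # 0.75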
How can I do this? I get the following error message:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/rdd.py", line 1317, in first
rs = self.take(1)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1299, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/usr/lib/spark/python/pyspark/context.py", line 916, in runJob
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 235.0 failed 4 times, most recent failure: Lost task 0.3 in stage 235.0 (TID 61305, anp-r03wn03.c03.hadoop.td.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/usr/lib/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
yield next(iterator)
File "/usr/lib/spark/python/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 728, in _parse_datatype_json_string
return _parse_datatype_json_value(json.loads(json_string))
File "/usr/lib/spark/python/pyspark/sql/types.py", line 748, in _parse_datatype_json_value
return _all_complex_types[tpe].fromJson(json_value)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 525, in fromJson
return StructType([StructField.fromJson(f) for f in json["fields"]])
File "/usr/lib/spark/python/pyspark/sql/types.py", line 425, in fromJson
_parse_datatype_json_value(json["type"]),
File "/usr/lib/spark/python/pyspark/sql/types.py", line 750, in _parse_datatype_json_value
return UserDefinedType.fromJson(json_value)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 663, in fromJson
m = __import__(pyModule, globals(), locals(), [pyClass])
File "/usr/lib/spark/python/pyspark/mllib/__init__.py", line 25, in <module>
import numpy
ImportError: ('No module named numpy', <function _parse_datatype_json_string at 0xcd9d70>, (u'{"type":"struct","fields":[{"name":"prediction","type":"double","nullable":true,"metadata":{}},{"name":"probability","type":{"type":"udt","class":"org.apache.spark.mllib.linalg.VectorUDT","pyClass":"pyspark.mllib.linalg.VectorUDT","sqlType":{"type":"struct","fields":[{"name":"type","type":"byte","nullable":false,"metadata":{}},{"name":"size","type":"integer","nullable":true,"metadata":{}},{"name":"indices","type":{"type":"array","elementType":"integer","containsNull":false},"nullable":true,"metadata":{}},{"name":"values","type":{"type":"array","elementType":"double","containsNull":false},"nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"label","type":"double","nullable":true,"metadata":{"ml_attr":{"vals":["0","1"],"type":"nominal","name":"label"}}}]}',))
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
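The key line buried in the traceback is the ImportError: No module named numpy raised on the executor, which suggests the worker Python environments are missing numpy, independent of how the vector element is accessed. A quick way to confirm (a sketch, assuming sc is the active SparkContext):

# Importing numpy inside a task forces it to run on an executor rather
# than the driver; this raises the same ImportError if numpy is missing
# on the worker nodes.
print(sc.parallelize([0]).map(lambda _: __import__("numpy").__version__).first())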
This is not a duplicate! What is going on on SO lately? –
It is indeed ... it is not even close to a duplicate. – chappers
Since I can't add an answer: probably the best way is to define a UDF, so that you stay in the DataFrame world (and do **NOT** convert to an RDD, as suggested in the linked question): `vector_udf = udf(lambda vector: float(vector[0]), DoubleType())`. Then you can do: `df.withColumn('Prob_0', vector_udf(df.features)).first()`. Here is the [gist](https://gist.github.com/chappers/b8ae3a92f2a52453f1f1a3affe20c4a) – chappers
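Spelled out against the question's columns, that UDF approach might look like the sketch below (untested here; it assumes the executors have numpy installed, which the ImportError in the traceback suggests is the real missing piece):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF that extracts the first element of the probability vector as a double.
# Note: deserializing the VectorUDT column still imports numpy on the
# executors, so this keeps failing until numpy is installed on the workers.
first_element = udf(lambda v: float(v[0]), DoubleType())

prunedPredictionsDF = prunedPredictions.select(
    prunedPredictions["prediction"],
    first_element(prunedPredictions["probability"]).alias("probability"),
    prunedPredictions["label"],
)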