Error when using an existing function as a UDF to modify a Spark dataframe column

I have a dataframe with a string column containing plain text, and I would like to modify that column with pyspark.sql.functions.udf (or pyspark.sql.functions.UserDefinedFunction?).
I am using Python 2.7, PySpark 1.6.1, and Flask 0.10.1 on OSX 10.11.4.
It seems to work fine when I use a lambda expression:
@spark.route('/')
def run():
    df = ...  # my dataframe
    myUDF = udf(lambda r: len(r), IntegerType())
    df = df.withColumn('new_'+column, myUDF(df[column]))
    return render_template('index.html', data=df.take(1000))
But as soon as I try to move the lambda expression into a named function:
def my_function(x):
    return len(x)

@spark.route('/')
def run():
    df = ...  # my dataframe
    myUDF = udf(my_function, IntegerType())
    df = df.withColumn('new_'+column, myUDF(df[column]))
    return render_template('index.html', data=df.take(1000))
I get the following error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
File "app/__init__.py", line 19, in <module>
from app.controllers.main import main
File "app/controllers/main/__init__.py", line 5, in <module>
import default, source
File "app/controllers/main/default.py", line 3, in <module>
from app.controllers.main.source import file
File "app/controllers/main/source/__init__.py", line 2, in <module>
import file, online, database
File "app/controllers/main/source/database.py", line 1, in <module>
from app.controllers.spark import sqlContext
File "app/controllers/spark/__init__.py", line 18, in <module>
import default, grid #, pivot
File "app/controllers/spark/default.py", line 2, in <module>
from app.controllers.spark import spark, sc, sqlContext, grid as gridController
File "app/controllers/spark/grid.py", line 14, in <module>
from pyspark.ml import Pipeline
File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/__init__.py", line 18, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 23, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/mllib/__init__.py", line 25, in <module>
ImportError: No module named numpy
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:398)
at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:363)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Numpy is installed. Removing the mllib imports did not solve the problem.
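One plausible reading of the traceback: a lambda (or any function defined inside the view) is serialized by value, whereas a named module-level function is pickled by reference, so every Spark worker has to re-import the enclosing app module; that import chain pulls in pyspark.ml/pyspark.mllib, which fails if the worker's Python interpreter cannot see numpy. Below is a minimal sketch of a workaround based on that assumption: keep the named function but define it inside the view, so it is serialized by value just like the lambda (spark, column, and the dataframe placeholder are taken from the snippets above).

from flask import render_template
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@spark.route('/')
def run():
    df = ...  # my dataframe

    # Defined inside the view: nested functions are pickled by value,
    # so the workers never need to import the app package at all.
    def my_function(x):
        return len(x)

    myUDF = udf(my_function, IntegerType())
    df = df.withColumn('new_' + column, myUDF(df[column]))
    return render_template('index.html', data=df.take(1000))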
Thanks for the help. Indeed, your solution worked. In my case, resetting the virtual environment (venv) also solved the problem (but I don't know why). –
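A possible explanation for why resetting the venv helped: numpy being installed in the driver's virtualenv does not guarantee that the Python processes Spark launches for the workers use that same interpreter. One way to pin this down, sketched below under the assumption that the app runs against a local virtualenv (the ./venv path is hypothetical), is to point PYSPARK_PYTHON at the venv's interpreter before the SparkContext is created:

import os

# Hypothetical path to the virtualenv interpreter; adjust to the real layout.
os.environ['PYSPARK_PYTHON'] = os.path.join(os.getcwd(), 'venv', 'bin', 'python')

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# PYSPARK_PYTHON is read when the SparkContext is constructed, so it must be
# set beforehand; the Python workers are then started with the venv
# interpreter, which has numpy available.
sc = SparkContext(conf=SparkConf().setAppName('app'))
sqlContext = SQLContext(sc)

With driver and workers on the same interpreter, the ImportError for numpy during unpickling should no longer occur.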