
Error when an existing function is used as a UDF to modify a Spark DataFrame column

I have a DataFrame with a column of type string containing plain text, and I would like to modify this column with pyspark.sql.functions.udf (or pyspark.sql.functions.UserDefinedFunction?).

I am using Python 2.7, PySpark 1.6.1, and Flask 0.10.1 on OS X 10.11.4.

It seems to work fine when I use a lambda expression:

from flask import render_template
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@spark.route('/')
def run():
    df = ...  # my dataframe
    myUDF = udf(lambda r: len(r), IntegerType())
    df = df.withColumn('new_'+column, myUDF(df[column]))
    return render_template('index.html', data=df.take(1000))

But as soon as I try to move the lambda expression into a named function:

def my_function(x): 
    return len(x) 

@spark.route('/') 
def run(): 
    df = ... # my dataframe 
    myUDF = udf(my_function, IntegerType()) 
    df = df.withColumn('new_'+column, myUDF(df[column])) 
    return render_template('index.html', data=df.take(1000)) 

I get the following error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main 
    command = pickleSer._read_with_length(infile) 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
    File "app/__init__.py", line 19, in <module> 
    from app.controllers.main import main 
    File "app/controllers/main/__init__.py", line 5, in <module> 
    import default, source 
    File "app/controllers/main/default.py", line 3, in <module> 
    from app.controllers.main.source import file 
    File "app/controllers/main/source/__init__.py", line 2, in <module> 
    import file, online, database 
    File "app/controllers/main/source/database.py", line 1, in <module> 
    from app.controllers.spark import sqlContext 
    File "app/controllers/spark/__init__.py", line 18, in <module> 
    import default, grid #, pivot 
    File "app/controllers/spark/default.py", line 2, in <module> 
    from app.controllers.spark import spark, sc, sqlContext, grid as gridController 
    File "app/controllers/spark/grid.py", line 14, in <module> 
    from pyspark.ml import Pipeline 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/__init__.py", line 18, in <module> 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 23, in <module> 
    File "/opt/spark/python/lib/pyspark.zip/pyspark/mllib/__init__.py", line 25, in <module> 
ImportError: No module named numpy 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:398) 
    at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:363) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

NumPy is installed. Removing the mllib imports did not solve the problem.

Answer


It usually works if you declare the entire body of 'my_function' inside the body of the 'run' function. Otherwise, I have not yet found a way to call an external function in exactly the way you do here.
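For reference, a minimal sketch of that workaround, reusing the `spark` blueprint, `column` variable, and DataFrame from the question. The likely reason it helps: a function defined inside `run` is serialized by value together with the UDF closure, whereas a module-level function is pickled by reference, so the workers try to import the whole `app` package (and its pyspark.ml / NumPy dependencies) when they unpickle the UDF.

from flask import render_template
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@spark.route('/')
def run():
    df = ...  # my dataframe

    # same logic as the module-level my_function, but declared locally so it
    # travels with the serialized UDF instead of being imported on the workers
    def my_function(x):
        return len(x)

    myUDF = udf(my_function, IntegerType())
    df = df.withColumn('new_'+column, myUDF(df[column]))
    return render_template('index.html', data=df.take(1000))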


Thanks for the help. Your solution did indeed work. In my case, rebuilding the virtual environment (venv) also solved the problem, although I don't know why.
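The ImportError in the traceback comes from the Spark worker processes, so whatever interpreter they launch must have NumPy installed; rebuilding the venv presumably repaired that environment. A minimal sketch (with a hypothetical interpreter path) of pinning the worker Python explicitly from the driver, which has to happen before the SparkContext is created:

import os

# hypothetical path to a virtualenv that has numpy installed;
# PYSPARK_PYTHON is read when the Python workers are launched
os.environ['PYSPARK_PYTHON'] = '/path/to/venv/bin/python'

from pyspark import SparkContext
sc = SparkContext(appName='my-flask-spark-app')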
