2017-12-20 4 views
1

Ich muss eine Liste in eine UDF übergeben, die Liste wird die Punktzahl/Kategorie der Entfernung bestimmen. Fürs Erste bin ich hart um alle Entfernungen zu codieren, um die 4. Punktzahl zu sein.PySpark - Pass-Liste als Parameter zu UDF

a= spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"]) 

from pyspark.sql.functions import udf 
def cate(label, feature_list): 
    if feature_list == 0: 
     return label[4] 
label_list = ["Great", "Good", "OK", "Please Move", "Dead"] 
udf_score=udf(cate, StringType()) 
a.withColumn("category", udf_score(label_list,a["distances"])).show(10) 

Wenn ich so etwas versuche, bekomme ich diesen Fehler.

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace: 
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339) 
    at py4j.Gateway.invoke(Gateway.java:274) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 

Antwort

2

hoffe, das hilft!

from pyspark.sql.functions import udf, col 

#sample data 
a= sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"]) 
label_list = ["Great", "Good", "OK", "Please Move", "Dead"] 

def cate(label, feature_list): 
    if feature_list == 0: 
     return label[4] 
    else: #you may need to add 'else' condition as well otherwise 'null' will be added in this case 
     return 'I am not sure!' 

def udf_score(label_list): 
    return udf(lambda l: cate(l, label_list)) 
a.withColumn("category", udf_score(label_list)(col("distances"))).show() 

Ausgang ist:

+------+---------+--------------+ 
|Letter|distances|  category| 
+------+---------+--------------+ 
|  A|  20|I am not sure!| 
|  B|  30|I am not sure!| 
|  D|  80|I am not sure!| 
+------+---------+--------------+ 
0

Versuchen Sie, die Funktion currying, so dass das einzige Argument in den Datenrahmen Aufruf der Name der Spalte ist, auf dem Sie die Funktion wollen handeln:

udf_score=udf(lambda x: cate(label_list,x), StringType()) 
a.withColumn("category", udf_score("distances")).show(10)