2017-08-10 7 views
0

Ich habe eine benutzerdefinierte udf und in Spark registriert.Wenn ich versuche, auf diese UDF zuzugreifen, löst er error.Unable für den Zugriff.UDF-Verwendung in Funke

Ich versuchte es so.

spark.udf.register("rssi_weightage", FilterMap.rssi_weightage) 
val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(rssi_weightage($"RSSI").as("RSSI_Weight"))) 

Fehler werden angezeigt in der ersten (rssi_weightage ($ "RSSI") // rssi_weightage nicht gefunden Fehler

Jede Hilfe geschätzt wird.

+0

Hat meine Lösung Ihre Frage gelöst? Wenn ja, bitte akzeptieren Sie die Antwort –

Antwort

2

das ist nicht, wie Sie die UDF verwenden, die tatsächliche UDF ist ein Rückgabewert von spark.udf.register So können Sie tun:.

val udf_rssii_weightage = spark.udf.register("rssi_weightage", FilterMap.rssi_weightage) 

val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(udf_rssi_weightage($"RSSI")).as("RSSI_Weight")) 

Aber in Ihrem Fall müssen Sie nicht das UDF registrieren, nur org.apache.spark.sql.functions.udf verwenden, um eine regu zu konvertieren lar Funktion zu einem UDF:

val udf_rssii_weightage = udf(FilterMap.rssi_weightage) 
+0

Danke für Ihre Lösung ... –

+0

guten einen :) @Raphael, verdient eine Verbesserung –

1

Ich nehme an, Sie haben ein Problem mit der Art und Weisen Sie die UDF-Funktion sind definiert, die nächste Snapshot einen etwas anderen Ansatz in Ankündigung UDF hat - es ist direkt definierte Funktion: Import org.apache.spark.sql.functions._

val data = sqlContext.read.json(sc.parallelize(Seq("{'foo' : 'Bar'}", "{'foo': 'Baz'}"))) 

val example = Seq("Bar", "Bazzz") 
val urbf = udf { foo: String => if (example.contains(example)) 1 else 0 } 

data.select($"foo", urbf($"foo")).show 

+--------+-------------+ 
| foo |UDF(foo)  | 
+--------+-------------+ 
| Bar |   1| 
| Bazzz |   0| 
+--------+-------------+ 
+0

Danke für Ihre Lösung ... –