2016-12-20 5 views
-1

Ich habe eine UDF, die einen Datenrahmen zurückgibt. Etwas wie die untenApache Spark - Registrierung einer UDF - zurückkehrenden Datenrahmen

scala> predict_churn(Vectors.dense(2.0,1.0,0.0,3.0,4.0,4.0,0.0,4.0,5.0,2.0)) 
res3: org.apache.spark.sql.DataFrame = [noprob: string, yesprob: string, pred: string] 

scala> predict_churn(Vectors.dense(2.0,1.0,0.0,3.0,4.0,4.0,0.0,4.0,5.0,2.0)).show 
+------------------+------------------+----+ 
|   noprob|   yesprob|pred| 
+------------------+------------------+----+ 
|0.3619977592578127|0.6380022407421874| 1.0| 
+------------------+------------------+----+ 

aber wenn ich versuche, dies als eine UDF zu registrieren, um den

hiveContext.udf.register("predict_churn", outerpredict _) 

Befehl bekomme ich einen Fehler wie

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.DataFrame is not supported 
      at  org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715) 

kehrt Datenrahmen nicht unterstützt. Ich benutze Spark 1.6.1 und Scala 2.10. Wenn dies nicht unterstützt wird, wie kann ich bitte mehrere Spalten zu einem externen Programm zurückgeben.

Dank

Bala

Antwort

4

kehrt Datenrahmen nicht

unterstützt

Richtig - Sie keine Datenrahmen von einem UDF zurückkehren kann. UDF sollte Typen zurück, die in den unterstützten Spaltentypen konvertierbar sind:

  • Primitives (Int, String, Boolean, ...)
  • Tupeln von anderen unterstützten Typen
  • Listen, Arrays , Karten von anderen unterstützten Typen
  • Fallklassen von anderen unterstützten Typen

In Ihrem Fall können Sie einen Fall-Klasse verwenden:

case class Record(noprob: Double, yesprob: Double, pred: Double) 

Und haben Sie Ihre UDF (predict_churn) Record zurückzukehren. Wenn dann auf einen einzelnen Datensatz angewendet wird (wie UDFs), wird diese Fallklasse in Spalten konvertiert, die als ihre Member (und mit den richtigen Typen) benannt werden, was zu einem DataFrame führt, der dem aktuell von Ihrer Funktion zurückgegebenen ähnelt.

+0

Vielen Dank für Ihre Antwort. Ich habe die von Ihnen vorgeschlagene Lösung ausprobiert. Das ist, was ich tat Mein Fall-Klasse ist wie folgt 'Fallklasse Prob (noprob: String, yesprob: String, pred: String)' In der Funktion 'val op = result.map (p => prob (p (0) .toString, S. (1) .toString, p (2) .toString)) op // Rückkehr op als Ausgabe ' Auch danach bekomme ich einen sehr ähnlichen Fehler ** scala> hiveContext.udf.register ("predict_churn", outerpredict _) java.lang.UnsupportedOperationException: Schema für Typ org.apache.spark.rdd.RDD [Prob] nicht unterstützt ** Was mache ich falsch? –

+0

Gibt Ihre modifizierte UDF jetzt ein _RDD_ zurück? Das ist nicht, was ich meinte, es sollte nur ein _Record_ –

+0

zurückgeben Es tut mir leid, ich bekomme nicht die Syntax richtig.Wie kann ich konvertieren 'scala> Ergebnis res13: org.apache.spark.sql.DataFrame = [noprob: string, yesprob: string, pred: string]' in einem Fall, Klasse vom Typ 'Fallklasse Record (noprob: String, yesprob: String, pred: String) ' Danke –

Verwandte Themen