2016-05-19 8 views
3

Ich habe folgenden Apache Funken UDF in scala:Wie übergeben Sie Array [Seq [String]] an Apache Funken UDF? (Fehler: Nicht anwendbar)

val myFunc = udf { 
    (userBias: Float, otherBiases: Map[Long, Float], 
    userFactors: Seq[Float], context: Seq[String]) => 
    var result = Float.NaN 

    if (userFactors != null) { 
     var contexBias = 0f 

     for (cc <- context) { 
     contexBias += otherBiases(contextMapping(cc)) 
     } 

     // definition of result 
     // ... 
    } 
    result 
} 

Jetzt mag ich Parameter auf diese Funktion zu übergeben, aber ich bekomme immer die Meldung nicht anwendbar aufgrund der Parameter context. Ich weiß, dass benutzerdefinierte Funktionen Eingaben von Zeilen nehmen, und diese Funktion wird ausgeführt, wenn ich context ... lösche Wie kann dieses Problem behoben werden? Warum liest es keine Zeilen von Array[Seq[String]], d. H. Von context? Alternativ wäre es akzeptabel, context als DataFrame oder etwas Ähnliches zu übergeben.

// context is Array[Seq[String]] 
val a = sc.parallelize(Seq((1,2),(3,4))).toDF("a", "b") 
val context = a.collect.map(_.toSeq.map(_.toString)) 

// userBias("bias"), otherBias("biases") and userFactors("features") 
// have a type Column, while userBias... are DataFrames 
myDataframe.select(dataset("*"), 
        myFunc(userBias("bias"), 
          otherBias("biases"), 
          userFactors("features"), 
          context) 
        .as($(newCol))) 

UPDATE:

ich die Lösung in der Antwort von zero323 angegeben versucht, aber immer noch gibt es ein kleines Problem mit context: Array[Seq[String]] ist. Insbesondere besteht das Problem darin, dieses Array durchzuschleifen for (cc <- context) { contexBias += otherBiases(contextMapping(cc)) }. Ich sollte einen String zu contextMapping passieren, kein Seq[String]:

def myFunc(context: Array[Seq[String]]) = udf { 
    (userBias: Float, otherBiases: Map[Long, Float], 
    userFactors: Seq[Float]) => 
     var result = Float.NaN 

     if (userFactors != null) { 
     var contexBias = 0f 
     for (cc <- context) { 
      contexBias += otherBiases(contextMapping(cc)) 
     } 

     // estimation of result 

     } 
     result 
    } 

Jetzt habe ich es nennen wie folgt:

myDataframe.select(dataset("*"), 
        myFunc(context)(userBias("bias"), 
            otherBias("biases"), 
            userFactors("features")) 
      .as($(newCol))) 

Antwort

1

Funken 2.2+

können Sie typedLit Funktionen nutzen:

import org.apache.spark.sql.functions.typedLit 

myFunc(..., typedLit(context)) 

Funken < 2,2

Jedes Argument, das direkt an die UDF übergeben wird, hat ein Column so sein, wenn man konstante Array übergeben möchten, müssen Sie es auf Spalte wörtlichen konvertieren müssen:

import org.apache.spark.sql.functions.{array, lit} 

val myFunc: org.apache.spark.sql.UserDefinedFunction = ??? 

myFunc(
    userBias("bias"), 
    otherBias("biases"), 
    userFactors("features"), 
    // org.apache.spark.sql.Column 
    array(context.map(xs => array(xs.map(lit _): _*)): _*) 
) 

Nicht - Column Objekte können nur indirekt durch Schließen übergeben werden, zum Beispiel so:

def myFunc(context: Array[Seq[String]]) = udf { 
    (userBias: Float, otherBiases: Map[Long, Float], userFactors: Seq[Float]) => 
    ??? 
} 

myFunc(context)(userBias("bias"), otherBias("biases"), userFactors("features")) 
+0

Beachten Sie, dass Spark 2.2 noch nicht veröffentlicht wurde. –

Verwandte Themen