2016-09-01 5 views
-1

Ich habe eine Funktion zu definieren versucht, die innerhalb Spark Dataframe arbeitet die scala Sätze als Eingabe und gibt eine ganze Zahl. Ich erhalte die folgende Fehlermeldung:Funken SqlContext UDF auf Sets wirken

org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 25.0 failed 1 times, most recent failure: Lost task 20.0 in stage 25.0 (TID 473, localhost): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Set 

Hier ist ein einfacher Code, der den Kern des Problems gibt:

// generate sample data 
case class Dummy(x:Array[Integer]) 
val df = sqlContext.createDataFrame(Seq(
    Dummy(Array(1,2,3)), 
    Dummy(Array(10,20,30,40)) 
)) 

// define the UDF 
import org.apache.spark.sql.functions._ 
def setSize(A:Set[Integer]):Integer = { 
    A.size 
} 
// For some reason I couldn't get it to work without this valued function 
val sizeWrap: (Set[Integer] => Integer) = setSize(_) 
val sizeUDF = udf(sizeWrap) 

// this produces the error 
df.withColumn("colSize", sizeUDF('x)).show 

Was soll ich hier fehlt? Wie kann ich das zur Arbeit bringen? Ich weiß, dass ich dies tun kann, indem ich zu RDD übergebe, aber ich möchte nicht zwischen RDD und DataFrames hin und her gehen.

Antwort

1

Verwendung Seq:

val sizeUDF = udf((x: Seq) => setSize(x.toSet)) 
+0

Dank. Es klappt. Ich habe es auch geschafft, 2 Sätze als Eingabe zu verallgemeinern (was mein ursprüngliches Bedürfnis war) – Avision

Verwandte Themen