2017-08-25 4 views
0

Wenn ich eine Funktion aufrufen, funktioniert es. aber wenn ich diese Funktion in UDF aufrufen, wird es nicht funktionieren.Spark classnotfundedexception in UDF

Dies ist der vollständige Code.

val sparkConf = new SparkConf().setAppName("HiveFromSpark").set("spark.driver.allowMultipleContexts","true") 
val sc = new SparkContext(sparkConf) 
val hive = new org.apache.spark.sql.hive.HiveContext(sc) 

///////////// UDFS 
def toDoubleArrayFun(vec:Any) : scala.Array[Double] = { 
    return vec.asInstanceOf[WrappedArray[Double]].toArray 
} 
def toDoubleArray=udf((vec:Any) => toDoubleArrayFun(vec)) 

//////////// PROCESS 
var df = hive.sql("select vec from mst_wordvector_tapi_128dim where word='soccer'") 
println("==== test get value then transform") 
println(df.head().get(0)) 
println(toDoubleArrayFun(df.head().get(0))) 

println("==== test transform by udf") 
df.withColumn("word_v", toDoubleArray(col("vec"))) 
.show(10); 

Dann ist dies der Ausgang.

sc: org.apache.spark.SparkContext = [email protected] 
hive: org.apache.spark.sql.hive.HiveContext = 
toDoubleArrayFun: (vec: Any)Array[Double] 
toDoubleArray: org.apache.spark.sql.UserDefinedFunction 
df: org.apache.spark.sql.DataFrame = [vec: array<double>] 
==== test get value then transform 
WrappedArray(-0.88675,, 0.0216657) 
[[email protected] 
==== test transform by udf 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, xdad008.band.nhnsystem.com): java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$$$5ba2a895f25683dd48fe725fd825a71$$$$$$iwC$$anonfun$toDoubleArray$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 

Volle Ausgabe hier. https://gist.github.com/jeesim2/efb52f12d6cd4c1b255fd0c917411370

Wie Sie "toDoubleArrayFun" -Funktion funktioniert gut sehen kann, aber in UDF behauptet, es ClassNotFoundException.

Ich kann die Struktur der Strukturstruktur nicht ändern und muss vec in Array [Double] konvertieren, um eine Vector-Instanz zu erstellen.

Also welches Problem mit Code oben?

Spark-Version ist 1.6.1

Update 1

Hive Tisch 'VEC' Spaltentyp ist "array<double>"

Im Folgenden Code verursachen auch Fehler

var df = hive.sql("select vec from mst_wordvector_tapi_128dim where 
word='hh'") 
df.printSchema() 
var word_vec = df.head().get(0) 
println(word_vec) 
println(Vectors.dense(word_vec)) 

Ausgang

Das heißt hive ‚array<double>‘ Spalte kippe nicht gegossen werden Array<Double> Eigentlich möchte ich Abstand berechnen: Double mit zwei array<double> Spalte. Wie füge ich Vector Spalte basierend auf array<double> Spalte hinzu?

Typisches Verfahren ist

Vectors.sqrt(Vectors.dense(Array<Double>, Array<Double>) 

Antwort

2

Da udf Funktion Serialisierung und Deserialisierung gehen hat, wird any Datentyp nicht. Sie müssen den genauen Datentyp der Spalte definieren, die Sie an die udf-Funktion übergeben.

Vom Ausgang in Ihrer Frage scheint es, dass Sie nur eine Spalte in Ihrem Datenrahmen dh vec haben, die von Array[Double] Typ ist

df: org.apache.spark.sql.DataFrame = [vec: array<double>] 

Es tatsächlich als vec Spalte bereits keine Notwendigkeit dieser UDF-Funktion sind von Array dataType und das ist, was Ihre udf Funktion auch tut, dh den Wert auf Array[Double] werfend.

Nun wird Ihr anderer Funktionsaufruf

println(toDoubleArrayFun(df.head().get(0))) 

zu arbeiten, weil es keine Notwendigkeit der Serialisierung und Deserialisierung Prozess ist, es ist nur scala Funktionsaufruf.

+0

Ah, serialisierbar war der Punkt!Aber wie mache ich Dataframe Spalte 'Array ', nicht Any? Ich habe die Frage aktualisiert! –

+0

anstelle von 'Any' in Ihrer UdF-Funktion, definieren Sie einfach den Datentyp als' WrappedArray [Double] 'und Sie sollten in Ordnung sein. :) –

+0

Vielen Dank für die freundlichen Antworten .. btw wenn ich den Argumenttyp auf 'WrappedArray [Double]' anstelle von 'Any' setze, scheitert es. ': 346: Fehler: Typ nicht übereinstimmen; gefunden: Irgendwelche. Erforderlich: scala.collection.mutable.WrappedArray [Double] println (Vectors.dense (toDoubleArrayFun (df.head(). get (0)))) ' –