2016-11-14 4 views
0

konvertieren Ich benutze Apache Zeppelin Notebook. So läuft Funke grundsätzlich im interaktiven Modus. Ich kann die Variable closure hier nicht verwenden, da der Zeppelin org.apache.spark.SparkException: Task not serializable auswirft, während er versucht, den gesamten Absatz zu serialisieren (größerer Abschluss).Funken UDF, wie in Karte zu Spalte

Also ohne Schließung Ansatz nur Option Ich habe ist, Karte als eine Spalte zu UDF übergeben.

Ich habe eine folgende Karte von paried RDD gesammelt:

final val idxMap = idxMapRdd.collectAsMap 

die hier in einem der Funken Transformation verwendet wird:

def labelStr(predictions: WrappedArray[Double], idxMap: Map[Double, String]): Array[String] = { 

    predictions.array.map(idxMap.getOrElse(_, "Other")) 
} 
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions)} 

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap))) 

Aber mit lit(idxMap) Anweisung Ich habe folgende Fehlermeldung:

So habe ich versucht, Spalte aus folgenden verwenden:

val colmap = map(idxMapArr.map(lit _): _*)

Aber folgende Fehler bekommen:

<console>:139: error: type mismatch; 
found : Iterable[org.apache.spark.sql.Column] 
required: Seq[org.apache.spark.sql.Column] 
     val colmap = map(idxMapArr.map(lit _): _*) 

Closure Ansatz (auf Vollständigkeit):

def predictionStrUDF2(idxMapArr: scala.collection.Map[Double,String]) = { 
    udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMapArr)) 
} 
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions"))) 

die kompilieren, aber wenn ich dann cvmlPredictionsStr.show bekomme ich folgende. Ich denke, das auf interaktive Art von zeppelin gebührt

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1924) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2139) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 
    ... 62 elided 
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS 
Serialization stack: 
    - object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: [email protected]) 
    - field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS) 
    - object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f) 
    - field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator) 
    - object (class $iw, [email protected]) 
    - field (class: $iw, name: $iw, type: class $iw) 

Antwort

1

Die Frage Titel ist über Spark-UDF, aber es scheint, die wirkliche Frage ist, wie die Schließung Serialisierung Problem, dass einige interaktive Umgebungen zeigen zu vermeiden.

Aus Ihrer Beschreibung des Problems, es klingt wie die folgende nicht, wenn sie direkt in einem Ihrer Notebook-Zellen ausgeführt funktioniert:

val x = 5 
sc.parallelize(1 to 10).filter(_ > x).collect() 

Dies ist wahrscheinlich, weil x eine Klasse Mitglied der ist Zellobjekt; Wenn das Lambda x erfasst, versucht es, das gesamte Zellobjekt zu serialisieren. Das Zellenobjekt ist nicht serialisierbar und das Ergebnis ist eine chaotische Ausnahme. Dieses Problem kann mit einem Wrapper-Objekt vermieden werden. Beachten Sie, dass dies wahrscheinlich eine bessere Möglichkeit ist, diesen Wrapper zu deklarieren (vielleicht reicht das einfache Verschachteln innerhalb geschweifter Klammern).

Sie haben möglicherweise noch Fragen, nachdem Sie dieses Problem gelöst haben, aber die Frage berührt derzeit zu viele verschiedene Unterthemen. Eine weitere Erklärung des Serialisierungsproblems für die Schließung ist verfügbar here.

+0

Ihr erstes Beispiel funktioniert ohne Probleme in meinem Zeppelin-Notebook. Ich musste Wrapper dort nicht verwenden. – nir