Spark UDF: how to convert a Map to a column

I am using an Apache Zeppelin notebook, so Spark basically runs in interactive mode. I can't use a closure variable here, because Zeppelin throws org.apache.spark.SparkException: Task not serializable while it tries to serialize the whole paragraph (a bigger closure).

So without the closure approach, the only option I have is to pass the map to the UDF as a column.
I have the following map, collected from a paired RDD:
final val idxMap = idxMapRdd.collectAsMap
which is used here in one of the Spark transformations:
import scala.collection.mutable.WrappedArray

// Map each numeric prediction to its string label, defaulting to "Other".
def labelStr(predictions: WrappedArray[Double], idxMap: scala.collection.Map[Double, String]): Array[String] = {
  predictions.array.map(idxMap.getOrElse(_, "Other"))
}
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: scala.collection.Map[Double, String]) => labelStr(predictions, idxMap) }
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap)))
But the lit(idxMap) statement produced an error.
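(As an aside, a minimal sketch of an alternative, assuming Spark 2.2 or later: typedLit, unlike lit, can construct a literal Column from a Scala Map, so the lookup table could be passed to the UDF as a map column.)

import org.apache.spark.sql.functions.{col, typedLit}

// Sketch, assuming Spark 2.2+: typedLit supports Scala Maps and yields a
// Column of MapType(DoubleType, StringType) that the UDF can consume.
val idxMapCol = typedLit(idxMap.toMap)
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), idxMapCol))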
So I then tried to build a Column with map from org.apache.spark.sql.functions:
val colmap = map(idxMapArr.map(lit _): _*)
But got the following error:
<console>:139: error: type mismatch;
found : Iterable[org.apache.spark.sql.Column]
required: Seq[org.apache.spark.sql.Column]
val colmap = map(idxMapArr.map(lit _): _*)
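A sketch of one fix for this type mismatch, assuming idxMapArr is the collected Map[Double, String]: functions.map expects alternating key/value Columns as a Seq, so flatten each pair into two literals and convert before splicing the varargs:

import org.apache.spark.sql.functions.{lit, map}

// Sketch: interleave key/value literals (k1, v1, k2, v2, ...) and
// materialize a Seq so the result can be expanded as varargs.
val colmap = map(idxMapArr.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*)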
Closure approach (for completeness):
def predictionStrUDF2(idxMapArr: scala.collection.Map[Double, String]) = {
  udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMapArr))
}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions")))
This compiles, but when I then run cvmlPredictionsStr.show
I get the following. I think it's due to the interactive nature of Zeppelin:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
... 62 elided
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS
Serialization stack:
- object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: com.github.fommil.netlib.F2jBLAS@...)
- field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS)
- object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f)
- field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator)
- object (class $iw, $iw@...)
- field (class: $iw, name: $iw, type: class $iw)
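The serialization stack shows the closure pulling in the whole Zeppelin paragraph wrapper ($iw), and with it the crossValidator field, whose F2jBLAS member is not serializable. A sketch of a common workaround, under the assumption that moving the UDF factory into a self-contained serializable object (the hypothetical PredictionLabeler below) keeps the lambda from capturing the paragraph:

import org.apache.spark.sql.functions.{col, udf}
import scala.collection.mutable.WrappedArray

// Sketch: a self-contained object keeps the udf's closure from capturing
// the Zeppelin paragraph wrapper ($iw) and everything defined in it.
object PredictionLabeler extends Serializable {
  def labelUdf(idxMap: scala.collection.Map[Double, String]) =
    udf { (predictions: WrappedArray[Double]) =>
      predictions.array.map(idxMap.getOrElse(_, "Other"))
    }
}

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", PredictionLabeler.labelUdf(idxMapArr)(col("predictions")))

The lambda then only captures its own idxMap parameter, so nothing else from the notebook paragraph needs to be serialized.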
Your first example works without problems in my Zeppelin notebook. I didn't have to use the wrapper there. – nir