Ich habe eine Anforderung, meine eigene UnaryTransformer-Instanz zu erstellen, die eine Dataframe-Spalte vom Typ Array [String] akzeptiert und denselben Typ ausgeben sollte. Beim Versuch, dies zu tun, stieß ich auf eine ClassCastException in meiner Spark-Version 2.1.0. Ich habe einen Beispieltest zusammengestellt, der meinen Fall zeigt.UnaryTransformer-Instanz wirft ClassCastException
import org.apache.spark.SparkConf
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}
class MyTransformer(override val uid:String) extends UnaryTransformer[Array[String],Array[String],MyTransformer] {
override protected def createTransformFunc: (Array[String]) => Array[String] = {
param1 => {
param1.foreach(println(_))
param1
}
}
override protected def outputDataType: DataType = ArrayType(StringType)
override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == ArrayType(StringType), s"Data type mismatch between Array[String] and provided type $inputType.")
}
def this() = this(Identifiable.randomUID("tester"))
}
object Tester {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setAppName("Tester")
implicit val sparkSession = SparkSession.builder().config(config).getOrCreate()
import sparkSession.implicits._
val dataframe = Seq(Array("Firstly" , "F1"),Array("Driving" , "S1"),Array("Ran" , "T3"),Array("Fourth" ,"F4"), Array("Running" , "F5")
,Array("Gone" , "S6")).toDF("input")
val transformer = new MyTransformer().setInputCol("input").setOutputCol("output")
val transformed = transformer.transform(dataframe)
transformed.select("output").show()
println("Complete....")
sparkSession.close()
}
}
Anbringen der Stapelüberwachung Referenz
Exception in thread "main" org.apache.spark.SparkException: Fehler beim Ausführen benutzerdefinierte Funktion (anonfun $ $ $ createTransformFunc 1: (array) => array) um org.apache.spark.sql.catalyst.expressions.ScalauDEF.eval (ScalaUDF.scala: 1072) um org.apache.spark.sql.catalyst.expressions.Alias.eval (namedExpressions. scala: 144) unter org.apache.spark.sql.c atalyst.expressions.InterpretedProjection.apply (Projection.scala: 48) bei org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply (Projection.scala: 30) bei scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234) um scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234) bei scala.collection.immutable.List.foreach (List.scala : 392) um scala.collection.TraversableLike $ class.map (TraversableLike.scala: 234) bei scala.collection.immutable.List.map (List.scala: 296) um org.apache.spark.sql.catalyst .optimizer.ConvertToLocalRelation $$ anonfun $ apply $ 21.applyOrElse (Optimizer.scala: 1078) bei org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation $$ anonfun $ bewerben $ 21.applyOrElse (Optimizer.scala: 1073) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 3. anwenden (TreeNode.scala: 288) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 3.apply (TreeNode.scala: 288) um org.apache.spark.sql.catalyst. trees.CurrentOrigin $ .withOrigin (TreeNode.scala: 70) bei org.apache.spark.sql.catalyst.trees.TreeNode.transformDown (TreeNode.scala: 287) bei org.apache.spark.sql.catalyst .trees.TreeNode $$ anonfun $ transformDown $ 1.apply (TreeNode.scala: 293) um org.apache.spark.sql.catalysis t.trees.TreeNode $$ anonfun $ transformDown $ 1.apply (TreeNode.scala: 293) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 5.apply (TreeNode.scala: 331) bei org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator (TreeNode.scala: 188) bei org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren (TreeNode.scala: 329) um org.apache.spark.sql.catalyst.trees.TreeNode.transformDown (TreeNode.scala: 293) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ transformDown $ 1.apply (TreeNode .scala: 293) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ transformDown $ 1.apply (TreeNode.scala: 293) bei org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 5.apply (TreeNode.scala: 331) bei org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator (TreeNode. scala: 188) um org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren (TreeNode.scala: 329) um org.apache.spark.sql.catalyst.trees.TreeNode.transformDown (TreeNode.scala: 293) bei org.apache.spark.sql.catalyst.trees.TreeNode.transform (TreeNode.scala: 277) bei org.apache.spark.sql. catalyst.optimizer.ConvertToLocalRelation $ .apply (Optimizer.scala: 1073) bei org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation $ .apply (Optimizer.scala: 1072) bei org.apache.spark. sql.catalyst.rules.RuleExecutor $$ anonfun $ execute $ 1 $$ anonfun $ apply $ 1.apply (RuleExecutor.scala: 85) unter org.apache.spark.sql.catalyst.rules.RuleExecutor $$ anonfun $ execute $ 1 $$ anonfun $ anwenden $ 1.apply (RuleExecutor.scala: 82) um scala.collection.IndexedSeqOptimized $ class.fold l (IndexedSeqOptimized.scala: 57) bei scala.collection.IndexedSeqOptimized $ class.foldLeft (IndexedSeqOptimized.scala: 66) bei scala.collection.mutable.WrappedArray.foldLeft (WrappedArray.scala: 35) bei org .apache.spark.sql.catalyst.rules.RuleExecutor $$ anonfun $ execute $ 1.apply (RuleExecutor.scala: 82) unter org.apache.spark.sql.catalyst.rules.RuleExecutor $$ anonfun $ execute $ 1. Anwenden (RuleExecutor.scala: 74) bei scala.collection.immutable.List.foreach (List.scala: 392) um org.apache.spark.sql.catalyst.rules.RuleExecutor.execute (RuleExecutor.scala: 74) um org.apache.spark.sql.execution.QueryExecution.optimizedPlan $ lzycompute (Quer yExecution.scala: 73) bei org.apache.spark.sql.execution.QueryExecution.optimizedPlan (QueryExecution.scala: 73) bei org.apache.spark.sql.execution.QueryExecution.sparkPlan $ lzycompute (QueryExecution. scala: 79) bei org.apache.spark.sql.execution.QueryExecution.sparkPlan (QueryExecution.scala: 75) bei org.apache.spark.sql.execution.QueryExecution.executedPlan $ lzycompute (QueryExecution.scala: 84) bei org.apache.spark.sql.execution.QueryExecution.executedPlan (QueryExecution.scala: 84) bei org.apache.spark.sql.Dataset.withTypedCallback (Dataset.scala: 2791) bei org.apache .spark.sql.Dataset.head (Dataset.scala: 2112) um org.apache.spark.sql.Dataset.take (Dataset.scala: 2327) um org.apache.spark.sql.Dataset.showString (Dataset.scala: 248) um org.apache.spark.sql.Dataset .show (Dataset.scala: 636) um org.apache.spark.sql.Dataset.show (Dataset.scala: 595) um org.apache.spark.sql.Dataset.show (Dataset.scala: 604) um Tester $ .main (Tester.scala: 45) bei Tester.main (Tester.scala) verursacht durch: java.lang.ClassCastException: scala.collection.mutable.WrappedArray $ ofRef kann nicht in umgewandelt werden [Ljava.lang .String; bei MyTransformer $$ anonfun $ createTransformFunc $ 1.Apply (Tester.scala: 9) bei org.apache.spark.sql.catalyst.expressions.ScalaUDF $$ anonfun $ 2.Apply (ScalaUDF.scala: 89) bei org.apache.spark.sql.catalyst.expressions.Scalawdf $$ anonfun $ 2.apply (ScalaUDF.scala: 88) um org.apache.spark.sql.catalyst.expressions.ScalauDF.eval (ScalaUDF.scala: 1069) ... 53 weitere
Danke, das hat die Ausnahme behoben. Könnten Sie mich auf den Code verweisen, in dem ArrayType als Seq dargestellt wird? – schengalath