2017-05-11 4 views
0

Ich habe eine Anforderung, meine eigene UnaryTransformer-Instanz zu erstellen, die eine Dataframe-Spalte vom Typ Array [String] akzeptiert und denselben Typ ausgeben sollte. Beim Versuch, dies zu tun, stieß ich auf eine ClassCastException in meiner Spark-Version 2.1.0. Ich habe einen Beispieltest zusammengestellt, der meinen Fall zeigt.UnaryTransformer-Instanz wirft ClassCastException

import org.apache.spark.SparkConf 
import org.apache.spark.ml.UnaryTransformer 
import org.apache.spark.ml.util.Identifiable 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.types.{ArrayType, DataType, StringType} 

class MyTransformer(override val uid:String) extends UnaryTransformer[Array[String],Array[String],MyTransformer] { 
    override protected def createTransformFunc: (Array[String]) => Array[String] = { 
    param1 => { 
     param1.foreach(println(_)) 
     param1 
    } 
    } 

    override protected def outputDataType: DataType = ArrayType(StringType) 

    override protected def validateInputType(inputType: DataType): Unit = { 
    require(inputType == ArrayType(StringType), s"Data type mismatch between Array[String] and provided type $inputType.") 
    } 

    def this() = this(Identifiable.randomUID("tester")) 
} 


object Tester { 



    def main(args: Array[String]): Unit = { 

    val config = new SparkConf().setAppName("Tester") 

    implicit val sparkSession = SparkSession.builder().config(config).getOrCreate() 
    import sparkSession.implicits._ 

    val dataframe = Seq(Array("Firstly" , "F1"),Array("Driving" , "S1"),Array("Ran" , "T3"),Array("Fourth" ,"F4"), Array("Running" , "F5") 
     ,Array("Gone" , "S6")).toDF("input") 



    val transformer = new MyTransformer().setInputCol("input").setOutputCol("output") 

    val transformed = transformer.transform(dataframe) 

    transformed.select("output").show() 

    println("Complete....") 

    sparkSession.close() 


    } 

} 

Anbringen der Stapelüberwachung Referenz

Exception in thread "main" org.apache.spark.SparkException: Fehler beim Ausführen benutzerdefinierte Funktion (anonfun $ $ $ createTransformFunc 1: (array) => array) um org.apache.spark.sql.catalyst.expressions.ScalauDEF.eval (ScalaUDF.scala: 1072) um org.apache.spark.sql.catalyst.expressions.Alias.eval (namedExpressions. scala: 144) unter org.apache.spark.sql.c atalyst.expressions.InterpretedProjection.apply (Projection.scala: 48) bei org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply (Projection.scala: 30) bei scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234) um scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234) bei scala.collection.immutable.List.foreach (List.scala : 392) um scala.collection.TraversableLike $ class.map (TraversableLike.scala: 234) bei scala.collection.immutable.List.map (List.scala: 296) um org.apache.spark.sql.catalyst .optimizer.ConvertToLocalRelation $$ anonfun $ apply $ 21.applyOrElse (Optimizer.scala: 1078) bei org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation $$ anonfun $ bewerben $ 21.applyOrElse (Optimizer.scala: 1073) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 3. anwenden (TreeNode.scala: 288) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 3.apply (TreeNode.scala: 288) um org.apache.spark.sql.catalyst. trees.CurrentOrigin $ .withOrigin (TreeNode.scala: 70) bei org.apache.spark.sql.catalyst.trees.TreeNode.transformDown (TreeNode.scala: 287) bei org.apache.spark.sql.catalyst .trees.TreeNode $$ anonfun $ transformDown $ 1.apply (TreeNode.scala: 293) um org.apache.spark.sql.catalysis t.trees.TreeNode $$ anonfun $ transformDown $ 1.apply (TreeNode.scala: 293) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 5.apply (TreeNode.scala: 331) bei org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator (TreeNode.scala: 188) bei org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren (TreeNode.scala: 329) um org.apache.spark.sql.catalyst.trees.TreeNode.transformDown (TreeNode.scala: 293) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ transformDown $ 1.apply (TreeNode .scala: 293) um org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ transformDown $ 1.apply (TreeNode.scala: 293) bei org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 5.apply (TreeNode.scala: 331) bei org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator (TreeNode. scala: 188) um org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren (TreeNode.scala: 329) um org.apache.spark.sql.catalyst.trees.TreeNode.transformDown (TreeNode.scala: 293) bei org.apache.spark.sql.catalyst.trees.TreeNode.transform (TreeNode.scala: 277) bei org.apache.spark.sql. catalyst.optimizer.ConvertToLocalRelation $ .apply (Optimizer.scala: 1073) bei org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation $ .apply (Optimizer.scala: 1072) bei org.apache.spark. sql.catalyst.rules.RuleExecutor $$ anonfun $ execute $ 1 $$ anonfun $ apply $ 1.apply (RuleExecutor.scala: 85) unter org.apache.spark.sql.catalyst.rules.RuleExecutor $$ anonfun $ execute $ 1 $$ anonfun $ anwenden $ 1.apply (RuleExecutor.scala: 82) um scala.collection.IndexedSeqOptimized $ class.fold l (IndexedSeqOptimized.scala: 57) bei scala.collection.IndexedSeqOptimized $ class.foldLeft (IndexedSeqOptimized.scala: 66) bei scala.collection.mutable.WrappedArray.foldLeft (WrappedArray.scala: 35) bei org .apache.spark.sql.catalyst.rules.RuleExecutor $$ anonfun $ execute $ 1.apply (RuleExecutor.scala: 82) unter org.apache.spark.sql.catalyst.rules.RuleExecutor $$ anonfun $ execute $ 1. Anwenden (RuleExecutor.scala: 74) bei scala.collection.immutable.List.foreach (List.scala: 392) um org.apache.spark.sql.catalyst.rules.RuleExecutor.execute (RuleExecutor.scala: 74) um org.apache.spark.sql.execution.QueryExecution.optimizedPlan $ lzycompute (Quer yExecution.scala: 73) bei org.apache.spark.sql.execution.QueryExecution.optimizedPlan (QueryExecution.scala: 73) bei org.apache.spark.sql.execution.QueryExecution.sparkPlan $ lzycompute (QueryExecution. scala: 79) bei org.apache.spark.sql.execution.QueryExecution.sparkPlan (QueryExecution.scala: 75) bei org.apache.spark.sql.execution.QueryExecution.executedPlan $ lzycompute (QueryExecution.scala: 84) bei org.apache.spark.sql.execution.QueryExecution.executedPlan (QueryExecution.scala: 84) bei org.apache.spark.sql.Dataset.withTypedCallback (Dataset.scala: 2791) bei org.apache .spark.sql.Dataset.head (Dataset.scala: 2112) um org.apache.spark.sql.Dataset.take (Dataset.scala: 2327) um org.apache.spark.sql.Dataset.showString (Dataset.scala: 248) um ​​ org.apache.spark.sql.Dataset .show (Dataset.scala: 636) um org.apache.spark.sql.Dataset.show (Dataset.scala: 595) um org.apache.spark.sql.Dataset.show (Dataset.scala: 604) um Tester $ .main (Tester.scala: 45) bei Tester.main (Tester.scala) verursacht durch: java.lang.ClassCastException: scala.collection.mutable.WrappedArray $ ofRef kann nicht in umgewandelt werden [Ljava.lang .String; bei MyTransformer $$ anonfun $ createTransformFunc $ 1.Apply (Tester.scala: 9) bei org.apache.spark.sql.catalyst.expressions.ScalaUDF $$ anonfun $ 2.Apply (ScalaUDF.scala: 89) bei org.apache.spark.sql.catalyst.expressions.Scalawdf $$ anonfun $ 2.apply (ScalaUDF.scala: 88) um org.apache.spark.sql.catalyst.expressions.ScalauDF.eval (ScalaUDF.scala: 1069) ... 53 weitere

Antwort

1

ArrayType als Seq nicht Array vertreten:

override protected def createTransformFunc: (Seq[String]) => Seq[String] = { 
    param1 => { 
     param1.foreach(println(_)) 
    param1 
    } 
} 
+0

Danke, das hat die Ausnahme behoben. Könnten Sie mich auf den Code verweisen, in dem ArrayType als Seq dargestellt wird? – schengalath

Verwandte Themen