Ich möchte (als Parkettdatei) einen Spark DataFrame speichern, der eine benutzerdefinierte Klasse als Spalte enthält. Diese Klasse besteht aus einer Seq einer anderen benutzerdefinierten Klasse. Um dies zu tun, erstelle ich für jede dieser Klassen eine UserDefinedType-Klasse, ähnlich wie bei VectorUDT. Ich kann mit dem Datenrahmen arbeiten, wie ich es vorhabe, aber ich kann es nicht als Parkett (oder Jason) auf Platte speichern Ich habe es als Fehler gemeldet, aber vielleicht gibt es ein Problem mit meinem Code. Ich habe ein einfacheres Beispiel implementiert das Problem zu zeigen:Speichern von Spark DataFrames mit verschachtelten Benutzerdatentypen
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._
@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])
class AUDT extends UserDefinedType[A] {
override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))
override def userClass: Class[A] = classOf[A]
override def serialize(obj: Any): Any = obj match {
case A(list) =>
val row = new GenericMutableRow(1)
row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
row
}
override def deserialize(datum: Any): A = {
datum match {
case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
}
}
}
object AUDT extends AUDT
@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num:Int)
class BUDT extends UserDefinedType[B] {
override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false)))
override def userClass: Class[B] = classOf[B]
override def serialize(obj: Any): Any = obj match {
case B(num) =>
val row = new GenericMutableRow(1)
row.setInt(0, num)
row
}
override def deserialize(datum: Any): B = {
datum match {
case row: InternalRow => new B(row.getInt(0))
}
}
}
object BUDT extends BUDT
object TestNested {
def main(args:Array[String]) = {
val col = Seq(new A(Seq(new B(1), new B(2))),
new A(Seq(new B(3), new B(4))))
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(1 to 2 zip col).toDF()
df.show()
df.write.mode(SaveMode.Overwrite).save(...)
}
}
Diese in den folgenden Fehlern führt:
15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; } at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42) at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:521) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at org.apache.spark.sql.types.StructType.map(StructType.scala:92) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58) at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):
Wenn ein einen Datenrahmen mit B speichern anstelle von A kein Problem existiert, da B wie keine geschachtelte benutzerdefinierte Klasse. Fehle ich etwas?
Das hat funktioniert. Ich verwende "komplexe" Objekte als Spalten in einem DataFrame und in Spark 1.6.0 funktioniert es nicht mehr. Das hat den Trick gemacht, also ist die Lektion, die ich gelernt habe, einfach alles bezüglich der Serialisierung/Deserialisierung sehr explizit zu machen. Prost! –