2015-09-17 11 views
6

Ich möchte (als Parkettdatei) einen Spark DataFrame speichern, der eine benutzerdefinierte Klasse als Spalte enthält. Diese Klasse besteht aus einer Seq einer anderen benutzerdefinierten Klasse. Um dies zu tun, erstelle ich für jede dieser Klassen eine UserDefinedType-Klasse, ähnlich wie bei VectorUDT. Ich kann mit dem Datenrahmen arbeiten, wie ich es vorhabe, aber ich kann es nicht als Parkett (oder Jason) auf Platte speichern Ich habe es als Fehler gemeldet, aber vielleicht gibt es ein Problem mit meinem Code. Ich habe ein einfacheres Beispiel implementiert das Problem zu zeigen:Speichern von Spark DataFrames mit verschachtelten Benutzerdatentypen

import org.apache.spark.sql.SaveMode 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.catalyst.InternalRow 
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow 
import org.apache.spark.sql.types._ 

@SQLUserDefinedType(udt = classOf[AUDT]) 
case class A(list:Seq[B]) 

class AUDT extends UserDefinedType[A] { 
    override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true))) 
    override def userClass: Class[A] = classOf[A] 
    override def serialize(obj: Any): Any = obj match { 
    case A(list) => 
     val row = new GenericMutableRow(1) 
     row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray)) 
     row 
    } 

    override def deserialize(datum: Any): A = { 
    datum match { 
     case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq) 
    } 
    } 
} 

object AUDT extends AUDT 

@SQLUserDefinedType(udt = classOf[BUDT]) 
case class B(num:Int) 

class BUDT extends UserDefinedType[B] { 
    override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false))) 
    override def userClass: Class[B] = classOf[B] 
    override def serialize(obj: Any): Any = obj match { 
    case B(num) => 
     val row = new GenericMutableRow(1) 
     row.setInt(0, num) 
     row 
    } 

    override def deserialize(datum: Any): B = { 
    datum match { 
     case row: InternalRow => new B(row.getInt(0)) 
    } 
    } 
} 

object BUDT extends BUDT 

object TestNested { 
    def main(args:Array[String]) = { 
    val col = Seq(new A(Seq(new B(1), new B(2))), 
        new A(Seq(new B(3), new B(4)))) 

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    val df = sc.parallelize(1 to 2 zip col).toDF() 
    df.show() 

    df.write.mode(SaveMode.Overwrite).save(...) 
    } 
} 

Diese in den folgenden Fehlern führt:

15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.IllegalArgumentException: Nested type should be repeated: required group array { required int32 num; } at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42) at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:521) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at org.apache.spark.sql.types.StructType.map(StructType.scala:92) at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311) at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58) at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.(ParquetRelation.scala:94) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):

Wenn ein einen Datenrahmen mit B speichern anstelle von A kein Problem existiert, da B wie keine geschachtelte benutzerdefinierte Klasse. Fehle ich etwas?

Antwort

2

Ich musste vier Änderungen an Ihrem Code vornehmen, damit es funktioniert (getestet in Spark 1.6.0 unter Linux) und ich glaube, ich kann meistens erklären, warum sie benötigt werden. Ich frage mich jedoch, ob es eine einfachere Lösung gibt. Alle Änderungen sind in AUDT wie folgt:

  1. Wenn sqlType definieren, machen es auf BUDT.sqlType abhängen, anstatt nur BUDT.
  2. In , rufen Sie BUDT.serialize() auf jedem Listenelement.
  3. In deserialize():
    • Anruf toArray(BUDT.sqlType) statt toArray(BUDT)
    • Anruf BUDT.deserialize() auf jedem Element

Hier ist der resultierende Code:

class AUDT extends UserDefinedType[A] { 
    override def sqlType: DataType = 
    StructType(
     Seq(StructField("list", 
         ArrayType(BUDT.sqlType, containsNull = false), 
         nullable = true))) 

    override def userClass: Class[A] = classOf[A] 

    override def serialize(obj: Any): Any = 
    obj match { 
     case A(list) => 
     val row = new GenericMutableRow(1) 
     val elements = 
      list.map(_.asInstanceOf[Any]) 
       .map(e => BUDT.serialize(e)) 
       .toArray 
     row.update(0, new GenericArrayData(elements)) 
     row 
    } 

    override def deserialize(datum: Any): A = { 
    datum match { 
     case row: InternalRow => 
     val first = row.getArray(0) 
     val bs:Array[InternalRow] = first.toArray(BUDT.sqlType) 
     val bseq = bs.toSeq.map(e => BUDT.deserialize(e)) 
     val a = new A(bseq) 
     a 
    } 
    } 

} 

Alle vier ch Anges haben das gleiche Charakter: Die Beziehung zwischen der Behandlung von A s und der Handhabung von B s ist jetzt sehr explizit: für Schematypisierung, für Serialisierung und für Deserialisierung. Der ursprüngliche Code scheint auf der Annahme zu basieren, dass Spark SQL "es einfach herausfinden wird", was vernünftig sein könnte, aber anscheinend nicht.

+0

Das hat funktioniert. Ich verwende "komplexe" Objekte als Spalten in einem DataFrame und in Spark 1.6.0 funktioniert es nicht mehr. Das hat den Trick gemacht, also ist die Lektion, die ich gelernt habe, einfach alles bezüglich der Serialisierung/Deserialisierung sehr explizit zu machen. Prost! –

Verwandte Themen