Es tut mir leid, aber ich muss erneut eine Frage stellen. Ich hoffe, dass dieser nicht dupliziert wird. Ich habe die last one bearbeitet, aber ich denke, niemand hat die bearbeitete Version gesehen. Dies ist ein kurzes Beispiel für das Problem:Spark DataFrame Kartenfehler
val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()
val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)
val new_df = //Add hundred of new columns
//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)
Der Fehler:
error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
Was ich tun möchte, ist, jede Zeile zu ändern. In diesem Fall weiß ich, dass es nur 1 Spalte gibt und ich könnte damit umgehen wie Encoder error while trying to map dataframe row to updated row. Aber wie kann ich das Problem lösen, wenn ich Hunderte von Spalten habe? Ich möchte einige Zeilen entfernen, wenn sie eine Bedingung nicht erfüllen. Im Moment benutze ich:
val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)
Aber ich glaube nicht, dass dies die Lösung besten ist. Ich betreibe auch in einem Stackoverflow:
Exception in thread "main" java.lang.StackOverflowError
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
TY für Hilfe :)
Ich möchte DF verwenden, weil ich das Schema brauche. Gibt es eine kartenähnliche Funktion für DF? Ich möchte eine Zeile in einem DF für eine Bedingung löschen oder erweitern. –