0

Es tut mir leid, aber ich muss erneut eine Frage stellen. Ich hoffe, dass dieser nicht dupliziert wird. Ich habe die last one bearbeitet, aber ich denke, niemand hat die bearbeitete Version gesehen. Dies ist ein kurzes Beispiel für das Problem:Spark DataFrame Kartenfehler

val spark = SparkSession 
.builder() 
.appName("test") 
.getOrCreate() 

val field = StructField("1", BooleanType, false) 
val schema = StructType(field::Nil) 
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false))) 
val df = spark.createDataFrame(rowRDD, schema) 

val new_df = //Add hundred of new columns 

//here is the error 
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil) 

Der Fehler:

error: Unable to find encoder for type stored in a Dataset. 
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ 
Support for serializing other types will be added in future releases. 

Was ich tun möchte, ist, jede Zeile zu ändern. In diesem Fall weiß ich, dass es nur 1 Spalte gibt und ich könnte damit umgehen wie Encoder error while trying to map dataframe row to updated row. Aber wie kann ich das Problem lösen, wenn ich Hunderte von Spalten habe? Ich möchte einige Zeilen entfernen, wenn sie eine Bedingung nicht erfüllen. Im Moment benutze ich:

val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil) 

Aber ich glaube nicht, dass dies die Lösung besten ist. Ich betreibe auch in einem Stackoverflow:

Exception in thread "main" java.lang.StackOverflowError 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468) 

TY für Hilfe :)

+0

Ich möchte DF verwenden, weil ich das Schema brauche. Gibt es eine kartenähnliche Funktion für DF? Ich möchte eine Zeile in einem DF für eine Bedingung löschen oder erweitern. –

Antwort

0

Die withColumn() Möglichkeit, neue Spalte hinzugefügt wird auf gesamten Datensatz arbeiten. Und wenn Sie mehr Spalten haben, macht es die Dinge noch schlimmer. Sie können Spark SQL verwenden und eine Abfrage im SQL-Stil haben, um neue Spalten hinzuzufügen. Dies erfordert mehr SQL-Fähigkeiten als nur Funken. Und mit 100 Spalten könnte die Wartung schwierig sein.

Sie können einen anderen Ansatz verfolgen.

Sie können eine RDD in Dataframe konvertieren. Verwenden Sie dann die Karte im Datenrahmen und bearbeiten Sie jede Zeile nach Ihren Wünschen. Innerhalb der Kartenmethode,

a. Sie können neue Werte basierend auf den Berechnungen sammeln

b. Hinzufügen, diese neue Spaltenwerte Haupt RDD wie unten

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

Hier Zeile ist die Referenzreihe von in der Karte Methode

c. Erstellen Sie ein neues Schema wie folgt:

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

d. Zum alten Schema hinzufügen

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e. Erstellen Sie einen neuen Datenrahmen mit neuen Spalten

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+0

Thnx für Ihre Antwort, aber wie kann ich map() auf DataFrame mit vielen Spalten verwenden? Ich erhalte den obigen Fehler. Alle meine Spalten sind boolean. –

+0

Ihre Frage bezieht sich auf das Hinzufügen neuer und mehr Spalten zu bestehenden Datenrahmen. Also die obigen Schritte würden helfen – Ramzy

+0

Die Frage war, wie man map() mit hundert Spalten verwendet, ohne den Fehler zu bekommen? –