2017-01-26 8 views
2

Ich habe zwei kleine Probleme in Bezug auf mein ein größeres Problem: Ich möchte JSON-Daten einmal pro Tag einlesen und speichern Sie es als Parkett für spätere datenbezogene Arbeit. Arbeiten mit Parkett ist so viel schneller. Aber die Sache, bei der ich feststecke, ist die Tatsache, dass Spark beim Lesen dieses Parketts immer versucht, das Schema aus der Schemadatei zu holen oder einfach das Schema aus der ersten Parkettdatei nimmt und annimmt, dass das Schema für alle Dateien gleich ist. Es gibt jedoch Fälle, in denen wir in einigen Spalten für einige Tage keine Daten haben.So ändern Sie Spark Datentyp Datentypen Spalte in einem Array

Also lassen Sie uns sagen, dass ich eine JSON-Datei mit Daten mit folgendem Schema haben:

root 
|-- Id: long (nullable = true)  
|-- People: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- Name: string (nullable = true) 
| | |-- Amount: double (nullable = true) 

Und dann habe ich noch eine JSON-Datei, wo es keine Daten für die „People“ Spalte ist. Und deshalb das Schema ist die folgende:

root 
|-- Id: long (nullable = true)  
|-- People: array (nullable = true) 
| |-- element: string (containsNull = true) 

Wenn ich würde sie in zusammen mit read.json lesen, Funken geht durch alle Dateien und folgert das fusionierte Schema von diesen, insbesondere von dem ersten und läßt nur die Zeilen aus der zweiten Datei leer, aber das Schema ist korrekt.

Aber wenn ich diese separat lese und auf Parkett getrennt schreibe, dann kann ich sie nicht zusammen lesen, denn für Parquet stimmt das Schema nicht und ich bekomme einen Fehler.

Meine erste Idee war, die Datei mit fehlenden Daten einzulesen und ihr Schema manuell zu ändern, indem ich Spaltentypen auf das erste Schema umwandelt, aber diese manuelle Konvertierung ist fehlerhaft, sie kann nicht synchron sein und ich nicht weiß, wie man diesen String-Typ in Array oder Struct-Typ umwandelt.

Und ein anderes Problem ist, wenn das Feld "Betrag" nur volle ganze Zahlen hat, dann liest Spark sie als longs, aber nicht verdoppelt, als notwendig ist. Aber wenn ich benutze:

val df2 = df.withColumn("People.Amount", col("People.Amount").cast(org.apache.spark.sql.types.ArrayType(org.apache.spark.sql.types.DoubleType,true))) 

Dann ist es nicht die Art der ursprünglichen Spalte ändern, sondern fügt eine neue Spalte namens People.Amount

Antwort

1

Ich denke, man kann in der Lage sein, etwas jigger up mit Schema Verschmelzung (siehe Dok. here). Wenn das erste Parkett, das du hast, das richtige Schema hat, könntest du dann so etwas tun, um dieses Schema auf die neuen Parkettböden anzuwenden?

// Read the partitioned table 
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") 
mergedDF.printSchema() 

bearbeiten

Sie sagen, es gibt mehr als 200 Spalten, weißt du sie schon alles? Ich sehe einen Weg nach vorne, und es gibt wahrscheinlich eine Reihe von Möglichkeiten, dies zu erreichen. Zum einen definieren Sie alle Felder, die Sie im Voraus sehen können. Was ich in der Vergangenheit getan habe, ist eine JSON-Datei mit einem einzigen Dummy-Datensatz zu erstellen, der alle Felder enthält, die ich möchte, und genau so eingegeben wird, wie ich sie haben möchte. Dann können Sie diesen Datensatz immer gleichzeitig mit Ihrem Dataset "Monday" oder "Tuesday" laden und nach dem Laden löschen. Es ist wahrscheinlich keine Best Practice, aber so bin ich nach vorne gestolpert.

Die Alternative besteht darin, zu versuchen, einzelne Datensätze im richtigen Schema zu laden/speichern, und das Schema festzulegen, sobald Sie alle Daten geladen haben. Klingt nicht nach dem Weg, den Sie gehen wollen, aber zumindest dann haben Sie dieses spezifische Problem nicht.

+0

Ich bin mir ziemlich sicher, dass ich es schon versucht habe. Gibt es einen Unterschied zwischen Spark 1.6 und 2.0?Aber wie auch immer, wenn wir sagen, dass Montagdaten das erste Schema, Dienstag Daten das zweite Schema (ohne Daten in einigen Feldern) und Mittwoch Daten das erste Schema wieder haben, dann kann ich nie sicher sein, dass das "erste" Parkett die " richtiges "Schema". Wenn ich Montag und Dienstag zusammenlesen will, hat es das, aber wenn ich Dienstag und Mittwoch möchte, dann ist das erste nicht das richtige Schema und es würde nicht funktionieren. Ich werde das noch einmal schnell versuchen, aber ich denke, das hat nicht funktioniert und ich vermisse wahrscheinlich etwas Wichtiges. –

+0

Nun, ja, ich habe es erneut versucht und die Ergebnisse bekommen, die ich erwartet hatte. Dies funktioniert nicht, da das Zusammenführen von Parkett nur funktioniert, wenn Sie Spalten hinzufügen. Wenn einige Spalten einen anderen Typ haben, schlägt dies fehl. Und ich weiß, dass es Sinn macht, aber ich weiß nicht, wie ich diesem Problem begegnen kann. Ich kann Spark das exakte Schema nicht mitteilen, das ich mit 200+ Attributfeldern in einer JSON-Datei in einer verschachtelten Struktur habe. Ich habe den folgenden Fehler erhalten: 'org.apache.spark.SparkException: Fehler beim Zusammenführen der inkompatiblen Datentypen ArrayType (StructType (StructField (Name, StringType, true), StructField (Amount, DoubleType, true)), true) und StringType' –

+0

editiert auf Basis von Kommentaren – flyingmeatball

Verwandte Themen