Ich nehme an, Sie mit irgendeiner Art von flachen Schema wie folgt beginnen:
root
|-- lat: double (nullable = false)
|-- long: double (nullable = false)
|-- key: string (nullable = false)
Zuerst lässt Beispieldaten erstellen:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._
val rdd = sc.parallelize(
Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)
val schema = StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) ::
StructField("key", StringType, false) ::Nil)
val df = sqlContext.createDataFrame(rdd, schema)
Eine einfache Möglichkeit ist es, eine UDF und Fall-Klasse zu verwenden:
case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))
val dfRes = df.
withColumn("location", makeLocation(col("lat"), col("long"))).
drop("lat").
drop("long")
dfRes.printSchema
und wir bekommen
root
|-- key: string (nullable = false)
|-- location: struct (nullable = true)
| |-- lat: double (nullable = false)
| |-- long: double (nullable = false)
Ein harter Weg, um Ihre Daten und gelten Schema danach zu transformieren:
val rddRes = df.
map{case Row(lat, long, key) => Row(key, Row(lat, long))}
val schemaRes = StructType(
StructField("key", StringType, false) ::
StructField("location", StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) :: Nil
), true) :: Nil
)
sqlContext.createDataFrame(rddRes, schemaRes).show
und wir bekommen eine erwartete Ausgabe
+------+-------------+
| key| location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte| [42.3,9.15]|
+------+-------------+
verschachtelten Schema Erstellen von Grund auf kann sehr mühsam sein, so, wenn Sie können Ich würde den ersten Ansatz empfehlen. Es kann leicht erweitert werden, wenn Sie komplexere Struktur benötigen:
case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long))
df.
withColumn("pin", makePin(col("lat"), col("long"))).
drop("lat").
drop("long").
printSchema
und wir erhalten Ausgang erwartet:
root
|-- key: string (nullable = false)
|-- pin: struct (nullable = true)
| |-- location: struct (nullable = true)
| | |-- lat: double (nullable = false)
| | |-- long: double (nullable = false)
Leider haben Sie keine Kontrolle über nullable
Feld also, wenn für Ihr Projekt wichtig ist, du wirst muss ein Schema angeben.
Schließlich können Sie struct
Funktion in 1.4 eingeführt verwenden:
import org.apache.spark.sql.functions.struct
df.select($"key", struct($"lat", $"long").alias("location"))
Thanks @ zero323 für Ihre ausführliche Antwort! Dies hilft einem Haufen. Würdest du wissen, wie ich diese Zuordnung für verschachtelte Typen rekursiv durchführen könnte? Diese Daten sind hässlicher als ich gehofft hatte. –
Ich sehe keinen Grund, warum Sie nicht können. – zero323
Hi @ zero323 - Wissen Sie, ob es Ihre UDF-Methode zum Erstellen einer Struktur gibt, wenn mehr als 10 Spalten in der neuen Struktur vorhanden sind? UDFs scheinen 10 Eingabeparameter zu begrenzen. –