2015-07-24 4 views
12

Ich versuche gerade, eine Datenbank aus MongoDB zu extrahieren und Spark in ElasticSearch mit geo_points aufzunehmen.So fügen Sie eine neue Struct-Spalte zu einem DataFrame hinzu

Die Mongo-Datenbank hat Breiten- und Längenwerte, aber ElasticSearch erfordert, dass sie in den Typ geo_point umgewandelt werden.

Gibt es eine Möglichkeit in Funken die lat und lon Spalten auf eine neue Spalte zu kopieren, die ein array oder struct ist?

Jede Hilfe wird geschätzt!

Antwort

33

Ich nehme an, Sie mit irgendeiner Art von flachen Schema wie folgt beginnen:

root 
|-- lat: double (nullable = false) 
|-- long: double (nullable = false) 
|-- key: string (nullable = false) 

Zuerst lässt Beispieldaten erstellen:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{col, udf} 
import org.apache.spark.sql.types._ 

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil) 

val schema = StructType(
    StructField("lat", DoubleType, false) :: 
    StructField("long", DoubleType, false) :: 
    StructField("key", StringType, false) ::Nil) 

val df = sqlContext.createDataFrame(rdd, schema) 

Eine einfache Möglichkeit ist es, eine UDF und Fall-Klasse zu verwenden:

case class Location(lat: Double, long: Double) 
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long)) 

val dfRes = df. 
    withColumn("location", makeLocation(col("lat"), col("long"))). 
    drop("lat"). 
    drop("long") 

dfRes.printSchema 

und wir bekommen

root 
|-- key: string (nullable = false) 
|-- location: struct (nullable = true) 
| |-- lat: double (nullable = false) 
| |-- long: double (nullable = false) 

Ein harter Weg, um Ihre Daten und gelten Schema danach zu transformieren:

val rddRes = df. 
    map{case Row(lat, long, key) => Row(key, Row(lat, long))} 

val schemaRes = StructType(
    StructField("key", StringType, false) :: 
    StructField("location", StructType(
     StructField("lat", DoubleType, false) :: 
     StructField("long", DoubleType, false) :: Nil 
    ), true) :: Nil 
) 

sqlContext.createDataFrame(rddRes, schemaRes).show 

und wir bekommen eine erwartete Ausgabe

+------+-------------+ 
| key|  location| 
+------+-------------+ 
|Warsaw|[52.23,21.01]| 
| Corte| [42.3,9.15]| 
+------+-------------+ 

verschachtelten Schema Erstellen von Grund auf kann sehr mühsam sein, so, wenn Sie können Ich würde den ersten Ansatz empfehlen. Es kann leicht erweitert werden, wenn Sie komplexere Struktur benötigen:

case class Pin(location: Location) 
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)) 

df. 
    withColumn("pin", makePin(col("lat"), col("long"))). 
    drop("lat"). 
    drop("long"). 
    printSchema 

und wir erhalten Ausgang erwartet:

root 
|-- key: string (nullable = false) 
|-- pin: struct (nullable = true) 
| |-- location: struct (nullable = true) 
| | |-- lat: double (nullable = false) 
| | |-- long: double (nullable = false) 

Leider haben Sie keine Kontrolle über nullable Feld also, wenn für Ihr Projekt wichtig ist, du wirst muss ein Schema angeben.

Schließlich können Sie struct Funktion in 1.4 eingeführt verwenden:

import org.apache.spark.sql.functions.struct 

df.select($"key", struct($"lat", $"long").alias("location")) 
+0

Thanks @ zero323 für Ihre ausführliche Antwort! Dies hilft einem Haufen. Würdest du wissen, wie ich diese Zuordnung für verschachtelte Typen rekursiv durchführen könnte? Diese Daten sind hässlicher als ich gehofft hatte. –

+0

Ich sehe keinen Grund, warum Sie nicht können. – zero323

+0

Hi @ zero323 - Wissen Sie, ob es Ihre UDF-Methode zum Erstellen einer Struktur gibt, wenn mehr als 10 Spalten in der neuen Struktur vorhanden sind? UDFs scheinen 10 Eingabeparameter zu begrenzen. –

1

Versuchen Sie folgendes:

import org.apache.spark.sql.functions._ 

df.registerTempTable("dt") 

dfres = sql("select struct(lat,lon) as colName from dt") 
Verwandte Themen