2016-05-05 8 views
1

Ich habe eine lokale PSQL-Datenbank auf meinem Computer. Einige Spalten enthalten die darin enthaltenen Daten als Array. (Beispiel unten)Lesen eines Spark-Datenrahmens als Array-Typ von Postgres DB

+--------------------+ 
|   _authors| 
+--------------------+ 
|[u'Miller, Roger ...| 
|[u'Noyes, H.Pierre']| 
|[u'Berman, S.M.',...| 
+--------------------+ 
only showing top 3 rows 

root 
|-- _authors: string (nullable = true) 

Ich muss sie als Array/Wrapped-Array lesen. Wie erreiche ich das?

Ich muss dieses Array explodieren/in späteren Phasen meiner Pipeline abflachen.

Danke,

+1

Haben Sie versucht, dem Leser '.schema (s: StructType)' hinzuzufügen? Sie müssen das vollständige Schema als StructType-Objekt übergeben –

+0

@DanieldePaula Ich konnte keine Beispiele finden. Könnten Sie bitte näher ausführen? Vielen Dank –

Antwort

1

ich zwei Vorschläge für Sie haben Problem:

1) Ich bin nicht sicher, es für Arrays funktioniert, aber es ist einen Versuch wert: Es ist möglich, ein bestimmtes Schema zu definieren, wenn ein Lesen Datenrahmen von einer Quelle. Beispiel:

val customSchema = StructType(Seq(
    StructField("_authors", DataTypes.createArrayType(StringType), true), 
    StructField("int_column", IntegerType, true), 
    // other columns... 
)) 

val df_records = sqlContext.read 
    .format("jdbc") 
    .option("url", "jdbc:postgresql://localhost:5432/dbname") 
    .option("driver", "org.postgresql.Driver") 
    .option("dbtable", "public.records") 
    .option("user", "name") 
    .option("password", "pwd") 
    .schema(customSchema) 
    .load() 

df_records.select("_authors").show() 

2) Wenn die andere Option, ich nur von der Definition einer Parsen UDF denken kann im Moment nicht funktioniert:

val splitString: (String => Seq[String]) = { s => 
    val seq = s.split(",").map(i => i.trim).toSeq 

    // Remove "u[" from the first element and "]" from the last: 
    Seq(seq(0).drop(2)) ++ 
    seq.drop(1).take(seq.length-2) ++ 
     Seq(seq.last.take(seq.last.length-1)) 
} 

import org.apache.spark.sql.functions._ 
val newDF = df_records 
    .withColumn("authors_array", udf(splitString).apply(col("_authors"))) 

Verwandte Themen