2017-03-21 5 views
8

Ich habe eine Spalte "StructType" in Spark Dataframe, die ein Array und eine Zeichenfolge als Unterfelder hat. Ich möchte das Array ändern und die neue Spalte des gleichen Typs zurückgeben. Kann ich es mit UDF verarbeiten? Oder was sind die Alternativen?Spark UDF für StructType/Zeile

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row 
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil) 
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil) 
val data = Seq(Row(Row(Array(1,2),"eb")), Row(Row(Array(3,2,1), "dsf"))) 
val rd = sc.parallelize(data) 
val df = spark.createDataFrame(rd, schema) 
df.printSchema 

root 
|-- subtable: struct (nullable = true) 
| |-- col1: array (nullable = true) 
| | |-- element: integer (containsNull = false) 
| |-- col2: string (nullable = true) 

Es scheint, dass ich ein UDF vom Typ Row benötigen, so etwas wie

val u = udf((x:Row) => x) 
     >> Schema for type org.apache.spark.sql.Row is not supported 

Dies macht Sinn, da Funke das Schema für den Rückgabetyp nicht kennt. Leider udf.register nicht zu:

spark.udf.register("foo", (x:Row)=> Row, sub_schema) 
    <console>:30: error: overloaded method value register with alternatives: ... 

Antwort

3

Ja, Sie dies mit UDF tun können. Die Einfachheit halber habe ich dein Beispiel mit Fallklassen und änderte es das Array von 2 auf jeden Wert hinzu:

case class Root(subtable: Subtable) 
case class Subtable(col1: Seq[Int], col2: String) 

val df = spark.createDataFrame(Seq(
    Root(Subtable(Seq(1, 2, 3), "toto")), 
    Root(Subtable(Seq(10, 20, 30), "tata")) 
)) 

val myUdf = udf((subtable: Row) => 
    Subtable(subtable.getSeq[Int](0).map(_ + 2), subtable.getString(1)) 
) 
val result = df.withColumn("subtable_new", myUdf(df("subtable"))) 
result.printSchema() 
result.show(false) 

druckt:

root 
|-- subtable: struct (nullable = true) 
| |-- col1: array (nullable = true) 
| | |-- element: integer (containsNull = false) 
| |-- col2: string (nullable = true) 
|-- subtable_new: struct (nullable = true) 
| |-- col1: array (nullable = true) 
| | |-- element: integer (containsNull = false) 
| |-- col2: string (nullable = true) 

+-------------------------------+-------------------------------+ 
|subtable      |subtable_new     | 
+-------------------------------+-------------------------------+ 
|[WrappedArray(1, 2, 3),toto] |[WrappedArray(3, 4, 5),toto] | 
|[WrappedArray(10, 20, 30),tata]|[WrappedArray(12, 22, 32),tata]| 
+-------------------------------+-------------------------------+ 
2

Sie sind auf dem richtigen Weg. In diesem Szenario wird UDF Ihnen das Leben erleichtern. Wie Sie bereits festgestellt haben, kann UDF keine Typen zurückgeben, von denen Spark nichts weiß. Also im Grunde müssen Sie etwas zurückgeben, das leicht serialisieren kann. Es kann ein case class sein oder Sie können ein Tupel wie (Seq[Int], String) zurückgeben. Also hier eine modifizierte Version des Codes ist:

def main(args: Array[String]): Unit = { 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.types._ 
    val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil) 
    val schema = StructType(StructField("subtable", sub_schema, true) :: Nil) 
    val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf"))) 
    val rd = spark.sparkContext.parallelize(data) 
    val df = spark.createDataFrame(rd, schema) 

    df.printSchema() 
    df.show(false) 

    val mapArray = (subRows: Row) => { 
    // I prefer reading values from row by specifying column names, you may use index also 
    val col1 = subRows.getAs[Seq[Int]]("col1") 
    val mappedCol1 = col1.map(x => x * x) // Use map based on your requirements 
    (mappedCol1, subRows.getAs[String]("col2")) // now mapping is done for col2 
    } 
    val mapUdf = udf(mapArray) 

    val newDf = df.withColumn("col1_mapped", mapUdf(df("subtable"))) 
    newDf.show(false) 
    newDf.printSchema() 
} 

Bitte nehmen Sie sich einen Blick auf diese Links, diese können Sie mehr Einblick geben.

  1. Die umfassendste Antwort auf mit komplexen Schema arbeiten: https://stackoverflow.com/a/33850490/4046067
  2. Spark-unterstützten Datentypen: https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
6

stellt sich heraus, das Ergebnis Schema als zweite UDF-Parameter übergeben können:

val u = udf((x:Row) => x, sub_schema) 
Verwandte Themen