15

Ich benutze Pyspark, Laden einer großen CSV-Datei in einen Datenrahmen mit Spark-CSV, und als Vorverarbeitungsschritt muss ich eine Vielzahl von Operationen anwenden zu den Daten, die in einer der Spalten verfügbar sind (die eine JSON-Zeichenfolge enthält). Dadurch werden X-Werte zurückgegeben, von denen jeder in einer eigenen Spalte gespeichert werden muss.Apache Spark - Zuweisen des Ergebnisses von UDF zu mehreren Datenframe-Spalten

Diese Funktionalität wird in einer UDF implementiert. Ich bin mir jedoch nicht sicher, wie ich eine Liste von Werten aus dieser UDF zurückliege und diese in einzelne Spalten füttere. Im Folgenden finden Sie ein einfaches Beispiel:

(...) 
from pyspark.sql.functions import udf 
def udf_test(n): 
    return [n/2, n%2] 

test_udf=udf(udf_test) 


df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4) 

, dass die folgenden erzeugt:

+------+----------+--------------------+ 
|amount|trans_date|    test| 
+------+----------+--------------------+ 
| 28.0|2016-02-07|   [14.0, 0.0]| 
| 31.01|2016-02-07|[15.5050001144409...| 
| 13.41|2016-02-04|[6.70499992370605...| 
| 307.7|2015-02-17|[153.850006103515...| 
| 22.09|2016-02-05|[11.0450000762939...| 
+------+----------+--------------------+ 
only showing top 5 rows 

Was ist der beste Weg wäre, Werte, die die zwei (in diesem Beispiel) zu speichern, indem die UDF auf separaten Spalten zurückgegeben werden? Gerade jetzt sie werden als Zeichenfolgen eingegeben:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema() 

root 
|-- amount: float (nullable = true) 
|-- trans_date: string (nullable = true) 
|-- test: string (nullable = true) 

Antwort

25

Es ist nicht möglich, mehrere Top-Level-Spalten von einem einzigen UDF Aufruf zu erstellen, aber Sie können einen neuen struct erstellen. Es erfordert eine UDF mit bestimmten returnType:

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

schema = StructType([ 
    StructField("foo", FloatType(), False), 
    StructField("bar", FloatType(), False) 
]) 

def udf_test(n): 
    return (n/2, n % 2) if n and n != 0.0 else (float('nan'), float('nan')) 

test_udf = udf(udf_test, schema) 
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"]) 

foobars = df.select(test_udf("y").alias("foobar")) 
foobars.printSchema() 
## root 
## |-- foobar: struct (nullable = true) 
## | |-- foo: float (nullable = false) 
## | |-- bar: float (nullable = false) 

Sie glätten weiter das Schema mit einfachen select:

foobars.select("foobar.foo", "foobar.bar").show() 
## +---+---+ 
## |foo|bar| 
## +---+---+ 
## |1.0|0.0| 
## |1.5|1.0| 
## +---+---+ 

Siehe auch Derive multiple columns from a single column in a Spark DataFrame

+0

Fantastic! Das funktioniert sehr gut für das, was ich brauchte. Ich war fast den ganzen Weg dorthin, aber fütterte das StructType-Schema falsch in das udf, was dazu führte, dass meine neue Spalte stattdessen als StringType endete. Vielen Dank! –

+0

Danke !! Das war genau das, wonach ich suchte. :) – dksahuji