2016-10-12 3 views
9

ich diese Python-Code haben, die lokal in einem Pandas Datenrahmen läuft Anwendung:UDF auf GroupedData in PySpark (mit funktionierenden Python Beispiel)

df_result = pd.DataFrame(df 
          .groupby('A') 
          .apply(lambda x: myFunction(zip(x.B, x.C), x.name)) 

Ich möchte laufen diese in PySpark, aber mit pyspark Umgang Probleme, .sql.group.GroupedData-Objekt.

Ich habe folgendes versucht:

sparkDF 
.groupby('A') 
.agg(myFunction(zip('B', 'C'), 'A')) 

die

KeyError: 'A' 

ich zurück nehme an, weil ‚A‘ ist nicht mehr eine Spalte und ich kann nicht das Äquivalent für x.name finden .

Und dann

sparkDF 
.groupby('A') 
.map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
.toDF() 

aber erhalten folgende Fehlermeldung:

AttributeError: 'GroupedData' object has no attribute 'map' 

Irgendwelche Vorschläge geschätzt werden würde wirklich!

+0

Was ist 'myFunction' und was steckt in' sparkDF'? Bitte machen Sie Ihren Code reproduzierbar, indem Sie Beispieldatensatz, erwartete Ausgabe und spezifischen Code teilen. Bis dahin bleibt Ihre Frage zu weit gefasst. – mtoto

Antwort

20

Sie versuchen, eine UDAF (benutzerdefinierte Aggregatfunktion) im Gegensatz zu einer UDF (benutzerdefinierte Funktion) zu schreiben. UDAFs sind Funktionen, die an Daten arbeiten, die nach einem Schlüssel gruppiert sind. Insbesondere müssen sie definieren, wie mehrere Werte in der Gruppe in einer einzelnen Partition zusammengeführt werden und anschließend die Ergebnisse über Partitionen für den Schlüssel zusammengeführt werden. Es gibt derzeit keine Möglichkeit in Python einen UDAF zu implementieren, sie können nur in Scala implementiert werden.

Aber Sie können es in Python umgehen. Sie können den Sammlungssatz verwenden, um die gruppierten Werte zu sammeln, und dann eine reguläre UDF verwenden, um mit ihnen zu arbeiten, was Sie wollen. Der einzige Vorbehalt ist, dass collect_set nur für primitive Werte verwendet wird. Sie müssen sie daher in eine Zeichenfolge codieren.

from pyspark.sql.types import StringType 
from pyspark.sql.functions import col, collect_list, concat_ws, udf 

def myFunc(data_list): 
    for val in data_list: 
     b, c = data.split(',') 
     # do something 

    return <whatever> 

myUdf = udf(myFunc, StringType()) 

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \ 
    .groupBy('A').agg(collect_list('data').alias('data')) 
    .withColumn('data', myUdf('data')) 

Verwenden Sie collect_set, wenn Sie deduplizieren möchten. Wenn Sie viele Werte für einige Ihrer Schlüssel haben, wird dies langsam sein, da alle Werte für einen Schlüssel in einer einzigen Partition irgendwo in Ihrem Cluster gesammelt werden müssen. Wenn Ihr Endergebnis ein Wert ist, den Sie durch Kombinieren der Werte pro Schlüssel auf irgendeine Weise (z. B. Summieren) erstellen, ist es möglicherweise schneller, sie mit der Methode RDD aggregateByKey zu implementieren, mit der Sie vor dem Mischen einen Zwischenwert für jeden Schlüssel in einer Partition erstellen können Daten herum.

5

Seit Spark 2.3 (jetzt in Entwicklung) können Sie pandas_udf verwenden. Gruppenaggregatvarianten nehmen eine Funktion an, die von Pandas DataFrame mit der gleichen Form wie der Eingang auf den Ausgang DataFrame abbildet.Zum Beispiel, wenn Daten wie folgt aussehen:

df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)], 
    ("key", "value1", "value2")) 
) 

und Sie mögen Durchschnittswert von paarweise min berechnen zwischen value1value2, müssen Sie Ausgabeschema definieren:

from pyspark.sql.types import * 

schema = StructType([ 
    StructField("key", StringType()), 
    StructField("avg_min", DoubleType()) 
]) 

pandas_udf:

from pyspark.sql.functions import pandas_udf 
from pyspark.sql.functions import PandasUDFType 

@pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP) 
def g(df): 
    result = pd.DataFrame(df.groupby(df.key).apply(
     lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean() 
    )) 
    result.reset_index(inplace=True, drop=False) 
    return result 

und wenden Sie es an:

df.groupby("key").apply(g).show() 
+---+-------+ 
|key|avg_min| 
+---+-------+ 
| b| -1.5| 
| a| -0.5| 
+---+-------+ 

Mit Ausnahme von Schemadefinition und Decorator kann Ihr aktueller Pandas-Code unverändert angewendet werden.