Wie man groupbyKey zu reduceByKey in pyspark umwandeln. Ich habe einen Ausschnitt beigefügt. Dies wird ein Korr für jede Region-Woche-Kombination anwenden. Ich habe groupbyKey verwendet, aber es ist sehr langsam und Shuffle-Fehler (ich habe 10-20GB Daten und jede Gruppe wird 2-3GB haben). Bitte helfen Sie mir in Umschreiben dies mit reduceByKeyConvert groupBYKey zu ReduceByKey Pyspark
Datensatz
region dept week val1 valu2
US CS 1 1 2
US CS 2 1.5 2
US CS 3 1 2
US ELE 1 1.1 2
US ELE 2 2.1 2
US ELE 3 1 2
UE CS 1 2 2
Ausgang
region dept corr
US CS 0.5
US ELE 0.6
UE CS .3333
-Code
def testFunction (key, value):
for val in value:
keysValue = val.asDict().keys()
inputpdDF.append(dict([(keyRDD, val[keyRDD]) for keyRDD in keysValue])
pdDF = pd.DataFrame(inputpdDF, columns = keysValue)
corr = pearsonr(pdDF['val1'].astype(float), pdDF['val1'].astype(float))[0]
corrDict = {"region" : key.region, "dept" : key.dept, "corr": corr}
finalRDD.append(Row(**corrDict))
return finalRDD
resRDD = df.select(["region", "dept", "week", "val1", "val2"])\
.map(lambda r: (Row(region= r.region, dept= r.dept), r))\
.groupByKey()\
.flatMap(lambda KeyValue: testFunction(KeyValue[0], list(KeyValue[1])))
reduceByKey unterscheidet sich von groupByKey in einigen Punkten, aber der Hauptunterschied ist der Unterschied zwischen aggregate - groupby erträgen (key,) während reduce produziert (key, aggregate z. B. summe von ). Um von einem zum anderen zu schreiben bedeutet zu verstehen, wie wir eine einzige Funktion (Aggregator) über die Daten haben können. Beachten Sie, dass ich mich nicht darum gekümmert habe, auf Ihre "Testfunktion" zu schauen. –
Chinny84
@ Chinny84 Entschuldigung, ich verpasste das erforderliche output Format früher. Ist es möglich, mich zu alternativen Ansätzen zu führen? – Harish