2016-09-20 2 views
1

Wie man groupbyKey zu reduceByKey in pyspark umwandeln. Ich habe einen Ausschnitt beigefügt. Dies wird ein Korr für jede Region-Woche-Kombination anwenden. Ich habe groupbyKey verwendet, aber es ist sehr langsam und Shuffle-Fehler (ich habe 10-20GB Daten und jede Gruppe wird 2-3GB haben). Bitte helfen Sie mir in Umschreiben dies mit reduceByKeyConvert groupBYKey zu ReduceByKey Pyspark

Datensatz

region dept week val1 valu2 
US CS 1  1 2 
US CS 2  1.5 2 
US CS 3  1 2 
US ELE 1  1.1 2 
US ELE 2  2.1 2 
US ELE 3  1 2 
UE CS 1  2 2 

Ausgang

region dept corr 
US  CS 0.5 
US  ELE 0.6 
UE  CS .3333 

-Code

def testFunction (key, value): 
    for val in value: 
     keysValue = val.asDict().keys() 
     inputpdDF.append(dict([(keyRDD, val[keyRDD]) for keyRDD in keysValue]) 
    pdDF = pd.DataFrame(inputpdDF, columns = keysValue) 
    corr = pearsonr(pdDF['val1'].astype(float), pdDF['val1'].astype(float))[0] 
    corrDict = {"region" : key.region, "dept" : key.dept, "corr": corr}     
    finalRDD.append(Row(**corrDict)) 
    return finalRDD 

resRDD = df.select(["region", "dept", "week", "val1", "val2"])\ 
      .map(lambda r: (Row(region= r.region, dept= r.dept), r))\ 
      .groupByKey()\ 
      .flatMap(lambda KeyValue: testFunction(KeyValue[0], list(KeyValue[1]))) 
+1

reduceByKey unterscheidet sich von groupByKey in einigen Punkten, aber der Hauptunterschied ist der Unterschied zwischen aggregate - groupby erträgen (key, ) während reduce produziert (key, aggregate z. B. summe von ). Um von einem zum anderen zu schreiben bedeutet zu verstehen, wie wir eine einzige Funktion (Aggregator) über die Daten haben können. Beachten Sie, dass ich mich nicht darum gekümmert habe, auf Ihre "Testfunktion" zu schauen. – Chinny84

+0

@ Chinny84 Entschuldigung, ich verpasste das erforderliche output Format früher. Ist es möglich, mich zu alternativen Ansätzen zu führen? – Harish

Antwort

0

Versuchen:

>>> from pyspark.sql.functions import corr 
>>> df.groupBy("region", "dept").agg(corr("val1", "val2")) 
+0

Danke, das wird funktionieren .. ich kopierte nur 2 Spalten .. eigentlich sollte meine corr Berechnung mit Val1 mit Val2, Val1 mit Val3, Val1 mit Val4 ... Val1 mit Valn (Nte Spalte) ich plane, wie zu tun Diese aggList = [func.corr ("val1", col) .alias (Spaltenname) für col in Spalten] df.groupBy ("region", "dept"). agg (* aggList) .... Ich denke, das sollte Arbeit . Das nächste größere Problem ist, dass ich statsmodels.formula.api.ols() auf die selbe Gruppe anwenden muss, was mit groupByKey sehr langsam ist. Haben wir einen anderen Weg? Ich habe versucht, MLLIB, die nicht für uns arbeiten (ich brauche geschlossene Form Lösung) – Harish

+0

Sie können mehrere Aggregate durchführen. Ich kann nicht mit Formel helfen. –

Verwandte Themen