2015-04-28 5 views
19

Ich möchte diese bestimmte Apache Spark mit Python-Lösung teilen, weil die Dokumentation dafür ziemlich schlecht ist.Berechnung der Durchschnittswerte für jede KEY in einer paarweisen (K, V) RDD in Spark mit Python

Ich wollte den Durchschnittswert von K/V-Paaren (gespeichert in einer paarweisen RDD), von KEY. Hier ist, was die Beispieldaten wie folgt aussehen:

>>> rdd1.take(10) # Show a small sample. 
[(u'2013-10-09', 7.60117302052786), 
(u'2013-10-10', 9.322709163346612), 
(u'2013-10-10', 28.264462809917358), 
(u'2013-10-07', 9.664429530201343), 
(u'2013-10-07', 12.461538461538463), 
(u'2013-10-09', 20.76923076923077), 
(u'2013-10-08', 11.842105263157894), 
(u'2013-10-13', 32.32514177693762), 
(u'2013-10-13', 26.249999999999996), 
(u'2013-10-13', 10.693069306930692)] 

Nun ist die folgende Codesequenz ein suboptimal Weg, es zu tun, aber es funktioniert. Es war, was ich getan habe, bevor ich eine bessere Lösung gefunden habe. Es ist nicht schlimm, aber - wie Sie in der Antwort sehen werden - gibt es einen prägnanteren, effizienteren Weg.

>>> import operator 
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...} 
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs). 
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT) 
>>> print(rdd1.collect()) 
    [(u'2013-10-09', 11.235365503035176), 
    (u'2013-10-07', 23.39500642456595), 
    ... snip ... 
    ] 

Antwort

31

Jetzt ein viel besserer Weg, dies zu tun ist, um die rdd.aggregateByKey() Methode zu verwenden. Weil diese Methode in der Apache Spark mit Python-Dokumentation so schlecht dokumentiert ist - und deshalb habe ich diese Q & A geschrieben - bis vor kurzem hatte ich die obige Codefolge verwendet. Aber wieder, es ist weniger effizient, so vermeiden tun es so, wenn es notwendig ist.

Hier ist, wie das gleiche tun, um die rdd.aggregateByKey() Methode ( empfohlen) ...

von KEY, berechnet gleichzeitig die Summe (den Zähler für den Durchschnitt, den wir berechnen wollen) und COUNT (die Nenner für den Durchschnitt, den wir berechnen wollen):

>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function. 
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1), 
             lambda a,b: (a[0] + b[0], a[1] + b[1])) 

Wo gilt folgendes über die Bedeutung der einzelnen a und b Paar oben (so können Sie sich vorstellen, was passiert):

First lambda expression for Within-Partition Reduction Step:: 
    a: is a TUPLE that holds: (runningSum, runningCount). 
    b: is a SCALAR that holds the next Value 

    Second lambda expression for Cross-Partition Reduction Step:: 
    a: is a TUPLE that holds: (runningSum, runningCount). 
    b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount). 

Schließlich berechnen Sie den Durchschnitt für jeden KEY und sammeln Ergebnisse.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect() 
>>> print(finalResult) 
     [(u'2013-09-09', 11.235365503035176), 
     (u'2013-09-01', 23.39500642456595), 
     (u'2013-09-03', 13.53240060820617), 
     (u'2013-09-05', 13.141148418977687), 
    ... snip ... 
    ] 

Ich hoffe, diese Frage & Antwort mit aggregateByKey() helfen. Wenn das der Fall ist, vergiss nicht, auch die Frage zu beantworten. Vielen Dank. (◠﹏◠)

+0

Das ist wirklich eine großartige Antwort. Ich werde jedoch feststellen, dass dies aufgrund von [PEP 3113] (https://www.python.org/dev/peps/pep-3113/) nur mit Python 2.x kompatibel ist, da das Tupel-Entpacken in Lambda-Ausdrücken nein ist länger unterstützt in Python 3.x – Tgsmith61591

+0

@ Tgsmith61591 Vielen Dank. Ich habe die Zwischenvariable "aTuple" hinzugefügt, um dies zu beheben. (Seufz, ich konnte mir keinen besseren Bezeichner nennen, LoL). Schöner Fang auf PEP 3113! –

3

Meiner Meinung nach einem lesbaren entspricht einen aggregateByKey mit zwei Lambda-Ausdrücke ist:

rdd1 = rdd1 \ 
    .mapValues(lambda v: (v, 1)) \ 
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) 

Auf diese Weise wird die gesamte durchschnittliche Berechnung wäre:

avg_by_key = rdd1 \ 
    .mapValues(lambda v: (v, 1)) \ 
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \ 
    .mapValues(lambda v: v[0]/v[1]) \ 
    .collectAsMap() 
0

Nur Hinzufügen einer Notiz über eine intuitive und kürzere (aber eine schlechte) Lösung für dieses Problem. Das Buch Sam's Teach Yourself Apache Spark in 24 Hours hat dieses Problem im letzten Kapitel gut erklärt.

Mit groupByKey man kann das Problem leicht wie folgt lösen:

rdd = sc.parallelize([ 
     (u'2013-10-09', 10), 
     (u'2013-10-09', 10), 
     (u'2013-10-09', 13), 
     (u'2013-10-10', 40), 
     (u'2013-10-10', 45), 
     (u'2013-10-10', 50) 
    ]) 

rdd \ 
.groupByKey() \ 
.mapValues(lambda x: sum(x)/len(x)) \ 
.collect() 

Ausgang:

[('2013-10-10', 45.0), ('2013-10-09', 11.0)] 

Dies ist intuitiv und ansprechend, aber Sie es nicht verwenden! groupByKey führt keine Kombination auf den Mappern durch und bringt alle einzelnen Schlüsselwertpaare zum Reduzierer.

Vermeiden Sie groupByKey so viel wie möglich. Gehen Sie mit der reduceByKey Lösung wie @ pat's.

0

Eine leichte Verbesserung der Antwort von prismalytics.io.

Es könnte einen Fall geben, in dem die Berechnung der Summe zu einer Überlaufzahl führen könnte, weil wir eine große Anzahl von Werten summieren. Wir könnten stattdessen die Durchschnittswerte beibehalten und den Mittelwert aus dem Durchschnitt berechnen und die Anzahl der Teile reduzieren.

Wenn Sie zwei Teile mit Mittelwert haben und als (a1, c1) und (a2, c2) zählen, ist der Gesamtdurchschnitt: Summe/Anzahl = (Gesamt1 + Gesamt2)/(Anzahl1 + Anzahl2) = (a1 * c1 + a2 * c2)/(c1 + c2)

Wenn wir R = c2/c1 markieren, kann es weiter geschrieben werden als a1/(1 + R) + a2 * R/(1 + R) Wenn wir markieren Ri weiter als 1/(1 + R), wir es als a1 * Ri + a2 * R * Ri

Dieser Ansatz für Schlüssel-Wert umgewandelt werden können, indem einfach mapValues ​​schreiben
myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10]) 
sumcount_rdd = myrdd.map(lambda n : (n, 1)) 
def avg(A, B): 
    R = 1.0*B[1]/A[1] 
    Ri = 1.0/(1+R); 
    av = A[0]*Ri + B[0]*R*Ri 
    return (av, B[1] + A[1]); 

(av, counts) = sumcount_rdd.reduce(avg) 
print(av) 

anstelle von map und reduceByKey statt reduzieren.

Dies ist aus: https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2

Verwandte Themen