2015-05-11 7 views
11

Say habe ich eine PairRDD als solche (natürlich viel mehr Daten im wirklichen Leben, Millionen von Datensätzen übernehmen):Funke: Get oben N von Schlüssel

val scores = sc.parallelize(Array(
     ("a", 1), 
     ("a", 2), 
     ("a", 3), 
     ("b", 3), 
     ("b", 1), 
     ("a", 4), 
     ("b", 4), 
     ("b", 2) 
)) 

Was der effizienteste Weg ist, eine RDD zu erzeugen, mit die besten 2 Punkte pro Taste?

val top2ByKey = ... 
res3: Array[(String, Int)] = Array((a,4), (a,3), (b,4), (b,3)) 

Antwort

10

Ich denke, das sehr effizient sein sollte:

Herausgegeben nach OP Kommentare:

scores.mapValues(p => (p, p)).reduceByKey((u, v) => { 
    val values = List(u._1, u._2, v._1, v._2).sorted(Ordering[Int].reverse).distinct 
    if (values.size > 1) (values(0), values(1)) 
    else (values(0), values(0)) 
}).collect().foreach(println) 
+0

Dies scheint nicht zu funktionieren?Dies ist die Ausgabe: Array [(String, (Int, Int))] = Array ((a, (4,4)), (b, (4,4))) –

+1

Ich habe das durch die Anpassung der Antwort von user52045 gelöst : val Scores = sc.parallelize (Array ( ("a", 1), ("a", 2), ("a", 3), ("B", 3), ("b", 1), ("a", 4), ("b", 4), ("b", 2) )) scores.mapValues ​​(p => (p, p)) .reduceByKey ((u, v) => { valwerte = Liste (u._1, u._2, v._1, v._2) .sortiert (Bestellung [Int] .reverse) .distinct (Werte (0), Werte (1)) }). Collect() –

+1

@michael_erasmus Sie haben Recht, es gibt einen Fehler in meinem Code. Thx für die Befestigung. Eine Sache müssen Sie vorsichtig sein, denn wenn alle Elemente der Liste identisch sind, erhalten Sie outOfBoudException. – abalcerek

0
scores.reduceByKey(_ + _).map(x => x._2 -> x._1).sortByKey(false).map(x => x._2 -> x._1).take(2).foreach(println) 
+3

Hallo, willkommen zu Stack Overflow. Bitte nicht nur den Code als Antwort ausgeben. Erklären Sie Ihren Gedankengang, damit wir ihn besser verstehen können. Lesen Sie dies, wenn Sie irgendwelche Zweifel haben: http://stackoverflow.com/help/how-to-answer Danke. – Cthulhu

+1

Ich glaube, dass scores.reduceByKey (_ + _) alle Paare mit dem gleichen Schlüssel zusammenbrechen würde, so dass Sie mit einem einzigen (a, N) und einem einzigen (b, M) enden würden, wobei N und M die Summe aller a sind Werte bzw. b Werte. An diesem Punkt würden Sie nur eine einzige (a, N) und keine Menge an Zerlegung zurückbekommen (a, i) und (a, j), wobei i und j die zwei höchsten Werte für alle Paare sind. –

2

Ihre Eingabedaten wurden geringfügig geändert.

val scores = sc.parallelize(Array(
     ("a", 1), 
     ("a", 2), 
     ("a", 3), 
     ("b", 3), 
     ("b", 1), 
     ("a", 4), 
     ("b", 4), 
     ("b", 2), 
     ("a", 6), 
     ("b", 8) 
    )) 

ich erklären, wie es Schritt für Schritt zu tun:

1.Group durch Schlüssel erstellen Array

scores.groupByKey().foreach(println) 

Ergebnis:

(b,CompactBuffer(3, 1, 4, 2, 8)) 
(a,CompactBuffer(1, 2, 3, 4, 6)) 

Wie Sie sehen, jeder Wert selbst ist ein Array von Zahlen. CompactBuffer ist nur ein optimiertes Array.

2.Für jede Taste, umgekehrte Sortierung Liste von Zahlen dieser Wert enthält

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse)}).foreach(println) 

Ergebnis:

(b,List(8, 4, 3, 2, 1)) 
(a,List(6, 4, 3, 2, 1)) 

3.Keep nur ersten zwei Elemente aus dem zweiten Schritt, werden sie oben sein 2 Partituren in der Liste

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).foreach(println) 

Ergebnis:

(a,List(6, 4)) 
(b,List(8, 4)) 

4.Flat Karte neu Gekoppelte RDD für jeden Schlüssel und Bestnote

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).foreach(println) 

Ergebnis zu erstellen:

(b,8) 
(b,4) 
(a,6) 
(a,4) 

5.Optional Schritt - sortiert nach Schlüssel, wenn Sie wollen

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).sortByKey(false).foreach(println) 

Ergebnis:

(a,6) 
(a,4) 
(b,8) 
(b,4) 

Hoffnung, diese Erklärung half, die Logik zu verstehen.

Verwandte Themen