2016-04-28 16 views
0

Ich wollte in Funken tun Ranking wie folgt:Sortieren und Ranking in Apache Spark Scala?

Eingang:

5.6 
5.6 
5.6 
6.2 
8.1 
5.5 
5.5 

Ranks:

1 
1 
1 
2 
3 
0 
0 
0 

Ausgang:

Rank Input 
0  5.5 
0  5.5 
1  5.6 
1  5.6 
1  5.6 
2  6.2 
3  8.1 

Ich wollte wissen, wie ich diese in Spark sortieren kann und auch die gleiche Rangfolge wie oben aufgeführt bekommen. Die Anforderungen sind:

  1. Ranking beginnt mit 0 nicht 1
  2. dies für Millionen von Datensätzen eine Probe Fall ist, und eine Partition kann sehr groß sein - ich Empfehlung schätzen, wie Verfahren unter Verwendung eines internen Sortier Rang

Ich wollte dies in Scala tun. Kann mir jemand dabei helfen Code zu schreiben?

+0

Wie viele verschiedene Ergebnisse erwarten Sie? Tausende, Millionen? –

+0

Sind auch Ränge eingepflegt oder erwarten Sie, dass der Rang von der Art der Eingabe abgeleitet wird? Der Kommentar über das Anwenden des Rangs auf den Index macht das für mich unklar – brycemcd

+0

@AlbertoBonsanto, so gibt es mehrere Fälle, rangieren alle oder nur Top 10 oder 20. Ich muss alle Fälle unterstützen. so antworten alle Millionen. – happybayes

Antwort

2

Wenn Sie nur einige Reihen erwarten könnten Sie zunächst alle distinct Werte erhalten, sammeln sie als List und verwandeln sie in eine BroadCast. Im Folgenden zeige ich ein schmutziges Beispiel feststellen, dass es nicht, dass die Ausgabe sortiert werden garantiert wird (es wahrscheinlich Ansätze besser sein könnte, aber dies ist das erste, was mir in den Sinn kommt):

// Case 1. k is small (fits in the driver and nodes) 
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2)) 
val distincts = rdd.distinct.collect.sortBy(x => x) 
val broadcast = sc.broadcast(distincts) 

val sdd = rdd.map{ 
    case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i) 
} 

sdd.collect() 

// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2)) 

In der zweite Ansatz sortiere ich mit Spark-Funktionalität, in der RDD's documentation können Sie finden, wie zipWithIndex und keyBy arbeiten.

//case 2. k is big, distinct values don't fit in the Driver. 
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2)) 
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex 
rdd.keyBy(x => x) 
    .join(distincts.keyBy(_._1)) 
    .map{ 
    case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value) 
    }.collect() 

//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2)) 

By the way, verwende ich collect nur zur Visualisierung, in einer realen Anwendung, die Sie es nicht, wenn Sie sicher sind, verwenden sollte es im Fahrer Speicher passt.

+0

vielen Dank. es liefert die erwarteten Ergebnisse.Da ich mich mehr um die Leistung sorge, wollte ich die Interna verstehen, wie die Sortierung abläuft. In dem Fall, in dem, wenn ein Schlüssel mit 100k Datensätzen, die Partition wäre riesig, so wundern sich ist sortby ist die einzige Option oder einen Vorschlag zur Verwendung einer Bibliothek. Ich benutze das gleiche in Python mit Numpy und es ist wirklich gut sortiert. Etwas ähnliches suchen. – happybayes

+0

Das Problem ist, dass, wenn k riesig ist, die Art und Weise, wie sorten durch Funken ausgeführt wird, durch das Verschieben mehrerer Werte zwischen Partitionen erfolgt, was wirklich ineffizient ist; aber ich werde es als Fall 2 hinzufügen. –

+0

Vielen Dank, um mir dabei zu helfen. viel zu schätzen wissen. – happybayes