Wenn Sie nur einige Reihen erwarten könnten Sie zunächst alle distinct
Werte erhalten, sammeln sie als List
und verwandeln sie in eine BroadCast
. Im Folgenden zeige ich ein schmutziges Beispiel feststellen, dass es nicht, dass die Ausgabe sortiert werden garantiert wird (es wahrscheinlich Ansätze besser sein könnte, aber dies ist das erste, was mir in den Sinn kommt):
// Case 1. k is small (fits in the driver and nodes)
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.collect.sortBy(x => x)
val broadcast = sc.broadcast(distincts)
val sdd = rdd.map{
case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i)
}
sdd.collect()
// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2))
In der zweite Ansatz sortiere ich mit Spark-Funktionalität, in der RDD's documentation können Sie finden, wie zipWithIndex
und keyBy
arbeiten.
//case 2. k is big, distinct values don't fit in the Driver.
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex
rdd.keyBy(x => x)
.join(distincts.keyBy(_._1))
.map{
case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value)
}.collect()
//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2))
By the way, verwende ich collect
nur zur Visualisierung, in einer realen Anwendung, die Sie es nicht, wenn Sie sicher sind, verwenden sollte es im Fahrer Speicher passt.
Wie viele verschiedene Ergebnisse erwarten Sie? Tausende, Millionen? –
Sind auch Ränge eingepflegt oder erwarten Sie, dass der Rang von der Art der Eingabe abgeleitet wird? Der Kommentar über das Anwenden des Rangs auf den Index macht das für mich unklar – brycemcd
@AlbertoBonsanto, so gibt es mehrere Fälle, rangieren alle oder nur Top 10 oder 20. Ich muss alle Fälle unterstützen. so antworten alle Millionen. – happybayes