2017-07-04 6 views
0

Ich habe RDD [(Int, Array [Double])] zB:Sortieren RDD nach einem Array() Inhalt

1, Array(2.0,5.0,6.3) 
5, Array(1.0,3.3,9.5) 
1, Array(5.0,4.2,3.1) 
2, Array(9.6,6.3,2.3) 
1, Array(8.5,2.5,1.2) 
5, Array(6.0,2.4,7.8) 
2, Array(7.8,9.1,4.2) 

Ich habe den eindeutigen Wert der ersten Spalte zu sammeln und die ganze arrangieren RDD nach diesem Array.

val label_array = rdd.map(_._1).collect.distinct 

Ausgabe: Array (1,5,2) und jetzt muss ich Daten ordnen nach label_array.

erforderliche Ausgabe

1, Array(2.0,5.0,6.3) 
1, Array(5.0,4.2,3.1) 
1, Array(8.5,2.5,1.2) 
5, Array(1.0,3.3,9.5) 
5, Array(6.0,2.4,7.8) 
2, Array(9.6,6.3,2.3) 
2, Array(7.8,9.1,4.2) 

I

val ordering = (1,5,2).productIterator.toList.zipWithIndex.toMap 
rdd.sortBy{case (k,v) => ordering(k)} 

versucht haben, aber, wie die erforderliche Leistung zu erhalten, wie die Array (Elemente und Größendifferenz) variiert wird. Wie kann ich RDD nach Array-Format sortieren?

Antwort

0

Gerade zipWithIndex Ihre label_array und Sie sollten

val ordering = label_array.zipWithIndex.map(x => (x._1, x._2)).toMap 

Und Sie sollten Ihre ordering Karte

scala.collection.immutable.Map[Int,Int] = Map(1 -> 0, 5 -> 1, 2 -> 2) 
0

einfacher Art und Weise in Ordnung ist, ein neues RDD mit unterschiedlichen ersten Spalte zu erstellen und sich mit die vorherige ursprüngliche Spalte

Unten ist das einfache Beispiel

val rdd = spark.sparkContext.parallelize(Seq(
     (1, Array(2.0,5.0,6.3)), 
     (5, Array(1.0,3.3,9.5)), 
     (1, Array(5.0,4.2,3.1)), 
     (2, Array(9.6,6.3,2.3)), 
     (1, Array(8.5,2.5,1.2)), 
     (5, Array(6.0,2.4,7.8)), 
     (2, Array(7.8,9.1,4.2)) 
    ) 
    ) 

    val distinct = rdd.map(v => (v._1, 1))distinct() 
    //(v._1, 1)this is done because you need key value to join 

    //now join distinct with previous original RDD 
    distinct.join(rdd).map(v => (v._1, v._2._2)) 

Ausgang:

1, Array(2.0,5.0,6.3) 
1, Array(5.0,4.2,3.1) 
1, Array(8.5,2.5,1.2) 
5, Array(1.0,3.3,9.5) 
5, Array(6.0,2.4,7.8) 
2, Array(9.6,6.3,2.3) 
2, Array(7.8,9.1,4.2)