2015-02-09 2 views
7

Ich habe eine Lösung für die Gruppe RDD[K, V] per Schlüssel implementiert und Daten entsprechend jeder Gruppe (K, RDD[V]), mit partitionBy und Partitioner zu berechnen. Trotzdem bin ich mir nicht sicher, ob es wirklich effizient ist und ich möchte Ihre Sichtweise haben.Verwenden von PartitionBy zum Teilen und effiziente Berechnung RDD-Gruppen mit Schlüssel

Hier ist ein Beispielfall: nach einer Liste von [K: Int, V: Int] berechnen die V s für jede Gruppe von K bedeuten, wohl wissend, dass es verteilt werden soll und dass V Werte können sehr groß sein. Das sollte geben:

List[K, V] => (K, mean(V)) 

Die einfache Partitioner Klasse:

class MyPartitioner(maxKey: Int) extends Partitioner { 

    def numPartitions = maxKey 

    def getPartition(key: Any): Int = key match { 
     case i: Int if i < maxKey => i 
    } 
    } 

Die Partitionscode:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)) 

     val rdd = sc.parallelize(l) 
     val p = rdd.partitionBy(new MyPartitioner(4)).cache() 

     p.foreachPartition(x => { 
     try { 
      val r = sc.parallelize(x.toList) 
      val id = r.first() //get the K partition id 
      val v = r.map(x => x._2) 
      println(id._1 + "->" + mean(v)) 
     } catch { 
      case e: UnsupportedOperationException => 0 
     } 
     }) 

Die Ausgabe lautet:

1->13, 2->4, 3->7

Meine Fragen sind:

  1. was passiert wirklich beim Anruf partitionBy? (Entschuldigung, ich fand nicht genug Spezifikationen)
  2. Ist es wirklich effizient, nach Partition zu mappen, wissend, dass es in meinem Produktionsfall nicht zu viele Schlüssel (wie 50 für Probe) durch sehr viele Werte (wie 1 Million für Beispiel)
  3. Was sind die Kosten von paralellize(x.toList)? Ist es konsequent, es zu tun? (Ich brauche eine RDD in Eingabe von mean())
  4. Wie würden Sie es selbst tun?

Grüße

Antwort

4

Der Code sollte nicht funktionieren. Sie können das Objekt SparkContext nicht an die Executoren übergeben. (Es ist nicht Serializable.) Auch ich sehe nicht, warum Sie müssten.

Um den Mittelwert zu berechnen, müssen Sie die Summe und die Anzahl berechnen und ihr Verhältnis nehmen. Der Standard-Partitionierer funktioniert gut.

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = { 
    case class SumCount(sum: Double, count: Double) 
    val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0), 
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count)) 
    sumCounts.map(sc => sc.sum/sc.count) 
} 

Dies ist eine effiziente Single-Pass-Berechnung, die sich gut verallgemeinert.

+0

Vielen Dank für Ihre Antwort, natürlich kann es nicht funktionieren, ich habe nicht alle Reflex von Funken-Code-Tricks und ich wurde von meinem lokalen jvm verwöhnt. Trotzdem muss ich nicht die mittlere, sondern eine komplexe ml-Methode berechnen, und ich brauche eine RDD [Vector]. Wie könnte ich eine Liste von (Schlüssel, RDD [Vector]) von einer eindeutigen RDD [Int, Int] erhalten? Ich habe keine Lösung gefunden. – Seb

+0

Ich denke, das ist ein ähnliches Thema dann: http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302 Ich bin mir nicht sicher, wie Sie 'Vector' aus machen möchten 'Int's. Wenn Sie jedoch eine RDD pro Schlüssel erhalten möchten, müssen Sie die ursprüngliche RDD teilen, was in der verknüpften Antwort erläutert wird. Wenn es Ihnen nicht die Antwort gibt, schlage ich vor, eine andere Frage zu stellen, vielleicht mit einer klaren Erklärung auf hoher Ebene, was Sie tun möchten. –

Verwandte Themen