2016-07-27 6 views
0

Angenommen, ich folgende Daten habe:N Werte von jeder Partition in Funken unter

val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1)) 
val DataSortRDD = sc.parallelize(DataSort,2) 

Und jetzt gibt es zwei Partitionen mit:

scala>DataSortRDD.glom().take(2).head 
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4)) 
scala>DataSortRDD.glom().take(2).tail 
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1))) 

es, dass die Daten in jeder Partition angenommen wird, ist bereits sortiert mit etwas wie sortWithinPartitions(col("src").desc,col("rank").desc) (das ist für einen Datenrahmen, ist aber nur zur Veranschaulichung).

Was ich will ist von jeder Partition bekommen für jeden Buchstaben die ersten zwei Werte (wenn es mehr als 2 Werte gibt). So in diesem Beispiel sollte das Ergebnis in jeder Partition sein:

scala>HypotheticalRDD.glom().take(2).head 
Array(("a",5),("b",13),("b",2),("c",4)) 
scala>HypotheticalRDD.glom().take(2).tail 
Array(Array(("a",1),("b",15),("c",3),("c",2))) 

Ich weiß, dass ich die mapPartition Funktion aber es ist nicht klar, in meinem Kopf verwenden, wie kann ich durch die Werte in jeder Partition durchlaufen und erhalten die ersten 2. Irgendwelche Trinkgeld?

Bearbeiten: Genauer gesagt. Ich weiß, dass die Daten in jeder Partition bereits nach "Buchstaben" und danach nach "Anzahl" sortiert sind. Also meine Hauptidee ist, dass die Eingabefunktion in mapPartition die Partition durchlaufen sollte und yield die ersten beiden Werte jedes Buchstabens. Und dies könnte getan werden, indem jeder iterate den .next() Wert überprüft. Dies ist, wie ich es in Python schreiben konnte:

def limit_on_sorted(iterator): 
    oldKey = None 
    cnt = 0 
    while True: 
     elem = iterator.next() 
     if not elem: 
      return 
     curKey = elem[0] 
     if curKey == oldKey: 
      cnt +=1 
      if cnt >= 2: 
       yield None 
     else: 
      oldKey = curKey 
      cnt = 0 
     yield elem 

DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None) 
+0

tut es ganz gleich, wie das Endergebnis _partitioned_ ist? Mit anderen Worten: Wenn Sie die gleichen Ergebnisse erhalten, aber anders partitioniert, wäre das immer noch in Ordnung? Die Filterung würde weiterhin wie erwartet auf der ursprünglichen Partitionierung basieren. –

Antwort

1

Sie nicht wirklich über die Aufteilung des Ergebnis Pflege Unter der Annahme,, können Sie mapPartitionsWithIndex verwenden, um die Partitions-ID in den Schlüssel einzuarbeiten durch die Sie groupBy, dann können Sie leicht nehmen die ersten beiden Punkte für jeden dieser Schlüssel:

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitionsWithIndex { 
    // add the partition ID into the "key" of every record: 
    case (partitionId, itr) => itr.map { case (k, v) => ((k, partitionId), v) } 
    } 
    .groupByKey() // groups by letter and partition id 
    // take only first two records, and drop partition id 
    .flatMap { case ((k, _), itr) => itr.take(2).toArray.map((k, _)) } 

println(result.collect().toList) 
// prints: 
// List((a,5), (b,15), (b,13), (b,2), (a,1), (c,4), (c,3)) 

Sie beachten Sie, dass das Endergebnis ist nicht auf die gleiche Weise partitioniert (groupByKey ändert die Partitionierung), ich bin unter der Annahme Dies ist nicht entscheidend für das, was Sie versuchen zu tun (was, ehrlich gesagt, entgeht mir).

EDIT: wenn Sie möchten, schlurfenden vermeiden und alle Operationen innerhalb jeder Partition durchführen:

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitions(_.toList.groupBy(_._1).mapValues(_.take(2)).values.flatten.iterator, true) 
+0

Danke für die Antwort. Vielleicht sollte ich es in der Frage erwähnen. Der Grund, warum ich 'mapPartition' verwenden möchte, ist, weil ich das Mischen zwischen Partitionen aus Effizienzgründen vermeiden möchte. In Ihrer Lösung mit 'groupByKey' gibt es Shuffling. –

+0

Ich sehe. Bearbeitete meine Antwort, um eine Lösung ohne Mischen hinzuzufügen (behält Partitionen bei) –

+0

Ihre Antwort ist korrekt. Mein Anliegen ist die 'groupBy (_._ 1)'. Warum muss ich gruppieren, wenn ich weiß, dass die Werte bereits nach Buchstaben und nach Anzahl sortiert sind? Ich habe meine Frage aktualisiert, um meine Idee klarer zu machen. –

Verwandte Themen