Angenommen, ich folgende Daten habe:N Werte von jeder Partition in Funken unter
val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1))
val DataSortRDD = sc.parallelize(DataSort,2)
Und jetzt gibt es zwei Partitionen mit:
scala>DataSortRDD.glom().take(2).head
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4))
scala>DataSortRDD.glom().take(2).tail
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1)))
es, dass die Daten in jeder Partition angenommen wird, ist bereits sortiert mit etwas wie sortWithinPartitions(col("src").desc,col("rank").desc)
(das ist für einen Datenrahmen, ist aber nur zur Veranschaulichung).
Was ich will ist von jeder Partition bekommen für jeden Buchstaben die ersten zwei Werte (wenn es mehr als 2 Werte gibt). So in diesem Beispiel sollte das Ergebnis in jeder Partition sein:
scala>HypotheticalRDD.glom().take(2).head
Array(("a",5),("b",13),("b",2),("c",4))
scala>HypotheticalRDD.glom().take(2).tail
Array(Array(("a",1),("b",15),("c",3),("c",2)))
Ich weiß, dass ich die mapPartition
Funktion aber es ist nicht klar, in meinem Kopf verwenden, wie kann ich durch die Werte in jeder Partition durchlaufen und erhalten die ersten 2. Irgendwelche Trinkgeld?
Bearbeiten: Genauer gesagt. Ich weiß, dass die Daten in jeder Partition bereits nach "Buchstaben" und danach nach "Anzahl" sortiert sind. Also meine Hauptidee ist, dass die Eingabefunktion in mapPartition
die Partition durchlaufen sollte und yield
die ersten beiden Werte jedes Buchstabens. Und dies könnte getan werden, indem jeder iterate den .next()
Wert überprüft. Dies ist, wie ich es in Python schreiben konnte:
def limit_on_sorted(iterator):
oldKey = None
cnt = 0
while True:
elem = iterator.next()
if not elem:
return
curKey = elem[0]
if curKey == oldKey:
cnt +=1
if cnt >= 2:
yield None
else:
oldKey = curKey
cnt = 0
yield elem
DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None)
tut es ganz gleich, wie das Endergebnis _partitioned_ ist? Mit anderen Worten: Wenn Sie die gleichen Ergebnisse erhalten, aber anders partitioniert, wäre das immer noch in Ordnung? Die Filterung würde weiterhin wie erwartet auf der ursprünglichen Partitionierung basieren. –