2016-09-07 3 views
0

ich kopieren haben folgende RDD [String]:Wie eine Reihe von Elementen in RDD in einen kleineren RDD

val rdd = sc.makeRDD(Seq("paul", "jim,", "joe", "mary", "sean", "peter", "lucy")) 

Was Ich mag wäre in der Lage sein, eine smallerRDD durch zu tun ist, zu erzeugen in der Lage, kopiere einen Bereich von Zeilen von der obigen Haupt-rdd in eine kleinere rdd.

Anwendungsfall: Beim Durchlaufen von RDDs in Zündfunken können ungewöhnliche Situationen auftreten, häufiger können bestimmte Leitungen/Datensätze in RDDs Probleme verursachen.

In der Lage zu sein, programmatisch einen zum anderen zu kopieren, nutze in der Tat eine nützliche Funktion, da ich hierfür keine in Dosen gespeicherte rdd-Methode finden konnte. siehe meine Lösung unten.

+3

Wie würden Sie wählen, welcher Teil der RDD sollte auf die kleinere RDD kopiert werden? – Yaron

+1

Bitte erweitern Sie Ihren Anwendungsfall. Ja, bestimmte Zeilen können Probleme verursachen, aber würden Sie diese wirklich nach Index oder Inhalt identifizieren? –

Antwort

0
val rdd = sc.makeRDD(Seq("paul", "jim", "joe", "mary", "sean", "peter", "lucy")) 

val startIndex = 1 
val endIndex = 5 
val shortRdd=rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.map(p=>p._1) 
shortRdd.count 
shortRdd.foreach(println) 

Schritt 1: Mal sehen, was im Innern des RDD:

rdd.foreach(println) 
peter 
lucy 
jim 
joe 
paul 
mary 
sean 

Schritt 2: eine Transformation Nehmen Index anhänge, bemerken der ein Indexwert nun auf jede Zeile angewandt wird.

rdd.zipWithIndex().foreach(println) 
(peter,5) 
(jim,1) 
(joe,2) 
(paul,0) 
(mary,3) 
(sean,4) 
(lucy,6) 

Schritt 3: Anwenden von Filtern auf der Indexposition, zieht Indizes zwischen Start- und Zielindexposition

rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.foreach(println) 
(mary,3) 
(sean,4) 
(jim,1) 
(peter,5) 
(joe,2) 

Schritt 4: Karte zurück zum einzelnen Elemente in jeder Zeile

rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.map(p=>p._1).foreach(println) 
mary 
jim 
joe 
peter 
sean 

I durchgeführt Dieser Prozess auf RDD mit Zeilen von 100k oder mehr ohne Probleme. Lassen Sie mich wissen, wie dies mit größeren RDD funktioniert.

Das ist es! Paul.

Verwandte Themen