2017-05-09 5 views
0

Ich habe eine rdd wie soSplit RDD in viele RDDs und Cache

(aid, session, sessionnew, date) 
(55-BHA, 58, 15, 2017-05-09) 
(07-YET, 18, 5, 2017-05-09) 
(32-KXD, 27, 20, 2017-05-09) 
(19-OJD, 10, 1, 2017-05-09) 
(55-BHA, 1, 0, 2017-05-09) 
(55-BHA, 19, 3, 2017-05-09) 
(32-KXD, 787, 345, 2017-05-09) 
(07-YET, 4578, 1947, 2017-05-09) 
(07-YET, 23, 5, 2017-05-09) 
(32-KXD, 85, 11, 2017-05-09) 

ich alles mit der gleichen Hilfe zu einem neuen rdd teilen wollen und dann die später für die Verwendung zwischenzuspeichern, so ein rdd pro einzigartige Hilfe . Ich sah einige andere Antworten, aber sie speichern die RDDs in Dateien. Gibt es ein Problem beim Speichern dieser vielen rds im Speicher? Es wird wahrscheinlich um 30k sein +

Ich speichere die zwischengespeicherte Rdd mit Funken Jobserver.

Antwort

0

Ich würde Sie cache die grouped rdd als vorschlagen unten
können sagen, Sie haben RDD Daten:

val rddData = sparkContext.parallelize(Seq(
     ("55-BHA", 58, 15, "2017-05-09"), 
     ("07-YET", 18, 5, "2017-05-09"), 
     ("32-KXD", 27, 20, "2017-05-09"), 
     ("19-OJD", 10, 1, "2017-05-09"), 
     ("55-BHA", 1, 0, "2017-05-09"), 
     ("55-BHA", 19, 3, "2017-05-09"), 
     ("32-KXD", 787, 345, "2017-05-09"), 
     ("07-YET", 4578, 1947, "2017-05-09"), 
     ("07-YET", 23, 5, "2017-05-09"), 
     ("32-KXD", 85, 11, "2017-05-09"))) 

Sie können die Daten cache, indem sie mit „Hilfe“ Gruppierung und verwenden filter die grouped data Sie auswählen müssen wie:

val grouped = rddData.groupBy(_._1).cache 
val filtered = grouped.filter(_._1 equals("32-KXD")) 

Aber ich würde vorschlagen, Sie DataFrame wie unterhalb derer zu verwenden ist effizient und verbessert als rdd s

import sqlContext.implicits._ 
val dataFrame = Seq(
    ("55-BHA", 58, 15, "2017-05-09"), 
("07-YET", 18, 5, "2017-05-09"), 
("32-KXD", 27, 20, "2017-05-09"), 
("19-OJD", 10, 1, "2017-05-09"), 
("55-BHA", 1, 0, "2017-05-09"), 
("55-BHA", 19, 3, "2017-05-09"), 
("32-KXD", 787, 345, "2017-05-09"), 
("07-YET", 4578, 1947, "2017-05-09"), 
("07-YET", 23, 5, "2017-05-09"), 
("32-KXD", 85, 11, "2017-05-09")).toDF("aid", "session", "sessionnew", "date").cache 

val newDF = dataFrame.select("*").where(dataFrame("aid") === "32-KXD") 
newDF.show 

Ich hoffe, es hilft