2014-12-01 11 views
5

Gibt es eine Spark-Funktion, die es erlaubt, eine Sammlung in mehrere RDDs aufzuteilen? Eine solche Funktion würde eine übermäßige Iteration vermeiden. Zum Beispiel:Scala Spark: Split-Sammlung in mehrere RDD?

def main(args: Array[String]) { 
    val logFile = "file.txt" 
    val conf = new SparkConf().setAppName("Simple Application") 
    val sc = new SparkContext(conf) 
    val logData = sc.textFile(logFile, 2).cache() 
    val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt") 
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt") 
    } 

In diesem Beispiel habe ich iterieren haben ‚logData` zweimal nur um Ergebnisse in zwei separaten Dateien zu schreiben:

val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt") 
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt") 

Es stattdessen wäre schön, so etwas zu haben:

val resultMap = logData.map(line => if line.contains("a") ("a", line) else if line.contains("b") ("b", line) else (" - ", line) 
    resultMap.writeByKey("a", "linesA.txt") 
    resultMap.writeByKey("b", "linesB.txt") 

Irgendwelche solche Sache?

+0

Sie scheinen groupBy Variante zu wollen, die eine Karte zurück von substring-> RDD, anstatt eine RDD von substring-> Iterable [String]? –

Antwort

3

Schauen Sie sich die folgende Frage an.

Write to multiple outputs by key Spark - one Spark job

Sie können flatMap eine RDD mit einer Funktion wie folgt aus und führen Sie dann einen groupBy auf den Schlüssel.

def multiFilter(words:List[String], line:String) = for { word <- words; if line.contains(word) } yield { (word,line) } 
val filterWords = List("a","b") 
val filteredRDD = logData.flatMap(line => multiFilter(filterWords, line)) 
val groupedRDD = filteredRDD.groupBy(_._1) 

aber abhängig von der Größe Ihrer Eingabe RDD Sie Gewinne sehen oder keine Leistung, weil jeder von groupBy Operationen einen Shuffle beinhaltet.

Wenn Sie andererseits genügend Speicher in Ihrem Spark-Cluster haben, können Sie die Eingabe-RDD zwischenspeichern und daher ist das Ausführen mehrerer Filteroperationen möglicherweise nicht so teuer, wie Sie denken.

+0

Danke. Wenn sie jedoch für einen Datensatz von (K, V) -Paaren aufgerufen wird, gibt 'groupBy' einen ** einzigen RDD ** -kontinuierlichen Datensatz von (K, Iterable ) Paaren zurück. Also gibt es keine Möglichkeit, eine ** Sammlung von RDDs ** als Ergebnis von Spark-Transformationen zu erhalten? – zork

3

Vielleicht wäre so etwas wie dies funktioniert:

def singlePassMultiFilter[T](
     rdd: RDD[T], 
     f1: T => Boolean, 
     f2: T => Boolean, 
     level: StorageLevel = StorageLevel.MEMORY_ONLY 
): (RDD[T], RDD[T], Boolean => Unit) = { 
    val tempRDD = rdd mapPartitions { iter => 
    val abuf1 = ArrayBuffer.empty[T] 
    val abuf2 = ArrayBuffer.empty[T] 
    for (x <- iter) { 
     if (f1(x)) abuf1 += x 
     if (f2(x)) abuf2 += x 
    } 
    Iterator.single((abuf1, abuf2)) 
    } 
    tempRDD.persist(level) 
    val rdd1 = tempRDD.flatMap(_._1) 
    val rdd2 = tempRDD.flatMap(_._2) 
    (rdd1, rdd2, (blocking: Boolean) => tempRDD.unpersist(blocking)) 
} 

Hinweis, dass eine Aktion auf rdd1 genannt (bzw. rdd2.) Wird tempRDD verursacht berechnet werden und anhielt. Dies entspricht praktisch der Berechnung rdd2 (bzw. rdd1), da der Overhead der flatMap in den Definitionen rdd1 und meiner Meinung nach ziemlich vernachlässigbar ist.

würden Sie singlePassMultiFitler verwenden wie so:

val (rdd1, rdd2, cleanUp) = singlePassMultiFilter(rdd, f1, f2) 
rdd1.persist() //I'm going to need `rdd1` more later... 
println(rdd1.count) 
println(rdd2.count) 
cleanUp(true)  //I'm done with `rdd2` and `rdd1` has been persisted so free stuff up... 
println(rdd1.distinct.count) 

klar, daß diese auf eine beliebige Anzahl von Filtern erweitert könnte, Sammlungen von Filtern usw.

+0

Es kann effizienter auf einer sehr niedrigen Ebene (CPU-Cache-Auslastung und dergleichen, obwohl es von ArrayBuffer Wartung verzehrt werden kann), aber auf der höheren Ebene führt es genau die gleiche Menge an Arbeit als wiederholte Filter auf einem zwischengespeicherten 'rdd' . Trotzdem sieht es viel besser aus als eine angenommene Antwort, die die Situation tatsächlich verschlimmert. – zero323

+0

Der Ansatz, den ich vorgeschlagen habe, erfordert nur ein einmaliges Durchlaufen von "rdd", während wiederholtes Filtern von "rdd" ein wiederholtes Durchlaufen von "rdd" erfordert. Wenn man weiß, dass sich die Filterkriterien gegenseitig ausschließen (wahrscheinlich gilt dies für viele Anwendungen), kann man eine größere Effizienz erreichen, indem man einfach die beiden "if" -Anweisungen durch eine einzige "if-else" -Anweisung ersetzt.Eine solche Optimierung wäre nicht möglich, indem wiederholt auf "rdd" gefiltert wird. –

+0

So oder so, für N-Elemente und M-Bedingungen ist es immer noch O (NM). Wenn RDD nicht im Speicher zwischengespeichert würde, gäbe es einen großen praktischen Unterschied, aber ansonsten ist es nicht wirklich wichtig. Wie auch immer, wenn ich immer noch denke, es ist viel besser Ansatz als Partitionierung daher der upvote. – zero323