2015-12-25 6 views
13

Ich habe Tausende von kleinen Dateien in HDFS. Müssen Sie eine etwas kleinere Teilmenge von Dateien verarbeiten (was wiederum in Tausenden ist), enthält Dateiliste eine Liste von Dateipfaden, die verarbeitet werden müssen.Stackoverflow aufgrund der langen RDD Lineage

// fileList == list of filepaths in HDFS 

var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD 

for (i <- 0 to fileList.size() - 1) { 

val filePath = fileStatus.get(i) 
val fileRDD = sparkContext.textFile(filePath) 
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line)) 

masterRDD = masterRDD.union(sampleRDD) 

} 

masterRDD.first() 

// Einmal aus Schleife, Durchführung jeglicher Aktion Ergebnisse in Stackoverflow-Fehler aufgrund der langen Linie von RDD

Exception in thread "main" java.lang.StackOverflowError 
    at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    ===================================================================== 
    ===================================================================== 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 

Antwort

27

Im Allgemeinen können Sie Checkpoints lange Linien zu brechen verwenden. Einige mehr oder weniger ähnlich wie dies funktionieren sollte:

import org.apache.spark.rdd.RDD 
import scala.reflect.ClassTag 

val checkpointInterval: Int = ??? 

def loadAndFilter(path: String) = sc.textFile(path) 
    .filter(_.startsWith("#####")) 
    .map((path, _)) 

def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) 
    (acc: RDD[T], xi: (RDD[T], Int)) = { 
    if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint 
    else xi._1.union(acc) 
    } 

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] 
fileList.map(loadAndFilter).zipWithIndex 
    .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval)) 

In dieser besonderen Situation eine viel einfachere Lösung sollte SparkContext.union Methode zu verwenden sein:

val masterRDD = sc.union(
    fileList.map(path => sc.textFile(path) 
    .filter(_.startsWith("#####")) 
    .map((path, _))) 
) 

Ein Unterschied zwischen diesen Methoden sollte offensichtlich sein, wenn Sie nehmen ein Blick auf den durch die Schleife erzeugt DAG/reduce:

enter image description here

und Einzel union:

enter image description here

Natürlich, wenn Dateien klein sind Sie wholeTextFiles mit flatMap und lesen Sie alle Dateien auf einmal kombinieren:

sc.wholeTextFiles(fileList.mkString(",")) 
    .flatMap{case (path, text) => 
    text.split("\n").filter(_.startsWith("#####")).map((path, _))} 
+4

Best ever Verwendung von sc.union() –