2016-08-12 3 views
2

Ich versuche, mit einer großen RDD zu arbeiten, wie von einer Datei DStream gelesen.Zählen von Aufzeichnungen meiner RDDs in einem großen Dstream

Der Code sieht wie folgt aus:

val creatingFunc = {() => 
    val conf = new SparkConf() 
       .setMaster("local[10]") 
       .setAppName("FileStreaming") 
       .set("spark.streaming.fileStream.minRememberDuration", "2000000h") 
       .registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable], 
classOf[org.apache.hadoop.io.Text], classOf[GGSN])) 

    val sc = new SparkContext(conf) 

    // Create a StreamingContext 
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds)) 

    val appFile = httpFileLines 
        .map(x=> (x._1,x._2.toString())) 
        .filter(!_._2.contains("ggsnIPAddress")) 
        .map(x=>(x._1,x._2.split(","))) 

    var count=0 

    appFile.foreachRDD(s => { 
    // s.collect() throw exception due to insufficient amount of emery 
    //s.count() throw exception due to insufficient amount of memory 
    s.foreach(x => count = count + 1) 
    }) 

    println(count) 
    newContextCreated = true 
    ssc 
} 

, was ich versuche zu tun, die Zählung meines RDD..however zu bekommen, da es large..it ist wirft exception..so muss ich tun stattdessen ein foreach zu vermeiden, Daten in den Speicher zu sammeln ..

ich möchte die Zählung dann als Weg in meinem Code zu bekommen, aber es gibt immer 0 ..

gibt es eine Möglichkeit, dies zu tun?

+0

Wenn Sie mit RDDs arbeiten, können Sie keine Summe in eine lokale Variable wie diese akkumulieren. Sie müssen einen 'org.apache.spark.Accumulator' verwenden oder Sie können einfach' Rdd.count' oder 'DStream.count' aufrufen. –

+0

Wo werden Ihre' httpFileLines' erstellt? Ist es 'RDD oder' DStream'? –

+0

Willst du die Zählung deiner rds oder Zählung aller Elemente im dstream? – Knight71

Antwort

0

Es ist nicht erforderlich, foreachRDD und rufen Sie count. Sie können die count Methode auf DStream definiert verwenden:

val appFile = httpFileLines 
       .map(x => (x._1, x._2.toString())) 
       .filter(!_._2.contains("ggsnIPAddress")) 
       .map(x => (x._1, x._2.split(","))) 

val count = appFile.count() 

Wenn das immer noch eine unzureichende Menge an Speicher-Ausnahme ergibt, müssen Sie entweder jedes Mal die Berechnung werden kleinere Chargen von Daten, oder vergrößern Sie Arbeiter Knoten die Last zu bewältigen .

+0

Das gibt nicht die Zählungen aller Elemente in DStream zurück, ich muss immer noch fo .. – Luckylukee