Ich versuche, mit einer großen RDD zu arbeiten, wie von einer Datei DStream
gelesen.Zählen von Aufzeichnungen meiner RDDs in einem großen Dstream
Der Code sieht wie folgt aus:
val creatingFunc = {() =>
val conf = new SparkConf()
.setMaster("local[10]")
.setAppName("FileStreaming")
.set("spark.streaming.fileStream.minRememberDuration", "2000000h")
.registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text], classOf[GGSN]))
val sc = new SparkContext(conf)
// Create a StreamingContext
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
val appFile = httpFileLines
.map(x=> (x._1,x._2.toString()))
.filter(!_._2.contains("ggsnIPAddress"))
.map(x=>(x._1,x._2.split(",")))
var count=0
appFile.foreachRDD(s => {
// s.collect() throw exception due to insufficient amount of emery
//s.count() throw exception due to insufficient amount of memory
s.foreach(x => count = count + 1)
})
println(count)
newContextCreated = true
ssc
}
, was ich versuche zu tun, die Zählung meines RDD..however zu bekommen, da es large..it ist wirft exception..so muss ich tun stattdessen ein foreach zu vermeiden, Daten in den Speicher zu sammeln ..
ich möchte die Zählung dann als Weg in meinem Code zu bekommen, aber es gibt immer 0 ..
gibt es eine Möglichkeit, dies zu tun?
Wenn Sie mit RDDs arbeiten, können Sie keine Summe in eine lokale Variable wie diese akkumulieren. Sie müssen einen 'org.apache.spark.Accumulator' verwenden oder Sie können einfach' Rdd.count' oder 'DStream.count' aufrufen. –
Wo werden Ihre' httpFileLines' erstellt? Ist es 'RDD oder' DStream'? –
Willst du die Zählung deiner rds oder Zählung aller Elemente im dstream? – Knight71