2015-03-18 5 views

Antwort

2

Hier ist, wie ich es gemacht habe. Erstellen Sie eine leere RDD, die Ihr vorheriges Fenster ist. Berechnen Sie dann im forEachRDD den Unterschied zwischen dem letzten Fenster und dem aktuellen Fenster. Wenn das aktuelle Fenster Datensätze enthält, die nicht im vorherigen Fenster enthalten sind, gibt es etwas Neues im Stapel. Schließlich setzen Sie das vorherige Fenster auf das aktuelle Fenster.

... 

    var previousWindowRdd = sc.emptyRDD[String] 

    dStream.foreachRDD { 
    windowRdd => { 
     if (!windowRdd.isEmpty) processWindow(windowRdd.cache()) 
    } 
    } 

    ... 

def processWindow(windowRdd: RDD[String]) = { 
    val newInBatch = windowRdd.subtract(previousWindowRdd) 

    if (!newInBatch.isEmpty()) 
    processNewBatch(windowRdd) 

    previousWindowRdd = windowRdd 
}