2017-06-02 2 views
0

Ich arbeite an einer Anwendung, wo alle 30 Sekunden (kann auch 5 sec auch sein) einige Dateien in einem Dateisystem gelöscht werden. Ich muss es lesen lesen und einige Datensätze nach REDIS schieben.Spark Local File Streaming - Fehlertoleranz

In jeder Datei sind alle Datensätze unabhängig und ich mache keine Berechnung, die updateStateByKey erfordert.

Meine Frage ist, wenn aufgrund eines Problems (zB: REDIS Verbindungsproblem, Datenproblem in einer Datei etc) einige Dateien nicht vollständig verarbeitet werden Ich möchte die Dateien erneut (sagen n mal) verarbeiten und auch verfolgen Die Dateien wurden bereits verarbeitet.

Zu Testzwecken lese ich aus einem lokalen Ordner. Ich bin auch nicht sicher, wie zu dem Schluss, dass eine Datei vollständig verarbeitet wird und es als abgeschlossen markiert (dh schreibt in einer Textdatei oder db, die diese Datei verarbeitet)

val lines = ssc.textFileStream("E:\\SampleData\\GG") 
val words = lines.map(x=>x.split("_")) 
words.foreachRDD(
    x=> { 
    x.foreach(   
     x => { 
     var jedis = jPool.getResource(); 
     try{ 
      i=i+1 
      jedis.set("x"+i+"__"+x(0)+"__"+x(1), x(2)) 
     }finally{ 
      jedis.close() 
     } 
     } 
    ) 
    } 
) 

Antwort