2017-12-20 2 views
0

Ich bin ein Dienstprogramm erstellen, die den Fortschritt einer Datei in einem größeren System verarbeitet und verarbeitet. Die Datei ist eine große "Text" -Datei, .csv, .xls, .txt, usw. Das könnte Daten von Kafka streamen, sie in Avro schreiben oder in Massenbatches eine SQL DB schreiben. Ich versuche, ein "Catchall" Dienstprogramm zu erstellen, das die Anzahl der verarbeiteten Zeilen protokolliert und den Fortschritt in einer Datenbank mithilfe eines RESTful-API-Aufrufs fortsetzt.Zukunft in einer While-Schleife aufgerufen wird nicht jedes Mal aufgerufen

Die Verarbeitung erfolgt immer innerhalb eines Akka Actors, unabhängig von der Art der Verarbeitung. Ich versuche, die Fortschrittsprotokollierung asynchron durchzuführen, um den Fortschritt der Verarbeitung nicht zu blockieren. Das Fortschreiten geschieht sehr schnell. Das meiste davon in einem ähnlichen Batch-Stil-Format geschieht, wenn auch manchmal ist es inkrementell eines nach dem anderem gehen, hier ist eine grundlegende Darstellung das, was bei der Verarbeitung nur zu Demonstrations passieren würde:

//inside my processing actor 

    var fileIsProcessing = true 
    val allLines = KafkaUtil.getConnect(fileKey) 
    val totalLines = KafkaUtil.getSize 
    val batchSize = 500 
    val dBUtil = new DBUtil(totalLines) 

while (fileIsProcessing) { 

    // consumes @ 500 lines at a time to process, returns empty if done consuming 
    val batch:List[Pollable] = allLines.poll 
    //for batch identification purposes 
    val myMax = batch.map(_.toInt).max 
    println("Starting new batch with max line: " + myMax) 

    //processing work happens here 
    batch.map(processSync) 
    println("Finished processing batch with max line: " + myMax) 

    //send a progress update to be persisted to the DB 
    val progressCall = Future[Unit] {dBUtil.incrementProgress(batch.size)} 
    progressCall.onComplete{ 
      case Success(s) => // don't care 
      case Failure(e) => logger.error("Unable to persist progress from actor ") 
    } 

if (batch.isEmpty) fileIsProcessing = false //this is horribly non-functional. 
} 

Und eine einfache Darstellung die Verarbeitung meiner DBUtil, macht die Klasse:

class DBUtil(totalLines:Int) { 

    //store both the number processed and the total to process in db, even if there is currently a percentage 

var rate = 0 //lines per second 
var totalFinished = 0 
var percentageFin:Double = 0 
var lastUpdate = DateTime.now() 

def incrementProgress(totalProcessed: Int, currentTime:DateTime): Unit = { 
    //simulate write the data and calculated progress percentage to db 
    rate = totalProcessed/((currentTime.getMillis() - lastUpdate.getMillis())/1000) 
    totalFinished += totalProcessed 
    percentageFin = (totalFinished.toDouble/totalLines.toDouble) * 100 
    println(s"Simulating DB persist of total processed:$totalFinished lines at $percentageFin% from my total lines: $totalLines at rate:$rate") 
} 

}

Nun, was wirklich seltsam ist, in der Produktion ist, geschieht die Verarbeitung so schnell, dass die Linie Future[Unit] { dBUtil.incrementProgress(batch.size)} nicht zuverlässig jedes Mal aufgerufen . Die while Schleife wird beendet, aber ich werde in meiner DB bemerken, dass der Fortschritt bei 50% oder 80% hängen bleibt. Der einzige Weg, wie es funktioniert, ist, wenn ich das System mit logger oder println Aussagen zu verlangsamen, um es zu verlangsamen.

Warum ruft mein Future Call nicht zuverlässig jedes Mal an?

+1

Sie zeigen Pseudocode ohne Synchronisierung in 'DBUtil'. Es ist leicht vorstellbar, dass Sie dort eine Ausnahme werfen und es nie bemerken. –

+0

Ich überprüfe auf jeden Fall nach Fehlern, es gibt 'Try's wo verwendet, und die Zukunft verwendet oft eine .onComplete {case Success => ... case Fehler => ...}' Der obige Pseudocode ist so entfernt wie möglich zum Zwecke der Abstraktion. Ich bin nur neugierig, ob es einen Grund geben würde, dass ein zukünftiger Anruf aus irgendeinem Grund "übersprungen" würde. – NateH06

Antwort

1

Nun ... so gibt es nur wenige Probleme mit dem Code, den Sie haben,

Sie initiieren nur das Futures in Ihrer while-Schleife und dann geht die Schleife für die nächste Iteration ohne Warten auf die Zukunft zu beenden. Das bedeutet, dass Ihr Programm beendet werden kann, bevor die Futures tatsächlich vom Executor ausgeführt wurden.

Außerdem erstellt Ihre Schleife mehr und mehr "futuristische" Aufrufe an dBUtil.incrementProgress(batch.size), Sie haben mehrere Threads, die dieselbe Funktion zur gleichen Zeit ausführen. Dies wird zu Wettlaufbedingungen führen, wenn Sie einen veränderbaren Zustand verwenden.

def processFileWithIncrementalUpdates(
    allLines: ????, 
    totalLines: Int, 
    batchSize: Int, 
    dbUtil: DBUtil 
): Future[Unit] = { 
    val promise = Promise[Unit]() 
    Future { 
    val batch: List[Pollable] = allLines.poll 
    if (batch.isEmpty) { 
     promise.completeWith(Future.successful[Unit]()) 
    } 
    else { 
     val myMax = batch.map(_.toInt).max 
     println("Starting new batch with max line: " + myMax) 

     //processing work happens here 
     batch.map(processSync) 
     println("Finished processing batch with max line: " + myMax) 

     //send a progress update to be persisted to the DB 
     val progressCall = Future[Unit] { dBUtil.incrementProgress(batch.size) } 

     progressCall.onComplete{ 
     case Success(s) => // don't care 
     case Failure(e) => logger.error("Unable to persist progress from actor ") 
     } 

     progressCall.onComplete({ 
     case _ => promise.completeWith(processFileWithIncrementalUpdates(allLines, totalLines, batchSize, dBUtil)) 
     }) 
    } 
    promise.future 
    } 
} 

val allLines = KafkaUtil.getConnect(fileKey) 
val totalLines = KafkaUtil.getSize 
val batchSize = 500 
val dBUtil = new DBUtil(totalLines) 

val processingFuture = processFileWithIncrementalUpdates(allLines, totalLines, batchSize, dBUtil) 
+0

> Sie initiieren nur die Futures in Ihrer while-Schleife und dann geht Ihre Schleife> für die nächste Iteration, ohne auf die Zukunft zu warten. Was bedeutet, dass Ihr Programm beendet wird, bevor die Futures tatsächlich vom Executor ausgeführt wurden. Genau das möchte ich - aber würden die Anrufe in die Zukunft wirklich "verloren" gehen? Ich würde denken, dass alle Schleifen enden würden, aber dann zurückgehen und versuchen, alle zukünftigen Anrufe "aufzuholen". Ich befürwortete wirklich, die Anrufe in einem Akteur zu machen, für das "Feuer und Vergessen" und die Warteschlange, die den Auftrag behielt, aber wegen der Gemeinkosten abgelehnt wurde. – NateH06

+0

Sorry über die Formatierung Ihrer Zitate, nicht schnell genug bearbeitet. – NateH06

+0

Das Hauptproblem besteht darin, dass Sie den veränderbaren Status zwischen verschiedenen Threads geteilt haben, die die Funktion incrementProgress aufrufen. –

Verwandte Themen