2016-05-02 14 views
0

Ich habe einen Akteur, der Datenbankaktualisierungen orchestriert. Ich muss sicherstellen, dass jede Operation erst ausgeführt wird, nachdem die vorherige abgeschlossen wurde. Dies, weil Operation B das Ergebnis von Operation A wiederverwenden wird.Ich muss Operationen nacheinander mit Akka ausführen

Hier der Code, den ich für den Schauspieler schrieb.

class DbUpdateActor(databaseOperations: DBProvider) extends Actor { 

    implicit val ec:ExecutionContext = context.system.dispatcher 

    def receive: Receive = { 

    case newInfo : UpdateDb => 

     val future = Future { 
      // gets the current situation from DB 
      val status = databaseOperations.getSituation() 
      // do db update 
      databaseOperations.save(something) 
     } 

     future onComplete { 
     case Success(result: List[Int]) => 
      // 
     case Failure(err: Throwable) => 
      // 
     } 
    } 
} 

Der Code funktioniert für eine einzelne Operation. Wenn ich zwei Aktualisierungen feuere, wird die zweite asynchron ausgeführt, so dass sie gestartet wird, bevor die erste abgeschlossen ist.

Ich habe über verschiedene Arten von Mailbox gelesen, nicht sicher, ob eine andere helfen würde.

Irgendwelche Vorschläge?

Antwort

1

Eine Option, die Sie untersuchen können, wäre, diese Future zu entfernen und zuzulassen, dass blockierender db-Code innerhalb des Aktors ausgeführt wird. Verwenden Sie dann einen separaten Dispatcher (möglicherweise einen PinnedDispatcher), um diesen Blockierungscode vom Dispatcher des Hauptakteursystems abzukoppeln und ihm einen eigenen Thread zum Ausführen zu geben. Indem Sie im Körper blockieren und das Future entfernen, stellen Sie sicher, dass das Postfach des Akteurs ordnungsgemäß sequenziell ausgeführt wird. Eine grobe Skizze der Änderungen, die Arbeit zu machen, ist wie folgt:

object DbUpdateActor{ 
    def props(databaseOperations:DBProvider) = 
    Props(classOf[DbUpdateActor], databaseOperations). 
     withDispatcher("db-update-dispatcher") 
} 

class DbUpdateActor(databaseOperations: DBProvider) extends Actor { 
    def receive: Receive = { 

    case newInfo : UpdateDb => 
     val status = databaseOperations.getSituation() 
     databaseOperations.save(something) 
    } 
} 

Dann, solange Sie die folgenden Dispatcher in Ihrer Schauspieler Systemkonfiguration konfiguriert hatten:

db-update-dispatcher { 
    executor = "thread-pool-executor" 
    type = PinnedDispatcher 
} 

Und Sie in Betrieb genommen die db Update Schauspieler wie so:

val updater = system.actorOf(DbUpdateActor.props(databaseOperations)) 

Dann sollten Sie jetzt diesen Schauspieler Einstellung bis zu laufen, dass in einer Art und Weise Code blockieren, die nicht negativ auf den Durchsatz des Haupt Dispatcher auswirken wird.

+0

das hat sehr gut funktioniert. Ich hatte erwartet, dass der Future-Block den Actor beschäftigt hält und erst nach dem nächsten Block mit der nächsten Nachricht fortfahren soll. Was vermisse ich? – abx78

+1

Callbacks, die Sie in einem 'Future' registrieren (wie' onComplete' in Ihrem Fall), laufen asynchron ab, wahrscheinlich in einem ganz anderen Thread. Das bedeutet, dass die Codeausführung im aktuellen Thread fortgesetzt wird. In deinem Fall gibt es nach diesem "Future" keinen weiteren Code mehr, so dass der Akteur die Behandlung dieser Nachricht abschließt und zur nächsten übergeht. Du musst deshalb 'Future's und Schauspieler vorsichtig mischen. – cmbaxter

+0

genial, danke für die Erklärung. – abx78

0

Wie wäre es damit: Operation A in einem Kind starten; Wenn das Kind fertig ist, sendet es dem Elternteil eine Nachricht, dass es abgeschlossen ist. Dann können Sie Operation B entweder im vorhandenen oder einem neuen Kind starten.

Verwandte Themen