2017-11-06 4 views
1

Ich benutze akkas PersistentActor mit sharding in einem Cluster, um meinen Zustand zu verfolgen. Ich habe einen ‚Room‘, die ich über folgenden Code aktualisieren:akka Persist-Funktion wird nicht jedes Mal behandelt

case UpdateRoom(id, room, userId) => ((ret: ActorRef) => { 
    (userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map { role => 
    if (role == "owner") { 
     state = Some(room) 
     ret ! Success(room) 
     eventRegion ! RoomEventPackage(id, RoomUpdated(room)) 
     println("works") 
     persist(RoomUpdated(room))(e => println("this doesn't always fire") 
    } else { 
     ret ! Failure(InsufficientRights(role, "Update Room")) 
    } 
    } 

Problem ist, dass nur persist jede andere Zeit, während der Rest der Funktion arbeitet wie erwartet funktioniert. ("Werke" werden jedes Mal gedruckt, "das feuert nicht immer", sondern zweimal). Ich muss immer den Update-Befehl zweimal auslösen, um mein Event zu speichern, aber dann scheint es für beide Male gespeichert zu sein, dass ich den Befehl abgefeuert habe.

Fehle ich einen wichtigen Teil von akka persist?

Antwort

1

Ich denke, dass Sie einen schweren Fehler in der Welt Actor machen: Zugriff auf den Actor (veränderlichen) Zustand von außen. In Ihrem Fall geschieht dies zweimal aus dem Rückruf der zurück Future von ask/?:

  • bei der Aktualisierung Zustand: state = Some(room)
  • wenn persist

Der einzig sichere Weg, Aufruf zu behandeln Fragen innerhalb von Actor und anschließend den Status des Schauspielers ändern, ist eine Nachricht an den gleichen Akteur von Ask Callback zu senden, zu diesem Zweck können Sie pipeTo verwenden.

eine vereinfachte Version des Codes verwenden zu veranschaulichen:

case UpdateRoom(id, room, userId) => 
    val answer = (userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map(role => RoleForRoom(id, room, userId, role)) 
    answer piepTo self 
case RoleForRoom(id, room, userId, room) => 
    if (role == "owner") { 
    state = Some(room) 
    eventRegion ! RoomEventPackage(id, RoomUpdated(room)) 
    persist(RoomUpdated(room))(e => println("this is safe")) 
    } 

siehe auch: https://doc.akka.io/docs/akka/2.5.6/scala/general/jmm.html#actors-and-shared-mutable-state

Verwandte Themen