2016-03-30 5 views
1
nicht

Hier ist der Code:Warum ist diese anhaltende Schauspieler arbeiten

import akka.persistence._ 
import akka.actor.{Actor, ActorRef, ActorSystem, Props, ActorLogging} 


class Counter extends PersistentActor with ActorLogging { 

    import Counter._ 

    var state: State = new State(0) 

    override def receiveRecover: Receive = { 
    case RecoveryCompleted => println("Recovery completed.") 
    case SnapshotOffer(_, snapshot: State) => state = snapshot 
    case op: Operation => updateState(op) 
    } 


    override def persistenceId: String = "counter-persistent" 

    override def receiveCommand: Receive = { 
    case op: Operation => 
     println(s"Counter receive ${op}") 
     persist(op) { 
     op => updateState(op) 
     } 
    case "print" => println(s"The current state of couter is ${state}") 
    case SaveSnapshotFailure(_, reason) => println(s"save snapshot failed, reason: ${reason}") 
    case SaveSnapshotSuccess(_) => println(s"snapshot saved") 
    } 

    def updateState(op: Operation): Unit = op match { 
    case Increment(n) => 
     state = state.inc(n) 
     takeSnapshot 
    case Decrement(n) => 
     state = state.dec(n) 
     takeSnapshot 
    } 

    def takeSnapshot: Unit = { 
    // if (state % 5 == 0) saveSnapshot() 
    saveSnapshot() 
    } 
} 


object Counter { 

    sealed trait Operation { 
    val count: Int 
    } 

    case class Increment(override val count: Int) extends Operation 

    case class Decrement(override val count: Int) extends Operation 

    final case class State(n: Int) { 
    def inc(x: Int) = State(n + x) 

    def dec(x: Int) = State(n - x) 
    } 

} 







object Persistent extends App { 

    import Counter._ 

    val system = ActorSystem("persistent-actors") 

    val counter = system.actorOf(Props[Counter]) 

    counter ! Increment(3) 
    counter ! Increment(5) 
    counter ! Decrement(3) 
    counter ! "print" 

    Thread.sleep(1000) 

    system.terminate() 

} 

Konfiguration (application.conf):

akka { 
    persistence { 
    journal { 
     plugin = "akka.persistence.journal.leveldb", 
     leveldb { 
     dir = "target/example/journal", 
     native = false 
     } 
    }, 
    snapshot-store { 
     plugin = "akka.persistence.snapshot-store.local", 
     local { 
     dir = "target/example/snapshots" 
     } 
    } 
    } 
} 

die App zweimal Laufen zeigt, dass der Staat nicht persistent ist überhaupt:

Recovery completed. 
Counter receive Increment(3) 
Counter receive Increment(5) 
Counter receive Decrement(3) 
The current state of couter is State(5) 
snapshot saved 
snapshot saved 
snapshot saved 

Recovery completed. 
Counter receive Increment(3) 
Counter receive Increment(5) 
Counter receive Decrement(3) 
The current state of couter is State(5) 
snapshot saved 
snapshot saved 
snapshot saved 

Warum?

+0

Haben Sie das Persistenz-Plugin konfiguriert? Welches Journal verwendest du? – manub

+0

Was ist das Problem, vor dem Sie stehen, was nicht funktioniert? Bitte geben Sie weitere Informationen dazu an. – curious

+0

@manub Konfiguration jetzt veröffentlicht. – qed

Antwort

3

Das Problem hier ist, dass Sie Snapshot nach jeder Operation Nachricht der Akteur nimmt, aber während der Aufnahme Snapshot speichern Sie nicht seinen Zustand. Wenn Sie genau auf Sie schauen takeSnapshot Code:

def takeSnapshot: Unit = { 
    // if (state % 5 == 0) saveSnapshot() 
    saveSnapshot() 
    } 

der Aufruf von saveSnapshot() kein Schnappschuss Ihres Staates übernehmen, da es kein Argument an sie übergeben ist.

Sie benötigen Sie takeSnapshot Methode ein bisschen wie folgt zu ändern:

def takeSnapshot: Unit = { 
    // if (state % 5 == 0) saveSnapshot() 
    saveSnapshot(state) // Pass the states you need to store while taking a snapshot. 
    } 

Dies funktioniert.

Verwandte Themen