2017-10-31 3 views
0

Ich bin auf der Suche nach dem folgenden Code für eine unbekannte Menge von Schauspieler Fragen Anfragen arbeiten.Verarbeitung unbekannter Menge von Akteuren Ergebnisse auf einzelne Timeout

implicit val timeout = Timeout(100 millis) 
val sendRequestActor = context.actorOf(Props(new SendRequest(request)), "Send_Request_".concat(getActorNumber)) 
val sendRequestActor2 = context.actorOf(Props(new SendRequest(request)), "Send_Request_".concat(getActorNumber)) 
val a1 = ask(sendRequestActor, Request).fallbackTo(Future.successful(RequestTimeout)) 
val a2 = ask(sendRequestActor2, Request).fallbackTo(Future.successful(RequestTimeout)) 
val result = for { 
    r1 <- a1 
    r2 <- a2 
} yield(r1, r2) 

val r = Await.result(result, 100 millis) 
r match { 
    case (b: SuccessResponse, b2: SuccessResponse) => { 
    //Process Results 
    } 
    case (b: SuccessResponse, b2: RequestTimeout) => { 
    //Process Results 
    } 
    case (b: RequestTimeout, b2: SuccessResponse) => { 
    //Process Results 
    } 
    case (b: RequestTimeout, b2: RequestTimeout) => { 
    //Process Results 
    } 
    case _ => {} 
} 

Ich versuche, Anfragen an eine Liste der Empfänger (von einem früheren Datenbankaufruf erhalten) zu senden. Die Anzahl der Empfänger variiert bei jedem Aufruf dieser Funktion. Empfänger haben eine maximale Antwortzeit von 100 Millisekunden, bevor ich ihre Anforderungen zeitüberwachen und eine RequestTimeout aufzeichnen möchte. Der SendRequest Akteur antwortet mit SuccessResponse, wenn die Empfänger antworten. Ich gehe davon aus, dass ich die val result for-Schleife ändern muss, um eine Liste zu verarbeiten, aber ich bin mir nicht sicher, wie ich alles strukturieren soll, damit ich die Mindestzeit abwarten kann (entweder wenn alle Akteure zurückkehren oder wenn das Timeout eintritt) ist tiefer). Ich brauche nicht alles in einem einzigen Rückgabewert wie im Beispiel, mir geht es gut mit einer Liste von Ergebnissen und übereinstimmenden Typen bei jeder Iteration.

Jede Hilfe wäre willkommen, bitte lassen Sie mich wissen, wenn ich andere Informationen zur Verfügung stellen kann.

Danke

Edit:

Berufung Klasse:

case object GetResponses 

def main(args: Array[String]) { 

val route = { 
    get { 
    complete { 
     //stuff 
     val req_list = List(req1,req2,req3) 
     val createRequestActor = system.actorOf(Props(new SendAll(req_list)), "Get_Response_Actor_" + getActorNumber) 
     val request_future = ask(createRequestActor, GetResponses).mapTo[List[Any]] 
     Thread.sleep(1000) 
     println(request_future) 
     //more stuff 
    } 
    } 
} 


Http().bindAndHandle(route, "localhost", 8080) 
} 

Aktualisiert Senden Klasse:

class SendAll(requests: List[Request]) extends Actor { 
    import context.{become,dispatcher} 
    var numProcessed = 0 
    var results: List[Any] = List() 
    requests.foreach(self ! _) 

    implicit val timeout = Timeout(100 millis) 
    def receive = { 

    case r: RequestMsg => 
     val sendRequestActor = context.actorOf(Props(new SendRequest(r)), "Send_Request_".concat(getActorNumber)) 
     (sendRequestActor ? Request).pipeTo(self) 

    case s: SuccessResponse => 
     println("Got Success") 
     results = results :+ s 
     println(results.size + " == " + requests.size) 
     if(results.size == requests.size) { 
     println("Before done") 
     become(done) 
     } 

    case akka.actor.Status.Failure(f) => 
     println("Got Failed") 
     results = results :+ RequestTimeout 
     if(results.size == requests.size) { 
     become(done) 
     } 

    case m => 
     println("Got Other") 

    } 

    def done: Receive = { 
    case GetResponses => 
     println("Done") 
     sender ! results 
    case _ => { 
     println("Done as well") 
    } 
    } 
} 

Ausgabe

Got Success 
1 == 3 
Got Success 
2 == 3 
Got Success 
3 == 3 
Before done 
Future(<not completed>) 

Antwort

1

Ich würde die Liste der Anfragen an den Schauspieler, dann pipe die Antworten von den untergeordneten Akteuren an self statt Await.result übergeben. Zum Beispiel:

class Handler(requests: List[RequestMsg]) extends Actor { 
    import context.{become, dispatcher} 
    var numProcessed = 0 
    var results: List[Any] = List() 
    requests.foreach(self ! _) 

    implicit val timeout = Timeout(100.millis) 

    def receive = { 
    case r: RequestMsg => 
     val sendRequestActor = context.actorOf(Props(new SendRequest(r)), "Send_Request".concat(getActorNumber)) 
     (sendRequestActor ? Request).pipeTo(self) 

    case s: SuccessResponse => 
     println(s"response: $s") 
     results = results :+ s 
     if (results.size == requests.size) 
     become(done) 

    case akka.actor.Status.Failure(f) => 
     println("a request failed or timed out") 
     results = results :+ RequestTimeout 
     if (results.size == requests.size) 
     become(done) 

    case m => 
     println(s"Unhandled message received while processing requests: $m") 
     sender ! NotDone 
    } 

    def done: Receive = { 
    case GetResponses => 
     println("sending responses") 
     sender ! results 
    } 
} 

Sie würden einen Schauspieler für jede Liste von Anfragen instanziiert:

val requests1 = List(RequestMsg("one"), RequestMsg("two"), RequestMsg("three")) 
val handler1 = system.actorOf(Props(new Handler(requests1))) 

In diesem Beispiel - nach dem Prinzip, dass ein Schauspieler eine deutliche, begrenzte Sphäre responsibility-- haben sollte der Schauspieler koordiniert einfach Anfragen und Antworten; Es führt keine Verarbeitung der gesammelten Antworten durch. Die Idee ist, dass ein anderer Akteur diesem Akteur eine GetResponses Nachricht senden würde, um die Antworten zu erhalten und sie zu verarbeiten (oder dieser Akteur würde proaktiv die Ergebnisse an einen verarbeitenden Akteur senden).

+0

Das sieht aus wie es tun wird, was ich will. Aber ich habe ein Problem mit der "fertig" -Funktion, es scheint überhaupt nicht aufgerufen zu werden (scheint nicht der Sleep-Timer zu sein). Ich hatte zuvor einen anderen Akteur, der diese Funktion aufruft, und würde dort sein, wo die Verarbeitung stattfindet. Ich aktualisierte das OP, um meine Aufrufklasse sowie meine aktualisierte Sendeklasse einzuschließen.Irgendeine Ahnung, warum ich nie etwas von meiner fertigen Klasse drucken sehe und meine Zukunft nicht abgeschlossen habe? – Eumcoz

+1

@Eumcoz: In Bezug auf Ihr Update, ein paar Anmerkungen: (1) Ihr 'RequestHandler' sieht seltsam aus, dass es' Actor' erweitert und 'system.actorOf' aufruft (' system.actorOf' sollte im Hauptprogramm aufgerufen werden, nicht aus einem Schauspieler). (2) Sie geben wahrscheinlich nicht genug Zeit für das Abarbeiten der Anfrage-Antwort-Verarbeitung. Erhöhen Sie den 'Thread.sleep' oder verwenden Sie den' onComplete' Callback auf 'Future'. – chunjef

+0

Entschuldigung, meine Schuld, aus irgendeinem Grund dachte ich, dass dies von woanders her aufgerufen wurde, in meinem eigentlichen Programm 'RequestHandler' ist mein Hauptprogramm und ist ein akka-http-Handler. 'SendAll' wird von einer Direktive aus einer http-Anfrage aufgerufen. Egal, wie lange ich schlafe, es scheitert, mein Call-Timeout für Calls wird ausgelöst: 'Failure (akka.pattern.AskTimeoutException: Ask wurde auf [Actor [akka: // HTTP_Actor_System/user/Get_Response_Actor_1 # -720534280]] nach [ 10000 ms]. Sender [null] hat eine Nachricht vom Typ "GetResponses $" gesendet. '' Bei vorherigen Tests dauert die Verarbeitung meiner Antwort ca. 5 ms. – Eumcoz

0

Die einfachste Lösung ist, alle Refs in denen es zu List[Future]List Karte Ihrer Schauspieler setzen und Future.sequenceFuture[List] zu erhalten verwenden.

val route = { 
    get { 
    val listActorRefs = List(actorRef1, actorRef2, ...) 
    val futureListResponses = Future.sequence(listActorRefs.map(_ ? Request)) 
    onComplete(futureListResponses) { 
     case Success(listResponse) => ... 
     complete(...) 
     case Failure(exception) => ... 
    } 
    } 
} 

Eine bessere Lösung ist eine Menge Schauspieler vermeiden‘fragt, bereiten einige ResponseCollector Schauspieler, die alle Ihre Nachricht senden wird (Ich schlage vor, bei BroadcastPool suchen) und eine Fahrplanmeldung für sich warten und Rückkehr zu stoppen Ergebnis.

Verwandte Themen