Ich bin auf der Suche nach dem folgenden Code für eine unbekannte Menge von Schauspieler Fragen Anfragen arbeiten.Verarbeitung unbekannter Menge von Akteuren Ergebnisse auf einzelne Timeout
implicit val timeout = Timeout(100 millis)
val sendRequestActor = context.actorOf(Props(new SendRequest(request)), "Send_Request_".concat(getActorNumber))
val sendRequestActor2 = context.actorOf(Props(new SendRequest(request)), "Send_Request_".concat(getActorNumber))
val a1 = ask(sendRequestActor, Request).fallbackTo(Future.successful(RequestTimeout))
val a2 = ask(sendRequestActor2, Request).fallbackTo(Future.successful(RequestTimeout))
val result = for {
r1 <- a1
r2 <- a2
} yield(r1, r2)
val r = Await.result(result, 100 millis)
r match {
case (b: SuccessResponse, b2: SuccessResponse) => {
//Process Results
}
case (b: SuccessResponse, b2: RequestTimeout) => {
//Process Results
}
case (b: RequestTimeout, b2: SuccessResponse) => {
//Process Results
}
case (b: RequestTimeout, b2: RequestTimeout) => {
//Process Results
}
case _ => {}
}
Ich versuche, Anfragen an eine Liste der Empfänger (von einem früheren Datenbankaufruf erhalten) zu senden. Die Anzahl der Empfänger variiert bei jedem Aufruf dieser Funktion. Empfänger haben eine maximale Antwortzeit von 100 Millisekunden, bevor ich ihre Anforderungen zeitüberwachen und eine RequestTimeout
aufzeichnen möchte. Der SendRequest
Akteur antwortet mit SuccessResponse
, wenn die Empfänger antworten. Ich gehe davon aus, dass ich die val result
for-Schleife ändern muss, um eine Liste zu verarbeiten, aber ich bin mir nicht sicher, wie ich alles strukturieren soll, damit ich die Mindestzeit abwarten kann (entweder wenn alle Akteure zurückkehren oder wenn das Timeout eintritt) ist tiefer). Ich brauche nicht alles in einem einzigen Rückgabewert wie im Beispiel, mir geht es gut mit einer Liste von Ergebnissen und übereinstimmenden Typen bei jeder Iteration.
Jede Hilfe wäre willkommen, bitte lassen Sie mich wissen, wenn ich andere Informationen zur Verfügung stellen kann.
Danke
Edit:
Berufung Klasse:
case object GetResponses
def main(args: Array[String]) {
val route = {
get {
complete {
//stuff
val req_list = List(req1,req2,req3)
val createRequestActor = system.actorOf(Props(new SendAll(req_list)), "Get_Response_Actor_" + getActorNumber)
val request_future = ask(createRequestActor, GetResponses).mapTo[List[Any]]
Thread.sleep(1000)
println(request_future)
//more stuff
}
}
}
Http().bindAndHandle(route, "localhost", 8080)
}
Aktualisiert Senden Klasse:
class SendAll(requests: List[Request]) extends Actor {
import context.{become,dispatcher}
var numProcessed = 0
var results: List[Any] = List()
requests.foreach(self ! _)
implicit val timeout = Timeout(100 millis)
def receive = {
case r: RequestMsg =>
val sendRequestActor = context.actorOf(Props(new SendRequest(r)), "Send_Request_".concat(getActorNumber))
(sendRequestActor ? Request).pipeTo(self)
case s: SuccessResponse =>
println("Got Success")
results = results :+ s
println(results.size + " == " + requests.size)
if(results.size == requests.size) {
println("Before done")
become(done)
}
case akka.actor.Status.Failure(f) =>
println("Got Failed")
results = results :+ RequestTimeout
if(results.size == requests.size) {
become(done)
}
case m =>
println("Got Other")
}
def done: Receive = {
case GetResponses =>
println("Done")
sender ! results
case _ => {
println("Done as well")
}
}
}
Ausgabe
Got Success
1 == 3
Got Success
2 == 3
Got Success
3 == 3
Before done
Future(<not completed>)
Das sieht aus wie es tun wird, was ich will. Aber ich habe ein Problem mit der "fertig" -Funktion, es scheint überhaupt nicht aufgerufen zu werden (scheint nicht der Sleep-Timer zu sein). Ich hatte zuvor einen anderen Akteur, der diese Funktion aufruft, und würde dort sein, wo die Verarbeitung stattfindet. Ich aktualisierte das OP, um meine Aufrufklasse sowie meine aktualisierte Sendeklasse einzuschließen.Irgendeine Ahnung, warum ich nie etwas von meiner fertigen Klasse drucken sehe und meine Zukunft nicht abgeschlossen habe? – Eumcoz
@Eumcoz: In Bezug auf Ihr Update, ein paar Anmerkungen: (1) Ihr 'RequestHandler' sieht seltsam aus, dass es' Actor' erweitert und 'system.actorOf' aufruft (' system.actorOf' sollte im Hauptprogramm aufgerufen werden, nicht aus einem Schauspieler). (2) Sie geben wahrscheinlich nicht genug Zeit für das Abarbeiten der Anfrage-Antwort-Verarbeitung. Erhöhen Sie den 'Thread.sleep' oder verwenden Sie den' onComplete' Callback auf 'Future'. – chunjef
Entschuldigung, meine Schuld, aus irgendeinem Grund dachte ich, dass dies von woanders her aufgerufen wurde, in meinem eigentlichen Programm 'RequestHandler' ist mein Hauptprogramm und ist ein akka-http-Handler. 'SendAll' wird von einer Direktive aus einer http-Anfrage aufgerufen. Egal, wie lange ich schlafe, es scheitert, mein Call-Timeout für Calls wird ausgelöst: 'Failure (akka.pattern.AskTimeoutException: Ask wurde auf [Actor [akka: // HTTP_Actor_System/user/Get_Response_Actor_1 # -720534280]] nach [ 10000 ms]. Sender [null] hat eine Nachricht vom Typ "GetResponses $" gesendet. '' Bei vorherigen Tests dauert die Verarbeitung meiner Antwort ca. 5 ms. – Eumcoz