Ich verwende Akka Cluster (Version 2.4.10) mit wenigen Knoten für "Front-End" -Rolle und einige andere als "Arbeiter". Die Arbeiter sind auf entfernten Maschinen. Die ankommende Arbeit wird vom Front-End-Akteur über Round-Robin-Routing an die Arbeiter verteilt. Das Problem ist, die Antwort der "Arbeiter" zurück an den Frontend-Schauspieler zu senden. Ich kann sehen, dass die Arbeit von den Arbeitern abgeschlossen wird. Aber die von den Arbeitern an das Front-End gesandte Botschaft erreicht und endet nicht als tote Briefe. Ich sehe den folgenden Fehler im Protokoll.Akka-Round-Robin: Antwort von Remote-Routern an den Absender senden
[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered.
Ich habe gesehen, this und ich bin nach dem gleichen in meinem Code. Ich habe auch this gesehen, aber die vorgeschlagene Lösung trifft in diesem Fall nicht zu, weil ich die Routen nicht im Voraus kenne. Es kommt durch die Konfiguration und es kann sich ändern. Die Round-Robin-Router-Konfiguration ist wie folgt.
akka.actor.deployment {
/frontEnd/hm = {
router = round-robin-group
nr-of-instances = 5
routees.paths = ["/user/hmWorker"]
cluster {
enabled = on
use-role = backend
allow-local-routees = on
}
}
}
Der Router wird in Front-End-Akteur wie unten instanziiert.
val router = context.actorOf(FromConfig.props(), name = "hm")
val controller = context.actorOf(Props(classOf[Controller], router))
Der Controller und die Worker-Codes sind unten aufgeführt.
// Node 1 : Controller routes requests using round-robin
class Controller(router: ActorRef) extends Actor {
val list = List("a", "b") // Assume this is a big list
val groups = list.grouped(500)
override def receive: Actor.Receive = {
val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]]))
val future = Future.sequence(futures).map(_.flatten)
val result = Await.result(future, 50 seconds)
println(s"Result is $result")
}
}
// Node 2
class Worker extends Actor {
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
future onComplete {
case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor.
case Failure(th) => // Error handling
}
}
}
Bitte lassen Sie mich wissen, was ich hier falsch mache. Schätze deine Hilfe.
Ja. Es funktioniert mit "pipeTo". Vielen Dank. – Jegan