2016-10-06 2 views
0

Ich verwende Akka Cluster (Version 2.4.10) mit wenigen Knoten für "Front-End" -Rolle und einige andere als "Arbeiter". Die Arbeiter sind auf entfernten Maschinen. Die ankommende Arbeit wird vom Front-End-Akteur über Round-Robin-Routing an die Arbeiter verteilt. Das Problem ist, die Antwort der "Arbeiter" zurück an den Frontend-Schauspieler zu senden. Ich kann sehen, dass die Arbeit von den Arbeitern abgeschlossen wird. Aber die von den Arbeitern an das Front-End gesandte Botschaft erreicht und endet nicht als tote Briefe. Ich sehe den folgenden Fehler im Protokoll.Akka-Round-Robin: Antwort von Remote-Routern an den Absender senden

[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered. 

Ich habe gesehen, this und ich bin nach dem gleichen in meinem Code. Ich habe auch this gesehen, aber die vorgeschlagene Lösung trifft in diesem Fall nicht zu, weil ich die Routen nicht im Voraus kenne. Es kommt durch die Konfiguration und es kann sich ändern. Die Round-Robin-Router-Konfiguration ist wie folgt.

akka.actor.deployment { 
    /frontEnd/hm = { 
    router = round-robin-group 
    nr-of-instances = 5 
    routees.paths = ["/user/hmWorker"] 
    cluster { 
     enabled = on 
     use-role = backend 
     allow-local-routees = on 
    } 
    } 
} 

Der Router wird in Front-End-Akteur wie unten instanziiert.

val router = context.actorOf(FromConfig.props(), name = "hm") 
val controller = context.actorOf(Props(classOf[Controller], router)) 

Der Controller und die Worker-Codes sind unten aufgeführt.

// Node 1 : Controller routes requests using round-robin 
class Controller(router: ActorRef) extends Actor { 

    val list = List("a", "b") // Assume this is a big list 

    val groups = list.grouped(500) 

    override def receive: Actor.Receive = { 
     val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]])) 
     val future = Future.sequence(futures).map(_.flatten) 
     val result = Await.result(future, 50 seconds) 
     println(s"Result is $result") 
    } 
} 

// Node 2 
class Worker extends Actor { 

    override def receive: Actor.Receive = { 
     case Message(lst) => 
      val future: Future[List[String]] = // Do Something asynchronous 
      future onComplete { 
       case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor. 
       case Failure(th) => // Error handling 
      } 
    } 
} 

Bitte lassen Sie mich wissen, was ich hier falsch mache. Schätze deine Hilfe.

Antwort

2

Sie sollten sender() nicht im Rückruf auf einem Future verwenden. Zu dem Zeitpunkt, zu dem der Rückruf verarbeitet wird, bezieht sich die sender() wahrscheinlich auf etwas anderes als beim Empfang der Nachricht.

Betrachten Sie entweder die Referenz außerhalb des Callback-Speichern zuerst wie:

override def receive: Actor.Receive = { 
    case Message(lst) => 
     val future: Future[List[String]] = // Do Something asynchronous 
     val replyTo: ActorRef = sender() 
     future onComplete { 
      case Success(r) => replyTo.!(r)(context.parent) // This message is not delivered to Controller actor. 
      case Failure(th) => // Error handling 
     } 
} 

Oder noch besser, verwenden Sie das Rohr Muster:

import akka.pattern.pipe 
override def receive: Actor.Receive = { 
    case Message(lst) => 
    val future: Future[List[String]] = // Do Something asynchronous 
    future.pipeTo(sender()) 
} 
+0

Ja. Es funktioniert mit "pipeTo". Vielen Dank. – Jegan

Verwandte Themen