2013-08-21 10 views
8

Ich verstehe, wie Sie eine Nachricht nicht blockierende Anwendung in akka, und können einfach Beispiele, die gleichzeitige Operationen durchführen und die aggregierten Ergebnisse in einer Nachricht zurückgeben. Wo ich Schwierigkeiten habe, zu verstehen, was meine nicht blockierenden Optionen sind, wenn meine Anwendung auf eine HTTP-Anfrage antworten muss. Das Ziel ist, eine Anfrage zu erhalten und sofort an einen lokalen oder entfernten Akteur zu übergeben, der die Arbeit erledigt, die wiederum es ausgibt, um ein Ergebnis zu erhalten, das einige Zeit dauern könnte. Leider verstehe ich unter diesem Modell nicht, wie ich dies mit einer nicht blockierenden Serie von "Tells" ausdrücken könnte, anstatt "ask" zu blockieren. Wenn ich an irgendeinem Punkt in der Kette einen Tell verwende, habe ich keine Zukunft mehr für als eventuellen Antwort-Inhalt zu verwenden (benötigt von der http-Framework-Schnittstelle, die in diesem Fall finagle ist - aber das ist nicht wichtig). Ich verstehe die Anfrage ist auf einem eigenen Thread, und mein Beispiel ist ziemlich erfunden, aber nur versuchen, meine Design-Optionen zu verstehen.Akka nicht blockierende Optionen, wenn eine HTTP-Antwort erforderlich ist

Zusammenfassend, Wenn mein künstliches Beispiel unten überarbeitet werden kann, um weniger zu blockieren, ich liebe es sehr zu verstehen, wie. Dies ist meine erste Verwendung von akka seit einigen Licht-Exploration vor einem Jahr + und in jedem Artikel, Dokument, und sprechen, die ich gesehen habe sagt nicht für Dienstleistungen blockieren.

Konzeptionelle Antworten können hilfreich sein, können aber auch die gleichen sein wie das, was ich bereits gelesen habe. Das Bearbeiten/Bearbeiten meines Beispiels wäre wahrscheinlich der Schlüssel zu meinem Verständnis des genauen Problems, das ich zu lösen versuche. Wenn das aktuelle Beispiel in der Regel ist, was zu tun ist, dass die Bestätigung auch hilfreich ist, so suche ich nicht nach Magie, die nicht existiert.

Hinweis Die folgenden Aliase. Import com.twitter.util {Zukunft => Pressezukunfts, Await => TwitterAwait}

object Server { 

    val system = ActorSystem("Example-System") 

    implicit val timeout = Timeout(1 seconds) 

    implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = { 
    val promise = TwitterPromise[T] 
    scFuture onComplete { 
     case Success(result) ⇒ promise.setValue(result) 
     case Failure(failure) ⇒ promise.setException(failure) 
    } 
    promise 
    } 

    val service = new Service[HttpRequest, HttpResponse] { 
    def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { 
     case "https://stackoverflow.com/a/b/c" => 
     val w1 = system.actorOf(Props(new Worker1)) 

     val r = w1 ? "take work" 

     val response: Future[HttpResponse] = r.mapTo[String].map { c => 
      val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
      resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
      resp 
     } 

     response 
    } 
    } 

//val server = Http.serve(":8080", service); TwitterAwait.ready(server) 

    class Worker1 extends Actor with ActorLogging { 
    def receive = { 
     case "take work" => 
     val w2 = context.actorOf(Props(new Worker2)) 
     pipe (w2 ? "do work") to sender 
    } 
    } 

    class Worker2 extends Actor with ActorLogging { 
    def receive = { 
     case "do work" => 
     //Long operation... 
     sender ! "The Work" 
    } 
    } 

    def main(args: Array[String]) { 
    val r = service.apply(
     com.twitter.finagle.http.Request("https://stackoverflow.com/a/b/c") 
    ) 
    println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work 
    } 
} 

Vielen Dank im Voraus für jede angebotene Führung!

Antwort

5

Sie vermeiden können, in Worker1 unter Verwendung des pipe pattern -ie, eine Zukunft als Nachricht zu senden Sie schreiben würde:

pipe(w2 ? "do work") to sender 

Statt:

sender ! (w2 ? "do work") 

Jetzt wird r ein Future[String] sein anstelle von Future[Future[String]].


Update: die pipe Lösung oben ist eine allgemeine Art und Weise Ihre Schauspieler mit Zukunft reagieren vermeiden müssen. Wie Viktor unten in einem Kommentar weist darauf hin, in diesem Fall können Sie Ihr nehmen Worker1 aus der Schleife vollständig von Worker2 erzählen direkt mit dem Schauspieler, der es (Worker1) von der Nachricht erhalten zu reagieren:

w2.tell("do work", sender) 

Dies gewonnen Ist keine Option, wenn Worker1 ist verantwortlich für den Betrieb auf die Antwort von Worker2 in irgendeiner Weise (mit map on w2 ? "do work", die Kombination mehrerer Futures mit flatMap oder for-Comprehension, etc.), aber wenn das nicht notwendig ist, ist diese Version sauberer und effizienter.


Das tötet einen Await.result.

val response: Future[HttpResponse] = r.mapTo[String].map { c => 
    val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
    resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
    resp 
} 

Jetzt müssen Sie ihn nur Future in eine TwitterFuture drehen: Sie können durch das Schreiben so etwas wie die folgenden von den anderen loszuwerden. Ich kann Ihnen nicht genau sagen, wie das geht, aber es sollte fairly trivial sein, und definitiv keine Blockierung.

+0

Vielen Dank für Ihre schnelle Antwort Travis. Das bereinigt die Zukunft und die Erwartungen ein wenig. Also glaubst du, dass die Verwendung der zwei Fragen tatsächlich erforderlich ist (Natürlich konnte ich es nur so sehen - aber wollte sicher gehen)? Ich werde meinen Code aktualisieren, um Ihre Verbesserungen einzubeziehen und die impliziten Konvertierungen von einer Akka-Zukunft in eine Twitter-Zukunft zu integrieren. Ich kenne mich noch nicht so gut mit der Stack Overflow-Etikette aus, daher gebe ich ein +1 für Verbesserungen. Jede zusätzliche Information zu den Fragen wäre hilfreich. Vielen Dank! – Eric

+0

Es ist schwer zu sagen, ob die Fragen erforderlich sind, ohne mehr darüber zu wissen, wofür diese Akteure verantwortlich sind, aber der wichtige Teil ist, dass Fragen nicht blockiert werden müssen (es gibt ein wenig zusätzliche Buchhaltung, aber es ist immer noch asynchron). Ich würde auch vorschlagen, die Konvertierung zwischen Twitter und Standard-Bibliotheks-Futures explizit zu halten - eine Conversion-Methode aufrufen zu müssen, ist in der Regel ein kleiner Preis, um mögliche Verwirrung in einem Fall wie diesem zu vermeiden. –

+0

Vielen Dank für Ihre Hilfe und Einblicke in dieses Travis. Das löst meine Sorgen perfekt. – Eric

0

Sie müssen hier definitiv nicht blocken. Aktualisieren Sie zunächst Ihre Import für die twitter Zeug:

import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait, Promise => TwitterPromise} 

benötigen Sie den twitter Promise als dass die impl von Future ist man von der apply Methode zurück. Dann folge dem, was Travis Brown in seiner Antwort gesagt hat, so dass dein Akteur so reagiert, dass du keine verschachtelten Futures hast. Sobald Sie das tun, sollten Sie in der Lage sein, Ihre apply Methode, um so etwas zu ändern:

def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { 
    case "https://stackoverflow.com/a/b/c" => 
    val w1 = system.actorOf(Props(new Worker1)) 

    val r = (w1 ? "take work").mapTo[String] 
    val prom = new TwitterPromise[HttpResponse] 
    r.map(toResponse) onComplete{ 
     case Success(resp) => prom.setValue(resp) 
     case Failure(ex) => prom.setException(ex)    
    } 

    prom 
} 

def toResponse(c:String):HttpResponse = { 
    val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
    resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
    resp 
} 

Dies muss wahrscheinlich ein wenig mehr Arbeit. Ich habe es nicht in meiner IDE eingerichtet, daher kann ich nicht garantieren, dass es kompiliert wird, aber ich glaube, dass die Idee gut ist. Was Sie von der apply Methode zurückgeben, ist eine , die noch nicht abgeschlossen ist. Es wird abgeschlossen sein, wenn die Zukunft von dem Akteur ask (?) Erledigt ist und dies über einen nicht blockierenden onComplete Callback geschieht.

+0

Ich hatte meine Antwort mit den impliziten und Änderungen aktualisiert, aktualisiert und sah Ihre Antwort, die im Wesentlichen die gleiche Funktionalität nur Inline ist. Danke, dass du dir die Zeit genommen hast. Ich nehme an, mit beiden Antworten sind die Fragen in der Tat in Ordnung. War es hauptsächlich eine Übung, die Aufwartung entweder mit Ihrer Methode oder mit der von Travis vorgeschlagenen Methode zu töten, von der ich glaube, dass sie gleichwertig ist? Danke noch einmal. – Eric

+0

Die zukünftige Konvertierung mit der Antwortaufbau-Logik so zu gestalten, erscheint mir nicht ideal - besonders, wenn die Konvertierung auch anderswo notwendig ist (was wahrscheinlich ist). Gibt es einen Grund, warum Sie diesen Ansatz für das Mapping auf die Zukunft vorschlagen und dann konvertieren? –

+0

@TravisBrown, nur für die Einfachheit des Beispiels, werde ich in einer Sekunde aktualisieren, die zuerst eine 'map' zeigt, die konvertiert wird, bevor 'onComplete' ausgeführt wird. Das ist, worüber du gerade gesprochen hast, oder? – cmbaxter

Verwandte Themen