2016-10-27 1 views
0

Ich bin ein mehr oder weniger Neuling zu akka und akka-http und ein Problem mit dem Senden eines http singleRequest von einem Schauspieler.akka http, singleRequest (java), Anfrage wird nicht gesendet

Was ich versuche zu erreichen:

  • Akteure berechnen Werte bei Empfang eines Berechnungsreihenfolge
  • sobald Berechnung einen anderen Schauspieler gemacht wird, ihn nennen wir ReplyActor wird aufgerufen sende eine "done" http Anfrage an ein anderes System
  • Nachrichten werden in die Warteschlange gestellt, in meinem Setup gibt es immer nur eine einzige ReplyActor lebendig

Alles funktioniert gut, wenn ich eine einzige Berechnungsreihenfolge senden. Wenn 10 Berechnungsaufträge vorliegen, wird die Anforderung irgendwann nicht mehr gesendet, je nachdem, wie stark das System mit Debug-Meldungen verlangsamt wird. Keine Ausnahmen, keine Timeouts, noting.

Mein Akteur-Setup ist ähnlich dem verteilten Master-Arbeiter Beispiel aus den Akka-Beispielen. Beim Versuch, herauszufinden, was schief geht, habe ich nur einen einzigen Arbeiter (CalculationActor und ReplyActor).

Nun, ich habe noch ein paar Details für Sie.

Zunächst funktioniert alles auch gut, wenn der Anfrage-Endpunkt in akka http geschrieben wird. Leider ist es in sparkjava geschrieben, das auf Anlegesteg beruht. Aber soweit ich das beurteilen kann, ist es nicht die Schuld des Endpunkts. Die Anfrage wird nicht gesendet.

Bei akka.http.impl.engine.client.PoolConductor # gelten ist ein Diagramm des Befehlsflusses:

Request- +-----------+  +-----------+ Switch- +-------------+  +-----------+ Command 
Context | retry |  | slot- | Command | doubler |  | route +--------------> 
+--------->| Merge +---->| Selector +-------------->| (MapConcat) +---->| (Flexi +--------------> 
      |   |  |   |    |    |  | Route) +--------------> 
      +----+------+  +-----+-----+    +-------------+  +-----------+  to slots 
       ^    ^
       |     | SlotEvent 
       |    +----+----+ 
       |    | flatten | mapAsync 
       |    +----+----+ 
       |     | RawSlotEvent 
       | Request-   | 
       | Context  +---------+ 
       +-------------+ retry |<-------- RawSlotEvent (from slotEventMerge) 
           | Split | 
           +---------+ 

Anfragen, die nicht gesendet werden, werden die Schlitz-Selector-Befehl fehlt . Keine Ahnung warum. Ich habe schon einige Zeit mit dem Debuggen verbracht. Folgendes ist von einer Art von Hilfe Vielleicht:

Befehlskette erfolgreich gesendet Anfragen:

akka.stream.actor.ActorPublisher#onNext 
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List([email protected]),0) 
    threadId: 51 
akka.stream.actor.ActorPublisher#onNext 
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List([email protected]),0) 
    threadId: 51 
akka.stream.stage.GraphStageLogic#grab (fast path) 
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List([email protected]),0) 
    threadId: 49 
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push 
    out (class): akka.stream.Outlet 
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext 
    threadId: 49 
akka.stream.stage.GraphStageLogic#grab (fast path) 
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List([email protected]),0) 
    threadId: 49 
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push 
    out (cass): akka.stream.Outlet 
    element (class): akka.http.impl.engine.client.PoolConductor$SwitchSlotCommand) 
    threadId: 49 
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push 
    out (class): akka.stream.Outlet 
    element (class): akka.http.impl.engine.client.PoolConductor$DispatchCommand 
    threadId: 49 
akka.stream.actor.ActorPublisher#onNext 
    HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)) 
    threadId: 78 
akka.stream.actor.ActorPublisher#onNext 
    HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)) 
    threadId: 78 
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push 
    out (class): akka.stream.Outlet 
    element (class): akka.http.impl.engine.rendering.RequestRenderingContext 
    threadId: 78 
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push 
    out (class): akka.stream.Outlet 
    element (class): scala.collection.immutable.$colon$colon 
    threadId: 78 
akka.stream.actor.ActorPublisher#onNext 
    List(ResponseDelivery(ResponseContext(RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567), ... 
    threadId: 78 

Befehlskette nicht gesendet Anfrage (gleiche run):

akka.stream.actor.ActorPublisher#onNext 
    RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List([email protected]),0) 
    threadId: 116 
akka.stream.actor.ActorPublisher#onNext 
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List([email protected]),0) 
    threadId: 116 
akka.stream.stage.GraphStageLogic#grab (fast path) 
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List([email protected]),0) 
    threadId: 78 
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting 
    out (class): akka.stream.Outlet 
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext 
    threadId: 78 
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting 
    out (class): akka.stream.Outlet 
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext 
    threadId: 78 
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e 
    handler: [email protected] 
    out: MergePreferred.out, 
    next: [email protected] 
    threadId: 78 
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e 
    handler: [email protected] 
    out: MergePreferred.out, 
    next: [email protected] 
    threadId: 78 
akka.stream.stage.GraphStageLogic#setOrAddEmitting case _ 
    handler: [email protected] 
    out: MergePreferred.out 
    next: [email protected] 
    threadId: 78 
akka.stream.actor.ActorPublisher#onNext 
    element: List(Disconnected(0,0)) 
    threadId: 117 
akka.stream.actor.ActorPublisher#onNext 
    element: List(Disconnected(0,0)) 
    threadId: 117 
akka.stream.actor.ActorPublisher#onNext 
    element: List(Disconnected(1,0)) 
    threadId: 49 

Ich schätze irgendeine Hilfe. Vielen Dank!

Version ist 2.4.8 (2.11) (akka-Schauspieler, akka-http-Kern, akka-http-experimentell, akka-stream) es

Antwort

0

Verstanden! Sieht so aus, als ob ich meine Anfragen "falsch" gemacht hätte.

schlägt nach ~ 4 Anfragen:

Http http = Http.get(context().system()); 
ActorMaterializer materializer = ActorMaterializer.create(context().system()); 
HttpRequest request = HttpRequestPOST("http://localhost:8091").withEntity(ContentTypes.APPLICATION_JSON, message); 

http.singleRequest(request, materializer).whenComplete((r, t) -> log.info("httpResponse: {}, throwable: {}", r, t)); 

Works:

Http http = Http.get(context().system()); 
ActorMaterializer materializer = ActorMaterializer.create(context().system()); 
Flow<HttpRequest, HttpResponse, CompletionStage<OutgoingConnection>> flow = http.outgoingConnection(ConnectHttp.toHost("localhost", 8091)); 
HttpRequest request = HttpRequest.POST("http://localhost:8091").withEntity(ContentTypes.APPLICATION_JSON, message); 

Source.single(request).via(flow).runWith(Sink.head(), materializer).whenComplete((r, t) -> log.info("httpResponse: {}, throwable: {}", r, t)); 

Vielen Dank an this question (and answer), die mich in die richtige Richtung.

Verwandte Themen