Ich bin ein mehr oder weniger Neuling zu akka und akka-http und ein Problem mit dem Senden eines http singleRequest von einem Schauspieler.akka http, singleRequest (java), Anfrage wird nicht gesendet
Was ich versuche zu erreichen:
- Akteure berechnen Werte bei Empfang eines Berechnungsreihenfolge
- sobald Berechnung einen anderen Schauspieler gemacht wird, ihn nennen wir ReplyActor wird aufgerufen sende eine "done" http Anfrage an ein anderes System
- Nachrichten werden in die Warteschlange gestellt, in meinem Setup gibt es immer nur eine einzige ReplyActor lebendig
Alles funktioniert gut, wenn ich eine einzige Berechnungsreihenfolge senden. Wenn 10 Berechnungsaufträge vorliegen, wird die Anforderung irgendwann nicht mehr gesendet, je nachdem, wie stark das System mit Debug-Meldungen verlangsamt wird. Keine Ausnahmen, keine Timeouts, noting.
Mein Akteur-Setup ist ähnlich dem verteilten Master-Arbeiter Beispiel aus den Akka-Beispielen. Beim Versuch, herauszufinden, was schief geht, habe ich nur einen einzigen Arbeiter (CalculationActor und ReplyActor).
Nun, ich habe noch ein paar Details für Sie.
Zunächst funktioniert alles auch gut, wenn der Anfrage-Endpunkt in akka http geschrieben wird. Leider ist es in sparkjava geschrieben, das auf Anlegesteg beruht. Aber soweit ich das beurteilen kann, ist es nicht die Schuld des Endpunkts. Die Anfrage wird nicht gesendet.
Bei akka.http.impl.engine.client.PoolConductor # gelten ist ein Diagramm des Befehlsflusses:
Request- +-----------+ +-----------+ Switch- +-------------+ +-----------+ Command
Context | retry | | slot- | Command | doubler | | route +-------------->
+--------->| Merge +---->| Selector +-------------->| (MapConcat) +---->| (Flexi +-------------->
| | | | | | | Route) +-------------->
+----+------+ +-----+-----+ +-------------+ +-----------+ to slots
^ ^
| | SlotEvent
| +----+----+
| | flatten | mapAsync
| +----+----+
| | RawSlotEvent
| Request- |
| Context +---------+
+-------------+ retry |<-------- RawSlotEvent (from slotEventMerge)
| Split |
+---------+
Anfragen, die nicht gesendet werden, werden die Schlitz-Selector-Befehl fehlt . Keine Ahnung warum. Ich habe schon einige Zeit mit dem Debuggen verbracht. Folgendes ist von einer Art von Hilfe Vielleicht:
Befehlskette erfolgreich gesendet Anfragen:
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List([email protected]),0)
threadId: 51
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List([email protected]),0)
threadId: 51
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List([email protected]),0)
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 49
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List([email protected]),0)
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (cass): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolConductor$SwitchSlotCommand)
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolConductor$DispatchCommand
threadId: 49
akka.stream.actor.ActorPublisher#onNext
HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
threadId: 78
akka.stream.actor.ActorPublisher#onNext
HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.rendering.RequestRenderingContext
threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): scala.collection.immutable.$colon$colon
threadId: 78
akka.stream.actor.ActorPublisher#onNext
List(ResponseDelivery(ResponseContext(RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567), ...
threadId: 78
Befehlskette nicht gesendet Anfrage (gleiche run):
akka.stream.actor.ActorPublisher#onNext
RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List([email protected]),0)
threadId: 116
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List([email protected]),0)
threadId: 116
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List([email protected]),0)
threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
handler: [email protected]
out: MergePreferred.out,
next: [email protected]
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
handler: [email protected]
out: MergePreferred.out,
next: [email protected]
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case _
handler: [email protected]
out: MergePreferred.out
next: [email protected]
threadId: 78
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(0,0))
threadId: 117
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(0,0))
threadId: 117
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(1,0))
threadId: 49
Ich schätze irgendeine Hilfe. Vielen Dank!
Version ist 2.4.8 (2.11) (akka-Schauspieler, akka-http-Kern, akka-http-experimentell, akka-stream) es