Ich habe eine SourceQueue
. Wenn ich ein Element dazu anbiete, möchte ich, dass es durch die Stream
geht und wenn es die Sink
erreicht, wird die Ausgabe an den Code zurückgegeben, der dieses Element angeboten hat (ähnlich wie gibt ein Element an den Aufruf zurück).Akka Stream Rückgabeobjekt von Sink
Wie erreiche ich das? Ein einfaches Beispiel für mein Problem wäre:
val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()
val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"
Edit: in dieser Frage Für das Beispiel I Vladimir Matveev die beste Antwort zur Verfügung gestellt gegeben haben. Es sollte jedoch beachtet werden, dass diese Lösung nur funktioniert, wenn die Elemente in der gleichen Reihenfolge in die sink
gehen, die sie der source
angeboten wurden. Wenn dies nicht garantiert werden kann, kann die Reihenfolge der Elemente in der sink
abweichen und das Ergebnis könnte sich von dem, was erwartet wird, unterscheiden.
Das ist cool und es funktioniert gut mit meinem Beispiel. In meinem eigentlichen Code benutze ich einen 'Stream', um' HttpRequests' zu behandeln. Der 'flow' verzweigt sich in mehrere Teilströme und wird wieder zusammengeführt. Einige der Teilströme sind schneller als andere, und wenn ich den "Sink" als "Warteschlange" benutze, denke ich, dass ich nicht garantieren kann, dass eine Anfrage die richtige Antwort liefert. – RemcoW
Nun, gemäß Ihren Beispielen und der Beschreibung möchten Sie einen Wert an die Quellwarteschlange senden und ihn anschließend synchron aus der Senke zurückholen. Solange Sie dem Muster "Push the input - pull the output" folgen, können Sie sicher sein, dass Sie die Antworten in der richtigen Reihenfolge verarbeiten. Aber selbst wenn Ihr Zugriffsmuster anders ist (und es wäre nett, wenn es in der Frage reflektiert würde), besteht der einfachste Weg, die Anfrage mit der Antwort zu korrelieren, darin, die Anfrage zusammen mit dem transformierten Wert in einem Tupel zu übergeben. –
Sie haben Recht, ich habe es versäumt, dies in meiner Beschreibung zu erwähnen. Deine Antwort ist die beste Antwort auf diese Frage. – RemcoW