2016-12-08 1 views
3

Ich habe eine SourceQueue. Wenn ich ein Element dazu anbiete, möchte ich, dass es durch die Stream geht und wenn es die Sink erreicht, wird die Ausgabe an den Code zurückgegeben, der dieses Element angeboten hat (ähnlich wie gibt ein Element an den Aufruf zurück).Akka Stream Rückgabeobjekt von Sink

Wie erreiche ich das? Ein einfaches Beispiel für mein Problem wäre:

val source = Source.queue[String](100, OverflowStrategy.fail) 
val flow = Flow[String].map(element => s"Modified $element") 
val sink = Sink.ReturnTheStringSomehow 
val graph = source.via(flow).to(sink).run() 

val x = graph.offer("foo") 
println(x) // Output should be "Modified foo" 
val y = graph.offer("bar") 
println(y) // Output should be "Modified bar" 
val z = graph.offer("baz") 
println(z) // Output should be "Modified baz" 

Edit: in dieser Frage Für das Beispiel I Vladimir Matveev die beste Antwort zur Verfügung gestellt gegeben haben. Es sollte jedoch beachtet werden, dass diese Lösung nur funktioniert, wenn die Elemente in der gleichen Reihenfolge in die sink gehen, die sie der source angeboten wurden. Wenn dies nicht garantiert werden kann, kann die Reihenfolge der Elemente in der sink abweichen und das Ergebnis könnte sich von dem, was erwartet wird, unterscheiden.

Antwort

5

Ich glaube, es ist einfacher, das bereits vorhandene Grundelement zum Ziehen von Werten aus einem Strom, genannt Sink.queue. Hier ein Beispiel:

val source = Source.queue[String](128, OverflowStrategy.fail) 
val flow = Flow[String].map(element => s"Modified $element") 
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(1, 1)) 

val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run() 

def getNext: String = Await.result(sinkQueue.pull(), 1.second).get 

sourceQueue.offer("foo") 
println(getNext) 

sourceQueue.offer("bar") 
println(getNext) 

sourceQueue.offer("baz") 
println(getNext) 

Es macht genau das, was Sie wollen.

Beachten Sie, dass das Festlegen des inputBuffer-Attributs für die Warteschlangensenke für Ihren Anwendungsfall wichtig sein kann oder nicht - wenn Sie es nicht festlegen, wird der Puffer null sein und die Daten werden nicht durch den Stream fließen bis Sie die Methode pull() auf der Senke aufrufen.

sinkQueue.pull() ergibt eine Future[Option[T]], die erfolgreich mit Some abgeschlossen wird, wenn die Senke ein Element oder mit einem Fehler empfängt, wenn der Strom ausfällt. Wenn der Stream normal beendet wird, wird er mit None abgeschlossen. In diesem speziellen Beispiel ignoriere ich dies mit Option.get, aber Sie würden wahrscheinlich benutzerdefinierte Logik hinzufügen, um diesen Fall zu behandeln.

+0

Das ist cool und es funktioniert gut mit meinem Beispiel. In meinem eigentlichen Code benutze ich einen 'Stream', um' HttpRequests' zu behandeln. Der 'flow' verzweigt sich in mehrere Teilströme und wird wieder zusammengeführt. Einige der Teilströme sind schneller als andere, und wenn ich den "Sink" als "Warteschlange" benutze, denke ich, dass ich nicht garantieren kann, dass eine Anfrage die richtige Antwort liefert. – RemcoW

+0

Nun, gemäß Ihren Beispielen und der Beschreibung möchten Sie einen Wert an die Quellwarteschlange senden und ihn anschließend synchron aus der Senke zurückholen. Solange Sie dem Muster "Push the input - pull the output" folgen, können Sie sicher sein, dass Sie die Antworten in der richtigen Reihenfolge verarbeiten. Aber selbst wenn Ihr Zugriffsmuster anders ist (und es wäre nett, wenn es in der Frage reflektiert würde), besteht der einfachste Weg, die Anfrage mit der Antwort zu korrelieren, darin, die Anfrage zusammen mit dem transformierten Wert in einem Tupel zu übergeben. –

+0

Sie haben Recht, ich habe es versäumt, dies in meiner Beschreibung zu erwähnen. Deine Antwort ist die beste Antwort auf diese Frage. – RemcoW

1

Nun, wissen Sie, was offer() Methode gibt, wenn Sie einen Blick auf seine Definition nehmen :) Was können Sie tun, ist Source.queue[(Promise[String], String)] zu erstellen, erstellen Hilfefunktion, die Paar schiebt über offer zu streamen, stellen Sie sicher, offer nicht versagt, weil Die Warteschlange ist möglicherweise voll. Vervollständigen Sie dann das Versprechen in Ihrem Stream und verwenden Sie die Zukunft des Versprechens, um das Abschlussereignis im externen Code abzufangen.

Ich tun, um die Rate zu externen API von mehreren Orten meines Projekts verwendet zu drosseln.

Hier ist, wie es in meinem Projekt sah vor Typesafe Hub-Quellen hinzugefügt

import scala.concurrent.Promise 
import scala.concurrent.Future 
import java.util.concurrent.ConcurrentLinkedDeque 

import akka.stream.scaladsl.{Keep, Sink, Source} 
import akka.stream.{OverflowStrategy, QueueOfferResult} 

import scala.util.Success 

private val queue = Source.queue[(Promise[String], String)](100, OverflowStrategy.backpressure) 
    .toMat(Sink.foreach({ case (p, param) => 
     p.complete(Success(param.reverse)) 
    }))(Keep.left) 
    .run 

private val futureDeque = new ConcurrentLinkedDeque[Future[String]]() 

private def sendQueuedRequest(request: String): Future[String] = { 

    val p = Promise[String] 

    val offerFuture = queue.offer(p -> request) 

    def addToQueue(future: Future[String]): Future[String] = { 
    futureDeque.addLast(future) 
    future.onComplete(_ => futureDeque.remove(future)) 
    future 
    } 

    offerFuture.flatMap { 
    case QueueOfferResult.Enqueued => 
     addToQueue(p.future) 
    }.recoverWith { 
    case ex => 
     val first = futureDeque.pollFirst() 
     if (first != null) 
     addToQueue(first.flatMap(_ => sendQueuedRequest(request))) 
     else 
     sendQueuedRequest(request) 
    } 
} 

ich Akka erkennen, dass synchronisierte Warteschlange blockiert Engpass sein kann und unbegrenzt wachsen, aber da API-Aufrufe in meinem Projekt nur sind aus andere Akka-Streams, die Backpressured sind, habe ich nie mehr als Dutzend Artikel in futureDeque. Ihre Situation kann sich unterscheiden.

Wenn Sie stattdessen MergeHub.source[(Promise[String], String)]() erstellen, erhalten Sie eine wiederverwendbare Spüle. Auf diese Weise erstellen Sie jedes Mal, wenn Sie ein Objekt bearbeiten müssen, ein vollständiges Diagramm und führen es aus. In diesem Fall brauchen Sie keinen Hacky-Java-Container, um Anfragen zu warten.

+0

Das ist brilliant. Ich hatte nicht daran gedacht, ein 'Versprechen' zu verwenden. Können Sie die Hilfsfunktion näher erläutern?Ich verstehe nicht, was du damit meinst. – RemcoW

+0

@RemcoW Aktualisiert. – expert

+0

Süße, danke dafür – RemcoW