2017-12-19 5 views
1

Ich habe einen Stream ähnliche und zwei Waschbecken, aber nur eine zu einem Zeitpunkt verwendet:Mehrere sinkt im gleichen Strom

Source.fromElements(1, 2, 3) 
.via(flow) 
.runWith(sink1) 

oder

Source.fromElements(1, 2, 3) 
.via(flow) 
.runWith(sink2) 

Es ist konfigurierbar, die wir sinken verwenden , aber was, wenn ich beide Senken parallel benutze. Wie kann ich das tun?

Ich dachte über Sink.combine, aber es erfordert auch eine Merge-Strategie und ich möchte nicht Ergebnisse dieser Senken in keiner Weise kombinieren. Ich interessiere mich nicht wirklich für sie, also möchte ich die gleichen Daten über HTTP nur an einen Endpunkt senden und gleichzeitig an die Datenbank senden. Sink-Kombination ist sehr ähnlich zu Broadcast, aber die Implementierung einer Broadcast von Grund auf verringert die Lesbarkeit meines Codes wo ich jetzt nur einfache Quelle, Fluss und eine Senke, keine Low-Level-Grafik-Stufen.

Kennen Sie einen richtigen Weg, wie man das macht (mit Gegendruck und anderen Dingen, die ich mit nur einer Senke habe)?

Antwort

4

können Sie alsoTo (siehe API docs):

Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore) 
+0

Wie kann ich diese Senken parallel laufen, indem Sie eine einfache .async vor der zweiten Senke hinzufügen? Ich möchte sie parallel laufen lassen, aber immer noch einen Gegendruck haben, mit anderen Worten, ich möchte meinen Stream so schnell laufen lassen wie die Zeit in der langsamsten Senke und nicht als die Summe der Zeit in allen Senken (weil sie synchron laufen). –

3

Rundfunk GraphDSL in seiner einfachsten Form verwenden, sollten nicht die Lesbarkeit verringern - in der Tat könnte man sogar argumentieren, dass die ~> Klauseln in irgendeiner Weise die visualisieren Stream-Struktur:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 
    val bcast = builder.add(Broadcast[Int](2)) 

    Source.fromElements(1, 2, 3) ~> flow ~> bcast.in 
    bcast.out(0) ~> sink1 
    bcast.out(1) ~> sink2 

    ClosedShape 
}) 
graph.run() 
+0

Laufen diese Senken parallel? –

+0

Akka-Streams führen standardmäßig Grafik-Verarbeitungsstufen sequenziell aus, aber wenn gewünscht, können Sie sie mit der Methode 'async' parallel ausführen. Für weitere Details, hier ist ein [Akka Stream doc] (https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html?language=scala) zum Thema. –

Verwandte Themen