Ich bin auf der Suche nach einer Möglichkeit, akka-Stream-Ströme problemlos wieder zu verwenden.Elegante Art der Wiederverwendung akka-Strom fließt
Ich behandle die Flow I als Funktion wieder verwenden möchten, so würde ich gerne seine Unterschrift halten wie:
Flow[Input, Output, NotUsed]
Nun, wenn ich diesen Strom nutzen möchte ich in der Lage sein möchten ‚Call 'diesen Fluss und halten Sie das Ergebnis für die weitere Verarbeitung beiseite.
Also ich möchte mit Flow emiting [Input]
beginnen, meine Strömung anwenden, und weiter mit Strömung emittieren [(Input, Output)]
.
Beispiel:
val s: Source[Int, NotUsed] = Source(1 to 10)
val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
val via: Source[(Int, String), NotUsed] = ???
Nun ist dies auf einfache Art und Weise nicht möglich, da mit .via()
Flow kombiniert ich nur [Output]
emittierende
val via: Source[String, NotUsed] = s.via(stringIfEven)
Alternative ist meine wieder verwendbaren Strömungs [(Input, Output)]
emittieren zu machen aber das erfordert, dass jeder Flow seinen Input durch alle Stufen treibt und meinen Code schlecht aussehen lässt.
So kam ich mit einem Kombinierer so zusammen:
def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[In](2))
val zip = b.add(Zip[In, Out])
broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> flow ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
}
, die den Eingang zu dem Fluss senden und auch in einer parallelen Linie direkt -> sowohl auf die ‚Zip‘ Stufe, in der ich Werte zu einem Tupel zusammenfüge. Es kann dann elegant angewendet werden:
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))
Alles toll, aber wenn man ihnen Strömung einen ‚Filter‘ Betrieb zu tun - diese Kombinierer klemmt und hält weitere Ereignisse zu verarbeiten.
Ich denke, das ist wegen 'Zip' Verhalten, das alle Subflows dasselbe tun müssen - in meinem Fall wird ein gegebenes Objekt direkt von einem Zweig übergeben, so dass ein anderer Unterfluss dieses Element nicht ignorieren kann. filter(), und da es tut - der Fluss stoppt, weil Zip auf Push wartet.
Gibt es eine bessere Möglichkeit, die Fließzusammensetzung zu erreichen? Kann ich irgendetwas in meinem tupledFlow tun, um das gewünschte Verhalten zu erhalten, wenn 'flow' Elemente mit 'filter' ignoriert?
Das Hauptproblem des hier Konzepts ist, dass 'Durchfluss [T, U, ...]' keine Funktion ist. Für jedes Eingabeelement können 0, 1 oder mehr Ausgabeelemente zurückgegeben werden. Es kann sogar Eingabeelemente zurückhalten und sie erst später verwenden, wenn mehr Daten verfügbar sind. Aus diesem Grund ist es nicht möglich, dieses Merkmal generisch bereitzustellen, wenn der umhüllte Fluss es selbst nicht unterstützt. Es kann generisch arbeiten, wenn es strikt durchgesetzt wird, dass der umhüllte 'Flow' ein Eins-zu-Eins-Fluss ist, der tatsächlich wie eine Funktion funktioniert (aber der Filter funktioniert dann nicht). Normalerweise ist die Verwendung von 'mapAsync' in solchen Fällen ein einfacher Weg. – jrudolph
ja, du hast Recht. Problem würde auftreten, wenn mein wiederverwendbarer Fluss N Elemente zurückgeben würde. Die Annahme, dass der umhüllte "Flow" 0 oder 1 Element für jedes 1 Eingabeelement ausgeben könnte, würde es erlauben, einen anderen semantischen 'Zip'-Operator zu schreiben, der nur dann gepackt würde, wenn' Flow' Ausgaben ausgibt und alles überspringt Flow' drückt kein Element. –
Sogar das wäre schwer zu tun, weil das Ziehen und Drücken der umhüllten Strömung nicht synchron erfolgt. Sie können nicht überprüfen, ob "der umhüllte' Flow kein Element drückt "- er könnte langsam oder gepuffert sein usw. – jrudolph