2016-12-28 6 views
6

Ich bin auf der Suche nach einer Möglichkeit, akka-Stream-Ströme problemlos wieder zu verwenden.Elegante Art der Wiederverwendung akka-Strom fließt

Ich behandle die Flow I als Funktion wieder verwenden möchten, so würde ich gerne seine Unterschrift halten wie:

Flow[Input, Output, NotUsed]

Nun, wenn ich diesen Strom nutzen möchte ich in der Lage sein möchten ‚Call 'diesen Fluss und halten Sie das Ergebnis für die weitere Verarbeitung beiseite.

Also ich möchte mit Flow emiting [Input] beginnen, meine Strömung anwenden, und weiter mit Strömung emittieren [(Input, Output)].

Beispiel:

val s: Source[Int, NotUsed] = Source(1 to 10) 

val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString) 

val via: Source[(Int, String), NotUsed] = ??? 

Nun ist dies auf einfache Art und Weise nicht möglich, da mit .via() Flow kombiniert ich nur [Output] emittierende

würde ein strömungs
val via: Source[String, NotUsed] = s.via(stringIfEven) 

Alternative ist meine wieder verwendbaren Strömungs [(Input, Output)] emittieren zu machen aber das erfordert, dass jeder Flow seinen Input durch alle Stufen treibt und meinen Code schlecht aussehen lässt.

So kam ich mit einem Kombinierer so zusammen:

def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = { 
    Flow.fromGraph(GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val broadcast = b.add(Broadcast[In](2)) 
    val zip = b.add(Zip[In, Out]) 

    broadcast.out(0) ~> zip.in0 
    broadcast.out(1) ~> flow ~> zip.in1 

    FlowShape(broadcast.in, zip.out) 
}) 

}

, die den Eingang zu dem Fluss senden und auch in einer parallelen Linie direkt -> sowohl auf die ‚Zip‘ Stufe, in der ich Werte zu einem Tupel zusammenfüge. Es kann dann elegant angewendet werden:

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven)) 

Alles toll, aber wenn man ihnen Strömung einen ‚Filter‘ Betrieb zu tun - diese Kombinierer klemmt und hält weitere Ereignisse zu verarbeiten.

Ich denke, das ist wegen 'Zip' Verhalten, das alle Subflows dasselbe tun müssen - in meinem Fall wird ein gegebenes Objekt direkt von einem Zweig übergeben, so dass ein anderer Unterfluss dieses Element nicht ignorieren kann. filter(), und da es tut - der Fluss stoppt, weil Zip auf Push wartet.

Gibt es eine bessere Möglichkeit, die Fließzusammensetzung zu erreichen? Kann ich irgendetwas in meinem tupledFlow tun, um das gewünschte Verhalten zu erhalten, wenn 'flow' Elemente mit 'filter' ignoriert?

+0

Das Hauptproblem des hier Konzepts ist, dass 'Durchfluss [T, U, ...]' keine Funktion ist. Für jedes Eingabeelement können 0, 1 oder mehr Ausgabeelemente zurückgegeben werden. Es kann sogar Eingabeelemente zurückhalten und sie erst später verwenden, wenn mehr Daten verfügbar sind. Aus diesem Grund ist es nicht möglich, dieses Merkmal generisch bereitzustellen, wenn der umhüllte Fluss es selbst nicht unterstützt. Es kann generisch arbeiten, wenn es strikt durchgesetzt wird, dass der umhüllte 'Flow' ein Eins-zu-Eins-Fluss ist, der tatsächlich wie eine Funktion funktioniert (aber der Filter funktioniert dann nicht). Normalerweise ist die Verwendung von 'mapAsync' in solchen Fällen ein einfacher Weg. – jrudolph

+0

ja, du hast Recht. Problem würde auftreten, wenn mein wiederverwendbarer Fluss N Elemente zurückgeben würde. Die Annahme, dass der umhüllte "Flow" 0 oder 1 Element für jedes 1 Eingabeelement ausgeben könnte, würde es erlauben, einen anderen semantischen 'Zip'-Operator zu schreiben, der nur dann gepackt würde, wenn' Flow' Ausgaben ausgibt und alles überspringt Flow' drückt kein Element. –

+0

Sogar das wäre schwer zu tun, weil das Ziehen und Drücken der umhüllten Strömung nicht synchron erfolgt. Sie können nicht überprüfen, ob "der umhüllte' Flow kein Element drückt "- er könnte langsam oder gepuffert sein usw. – jrudolph

Antwort

3

Zwei mögliche Ansätze - mit strittig Eleganz - sind:

1) vermeiden Filterstufen verwenden, die Filter in ein Flow[Int, Option[Int], NotUsed] mutiert. Auf diese Weise können Sie Ihren Zipping-Wrapper wie Ihr ursprünglicher Plan auf den gesamten Graphen anwenden. Der Code sieht jedoch etwas verdorbener aus, und es wird ein zusätzlicher Aufwand durch das Umgehen von None s verursacht.

2) Trennen Sie die Filter- und Transformationsstufen und wenden Sie die Filter vor dem Zipping-Wrapper an. Wahrscheinlich ein leichter und besserer Kompromiss.

val filterEven = Flow[Int].filter(_ % 2 == 0) 

val toString = Flow[Int].map(_.toString) 

val tupled: Source[(Int, String), NotUsed] = s.via(filterEven).via(tupledFlow(toString)) 

EDIT

3) für eine andere Lösung hier Posting Klarheit, nach den Diskussionen in den Kommentaren.

Dieser Flow-Wrapper ermöglicht das Emittieren jedes Elements aus einem bestimmten Fluss, gepaart mit dem ursprünglichen Eingabeelement, das es generiert hat. Es funktioniert für jede Art von innerem Fluss (emittiert 0, 1 oder mehr Elemente für jeden Eingang).

def tupledFlow[In,Out](flow: Flow[In, Out, _]): Flow[In, (In,Out), NotUsed] = 
    Flow[In].flatMapConcat(in => Source.single(in).via(flow).map(out => in -> out)) 
+0

Ja, Option 0) ist etwas, das von allen wesentlich weniger schädlich ist. Es ist jedoch immer gefährlich, wenn Sie eine API haben, die zugänglich ist, aber * nicht verwendet werden sollte, wenn sie in einem 'TupledFlow'-Wrapper aufgerufen wird. Nicht, was wir von einem zusammensetzbaren Code erwarten würden. –

+0

Ich denke, dass die ultimative Lösung ein dedizierter Operator ist, der sich so verhalten würde, wie ich es in http://stackoverflow.com/questions/41366030/elegant-way-of-reusing-akka-stream-flows#comment69952575_41366030 –

+0

vereinbart habe starke Annahmen, dieser TupledFlow ist definitiv nicht etwas, an dem ich teilhaben würde - z - Eine eigenständige Bibliothek. Aber es wäre immer noch sinnvoll als intern wiederverwendbare Grafikbühne in Ihrem Projekt. –

1

kam ich mit einer Implementierung von TupledFlow up, das, wenn eingewickelt arbeitet Flow verwendet filter() oder mapAsync() und wenn gewickelte Flow 0,1 oder N Elemente für jeden Eingang aussendet:

def tupledFlow[In,Out](flow: Flow[In, Out, _])(implicit materializer: Materializer, executionContext: ExecutionContext):Flow[In, (In,Out), NotUsed] = { 
    val v:Flow[In, Seq[(In, Out)], NotUsed] = Flow[In].mapAsync(4) { in: In => 
    val outFuture: Future[Seq[Out]] = Source.single(in).via(flow).runWith(Sink.seq) 
    val bothFuture: Future[Seq[(In,Out)]] = outFuture.map(seqOfOut => seqOfOut.map((in,_))) 
    bothFuture 
    } 
    val onlyDefined: Flow[In, (In, Out), NotUsed] = v.mapConcat[(In, Out)](seq => seq.to[scala.collection.immutable.Iterable]) 
    onlyDefined 
} 

den einzigen Nachteil ich Sehen Sie hier, dass ich einen Fluss für eine einzelne Entität instanziiere und materialisiere - nur um eine Vorstellung davon zu bekommen, einen Fluss als eine Funktion zu bezeichnen.

Ich habe keine Leistungstests auf diese - aber seit Heavy-Lifting in einem eingewickelten Flow durchgeführt wird, die in einer Zukunft ausgeführt wird - ich glaube, das wird in Ordnung sein.

Diese Implementierung übergibt alle Tests von https://gist.github.com/kretes/8d5f2925de55b2a274148b69f79e55ac#file-tupledflowspec-scala

+2

Wenn Sie das suchen, könnten Sie wahrscheinlich mit 'def TupledFlow [In, Out] (flow : Fluss [In, Out, _]): Fluss [In, (In, Out), NotUsed] = { Fluss [In] .flatMapConcat (in => Quelle.single (in) .via (Fluss) .map (out => in -> out)) } ' –

+0

Ja, ich denke, das ist es, wonach ich bin. Es erfüllt alle Bedürfnisse und es wird jedes Verhalten von "Flow" korrekt verarbeiten. –

+0

Ihre Implementierung ist prägnant und minimal. Dank dafür. Ich denke, das ist die richtige Antwort für meine Frage –

Verwandte Themen