Ströme Abflachung kann mit via
dergleichen verbunden werden ? Es scheint eine allgemeine Notwendigkeit für Dinge wie Option
Auspacken und Fehlerabflachung.Akka Ströme fließt über
Antwort
Es hängt wirklich davon ab, ob Sie daran interessiert sind, diese None
s herumzuhalten, oder wenn Sie sie wegwerfen wollen.
Als Sie Ihren Fluss eingegeben, wie Flow[A, Some[C], NotUsed]
scheint, Sie sind nicht interessiert an None
s überhaupt. Dies bedeutet, dass Sie sie mit collect
, z.
def aToSomeC: Flow[A, C, NotUsed] = { aToSomeB.collect{case Some(x) ⇒ x}.via(bToC) }
Wenn andernfalls Sie die None
s verfolgen müssen (oder die Left
s, wenn Sie mit Either
s zu tun hat), Sie benötigen, um Ihre „Heben“ Bühne selbst zu schreiben. Dies kann ziemlich allgemein geschrieben werden. Zum Beispiel kann es als eine Funktion geschrieben werden, die irgendeinen Fluss Flow[I, O, M]
nimmt und einen anderen Fluss zurückgibt Flow[Either[E, I], Either[E, O], M
]. Da es sich um Fan-out- und Fan-in-Phasen handelt, ist die Verwendung von GraphDSL erforderlich.
def liftEither[I, O, E, M](f: Flow[I, O, M]): Graph[FlowShape[Either[E, I], Either[E, O]], M] =
Flow.fromGraph(GraphDSL.create(f) { implicit builder: GraphDSL.Builder[M] => f =>
val fIn = builder.add(Flow[Either[E, I]])
val p = builder.add(Partition[Either[E, I]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))
val merge = builder.add(Merge[Either[E, O]](2))
val toRight = builder.add(Flow[O].map(Right(_)))
p.out(0).collect{case Left(x) ⇒ Left(x)} ~> merge
fIn.out ~> p.in
p.out(1).collect{case(Right(x)) ⇒ x} ~> f ~> toRight ~> merge
new FlowShape(fIn.in, merge.out)
})
Dies kann wie pro unten
def aToSomeB: Flow[A, Either[Throwable, B], NotUsed] = ???
def aToSomeC: Flow[A, Either[Throwable, C], NotUsed] = aToSomeB.via(liftEither(bToC))
Hinweis verwendet werden, dass Option
s leicht zu Either
s umgewandelt werden kann die gleiche Hilfsfunktion zu nutzen.
- 1. akka Ströme und -auslässe Mismatch
- 2. Elegante Art der Wiederverwendung akka-Strom fließt
- 3. Akka Ströme reduzieren, um kleineren Strom
- 4. Akka Ströme registrieren Fluss gleiche Signatur
- 5. Div fließt über den Bildschirm
- 6. Wie gruppiert Teilströme mit mapAsync in akka verbrauchen Ströme
- 7. Text fließt unter Bild
- 8. Akka Konstruktor Verdrahtung über Feder
- 9. Mein HTML-Inhalt fließt über den Container hinaus
- 10. Zwei Ströme zusammenführen
- 11. UIPickerView Inhalt fließt unerwartet
- 12. Retrieve Node-Red fließt
- 13. Wohin fließt Spark Streaming?
- 14. Senden Objekte über Ströme für mein Projekt nicht funktioniert, Sockel
- 15. Microservices OAuth2/OpenID Connect fließt
- 16. Verkettungs verschachtelte Ströme
- 17. Gemeinsame Lisp graue Ströme
- 18. Kafka Ströme mit Scala
- 19. Scala Ströme Auswertung
- 20. io-Ströme partitionEithers
- 21. file_get_contents nicht geöffnet Ströme
- 22. Ergiebige Ströme in Generatorfunktionen
- 23. Mehrere Ströme in Gulp
- 24. Wiederkehrende beobachtbare Ströme
- 25. in einem Akka Fluss
- 26. abgeleitete Strom Entsorgung enthalten Ströme
- 27. abfangen STDOUT und stream über akka
- 28. Akka ActorSelection über einen ganzen Cluser
- 29. Warum fließt dieser Überlauf nicht?
- 30. Fehler Benachrichtigung auf Mule fließt
Das funktioniert, danke. Ich bin ein bisschen überrascht, dass dafür nichts eingebaut ist; Verflacht ein Strom von Eithers oder Optionals nicht ein gemeinsames Bedürfnis? –
Ich stimme der Tatsache zu, dass es ein allgemeines Bedürfnis ist, besonders wenn Sie Ihre Stream-Stufen "referenziell transparent" halten wollen. Akka-stream ist jedoch ein Toolkit der unteren Ebene, und seine Anwendungen sind so viele, dass es schwierig wird klar zu definieren, was ein Teil davon sein soll und was nicht. –