2017-09-10 8 views

Antwort

1

Es hängt wirklich davon ab, ob Sie daran interessiert sind, diese None s herumzuhalten, oder wenn Sie sie wegwerfen wollen.

Als Sie Ihren Fluss eingegeben, wie Flow[A, Some[C], NotUsed] scheint, Sie sind nicht interessiert an None s überhaupt. Dies bedeutet, dass Sie sie mit collect, z.

def aToSomeC: Flow[A, C, NotUsed] = { aToSomeB.collect{case Some(x) ⇒ x}.via(bToC) } 

Wenn andernfalls Sie die None s verfolgen müssen (oder die Left s, wenn Sie mit Either s zu tun hat), Sie benötigen, um Ihre „Heben“ Bühne selbst zu schreiben. Dies kann ziemlich allgemein geschrieben werden. Zum Beispiel kann es als eine Funktion geschrieben werden, die irgendeinen Fluss Flow[I, O, M] nimmt und einen anderen Fluss zurückgibt Flow[Either[E, I], Either[E, O], M]. Da es sich um Fan-out- und Fan-in-Phasen handelt, ist die Verwendung von GraphDSL erforderlich.

def liftEither[I, O, E, M](f: Flow[I, O, M]): Graph[FlowShape[Either[E, I], Either[E, O]], M] = 
    Flow.fromGraph(GraphDSL.create(f) { implicit builder: GraphDSL.Builder[M] => f => 

     val fIn  = builder.add(Flow[Either[E, I]]) 
     val p  = builder.add(Partition[Either[E, I]](2, _.fold(_ ⇒ 0, _ ⇒ 1))) 
     val merge = builder.add(Merge[Either[E, O]](2)) 
     val toRight = builder.add(Flow[O].map(Right(_))) 

       p.out(0).collect{case Left(x) ⇒ Left(x)}    ~> merge 
     fIn.out ~> p.in 
       p.out(1).collect{case(Right(x)) ⇒ x} ~> f ~> toRight ~> merge 

     new FlowShape(fIn.in, merge.out) 
    }) 

Dies kann wie pro unten

def aToSomeB: Flow[A, Either[Throwable, B], NotUsed] = ??? 
    def aToSomeC: Flow[A, Either[Throwable, C], NotUsed] = aToSomeB.via(liftEither(bToC)) 

Hinweis verwendet werden, dass Option s leicht zu Either s umgewandelt werden kann die gleiche Hilfsfunktion zu nutzen.

+0

Das funktioniert, danke. Ich bin ein bisschen überrascht, dass dafür nichts eingebaut ist; Verflacht ein Strom von Eithers oder Optionals nicht ein gemeinsames Bedürfnis? –

+0

Ich stimme der Tatsache zu, dass es ein allgemeines Bedürfnis ist, besonders wenn Sie Ihre Stream-Stufen "referenziell transparent" halten wollen. Akka-stream ist jedoch ein Toolkit der unteren Ebene, und seine Anwendungen sind so viele, dass es schwierig wird klar zu definieren, was ein Teil davon sein soll und was nicht. –