2016-11-05 1 views
6

ich die folgenden einfachen Fall Klassenhierarchie haben:Akka Streams aufgeteilt Strom nach Typ

sealed trait Message 
case class Foo(bar: Int) extends Message 
case class Baz(qux: String) extends Message 

Und ich habe ein Flow[Message, Message, NotUsed] (von einem Websocket-basiertes Protokoll mit Codec bereits vorhanden).

Ich möchte diese Flow[Message] in separate Flüsse für Foo und Baz Typen demultiplexen, da diese auf völlig verschiedenen Wegen verarbeitet werden.

Was ist der einfachste Weg? Sollte offensichtlich sein, aber ich vermisse etwas ...

Antwort

5

Eine Möglichkeit ist die Erstellung eines RunnableGraph, das die Flüsse für jeden Nachrichtentyp enthält.

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 

    val in = Source(...) // Some message source 
    val out = Sink.ignore 

    val foo = builder.add(Flow[Message].map (x => x match { case [email protected](_) => f })) 
    val baz = builder.add(Flow[Message].map (x => x match { case [email protected](_) => b })) 
    val partition = builder.add(Partition[Message](2, { 
    case Foo(_) => 0 
    case Baz(_) => 1 
    })) 

    partition ~> foo ~> // other Flow[Foo] here ~> out 
    partition ~> baz ~> // other Flow[Baz] here ~> out 

    ClosedShape 
} 

g.run() 
+0

Richtig, Partition. OK, ich könnte genau das tun. Wahrscheinlich würde es einen integrierten Kombinator dafür geben; vielleicht werde ich eine Pull-Anfrage machen. –

+0

@AlexanderTemerev Dies könnte von Interesse sein: http://doc.akka.io/api/akka/2.4/?_ga=1.34091558.643806930.1478315511#akka.stream.scaladsl.Partition – Brian

Verwandte Themen