2016-07-18 4 views
2

Mein Stream hat einen Flow, dessen Ausgaben List [Any] -Objekte sind. Ich möchte eine mapAsync gefolgt von einigen anderen Phasen haben, von denen jede ein einzelnes Element anstelle der Liste verarbeitet. Wie kann ich das machen?Akka-Stream - Liste zu mapAsync einzelner Elemente

Effektiv möchte ich die Ausgabe von

Flow[Any].map { msg => 
    someListDerivedFrom(msg) 
} 

durch verbraucht werden verbinden -

Flow[Any].mapAsyncUnordered(4) { listElement => 
    actorRef ? listElement 
}.someOtherStuff 

Wie kann ich das tun?

Antwort

4

Ich denke, der Kombinator, den Sie suchen, ist mapConcat. Dieser Kombinator nimmt ein Eingabeargument und gibt etwas zurück, das eine Iterable ist. Ein einfaches Beispiel wäre wie folgt:

implicit val system = ActorSystem() 
implicit val mater = ActorMaterializer() 

val source = Source(List(List(1,2,3), List(4,5,6))) 
val sink = Sink.foreach[Int](println) 

val graph = 
    source. 
    mapConcat(identity). 
    to(sink) 
graph.run 

Hier, mein Source ist List Elemente auszuspucken, und mein Sink nimmt den zugrunde liegenden Typ von dem, was in diesen List s ist. Ich kann sie nicht direkt miteinander verbinden, da die Typen unterschiedlich sind. Aber wenn ich zwischen ihnen anwende, können sie angeschlossen werden, da dieser Kombinator die List Elemente abflacht und ihre einzelnen Elemente (Int) stromabwärts sendet. Da das Eingabeelement bereits ein Iterable ist, müssen Sie nur die identify-Funktion im Körper von verwenden, damit die Dinge funktionieren.

+0

Danke. Ich habe mapConcat ein bisschen anders versucht und das hat nicht funktioniert. Was genau bedeutet "Identität"? – anindyaju99

+0

_identity_ ist in _scala.Predef_ definiert und sieht genau so aus, wie es aussieht: eine Funktion, die etwas in sich verwandelt. Sehr nützlich für die Weitergabe als die von _map_, _flatMap_ usw. benötigte Funktion – Phasmid

Verwandte Themen