2016-02-02 5 views
14

Kann mir bitte jemand erklären mir Unterschied zwischen map und mapAsync w.r.t AKKA Stream? In the documentation wird gesagt, dassUnterschied zwischen map und mapAsync

Stream-Transformationen und Nebenwirkungen mit externem Nicht-Stream basierte Diensten mit mapAsync durchgeführt werden kann oder mapAsyncUnordered

Warum können wir uns einfach hier Karte? Ich nehme an, dass Flow, Source, Sink alles von Natur aus monadisch sein würden und somit sollte die Karte gut funktionieren, wenn die Verzögerung in der Natur von ihnen liegt.

Antwort

31

Signatur

Der Unterschied ist am besten in der signatures hervorgehoben: Flow.map in einer Funktion verwendet, die einen Typ T kehrt während Flow.mapAsync in einer Funktion übernimmt, die Future[T] einen Typ zurückgibt.

Praxisbeispiel

Als Beispiel sei angenommen, dass wir eine Funktion haben, die eine Datenbank vollständigen Namen eines Benutzers auf einer Benutzer-ID basiert fragt:

type UserID = String 
type FullName = String 

val databaseLookup : UserID => FullName = ??? //implementation unimportant 

Bei einer Source von UserID Werte Wir könnten einfach Flow.map in einem Stream verwenden, um die Datenbank abzufragen und die vollständigen Namen auf die Konsole zu drucken:

val userIDSource : Source[UserID, _] = ??? 

val stream = 
    userIDSource.via(Flow[UserID].map(databaseLookup)) 
       .to(Sink.foreach[FullName](println)) 
       .run() 

Eine Einschränkung dieser Implementierung besteht darin, dass dieser Stream nur 1 db-Abfrage gleichzeitig erstellt. Dies wird ein "Engpass" sein und wahrscheinlich maximalen Durchsatz in unserem Stream verhindern. Um die Leistung zu verbessern, konnten wir einfach Gleichzeitigkeit hinzufügen, indem databaseLookup innerhalb eines Future Verpackung:

def concurrentDBLookup(userID : UserID) : Future[FullName] = 
    Future { databaseLookup(userID) } 

val concurrentStream = 
    userIDSource.via(Flow[UserID].map(concurrentDBLookup)) 
       .to(Sink.foreach[Future[FullName]](_ foreach println)) 
       .run() 

Das Problem mit dieser simplen Gleichzeitigkeit Addendum ist, dass wir Gegendruck wirksam eliminiert. Da der Sink nur die Zukunft zieht und eine foreach println hinzufügt, die im Vergleich zu Datenbankabfragen relativ schnell ist, wird der Stream die Nachfrage kontinuierlich an die Quelle weiterleiten und mehr Futures spawnen. Dies bedeutet, dass die Anzahl von databaseLookup, die gleichzeitig ausgeführt wird, unbegrenzt ist, was schließlich die Datenbank überschwemmen könnte.

Flow.mapAsync zur Rettung; wir können gleichzeitig db-Lookups haben, während gleichzeitig die Anzahl der gleichzeitig Lookups Capping:

val maxLookupCount = 10 

val maxLookupConcurrentStream = 
    userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup)) 
       .to(Sink.foreach[FullName](println)) 
       .run() 

Beachten Sie auch, dass die Sink.foreach bekam einfacher, es nimmt nicht mehr in einem Future[FullName] sondern nur ein FullName statt.

Ungeordnete Async Karte

Wenn Sie kümmern sich nicht Ordnung der UserIDs auf vollen Namen über die Aufrechterhaltung Sie Flow.mapAsyncUnordered verwenden können. Dies wäre hilfreich, wenn Sie nur die vollständigen Namen drucken würden, aber nicht wissen, in welcher Reihenfolge sie auf der Konsole ankommen.

+0

Ist 'mapAsync' ähnlich dem Anwenden einer asynchronen Grenze auf eine bestimmte Stufe? Gemäß der Dokumentation wird das Markieren einer asynchronen Grenze jede Stufe in einem Akteur durchlaufen und sich nur fragen, ob es gleich ist. – jarvis11

Verwandte Themen