Signatur
Der Unterschied ist am besten in der signatures hervorgehoben: Flow.map
in einer Funktion verwendet, die einen Typ T
kehrt während Flow.mapAsync
in einer Funktion übernimmt, die Future[T]
einen Typ zurückgibt.
Praxisbeispiel
Als Beispiel sei angenommen, dass wir eine Funktion haben, die eine Datenbank vollständigen Namen eines Benutzers auf einer Benutzer-ID basiert fragt:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
Bei einer Source
von UserID
Werte Wir könnten einfach Flow.map
in einem Stream verwenden, um die Datenbank abzufragen und die vollständigen Namen auf die Konsole zu drucken:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
Eine Einschränkung dieser Implementierung besteht darin, dass dieser Stream nur 1 db-Abfrage gleichzeitig erstellt. Dies wird ein "Engpass" sein und wahrscheinlich maximalen Durchsatz in unserem Stream verhindern. Um die Leistung zu verbessern, konnten wir einfach Gleichzeitigkeit hinzufügen, indem databaseLookup
innerhalb eines Future
Verpackung:
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
Das Problem mit dieser simplen Gleichzeitigkeit Addendum ist, dass wir Gegendruck wirksam eliminiert. Da der Sink nur die Zukunft zieht und eine foreach println
hinzufügt, die im Vergleich zu Datenbankabfragen relativ schnell ist, wird der Stream die Nachfrage kontinuierlich an die Quelle weiterleiten und mehr Futures spawnen. Dies bedeutet, dass die Anzahl von databaseLookup
, die gleichzeitig ausgeführt wird, unbegrenzt ist, was schließlich die Datenbank überschwemmen könnte.
Flow.mapAsync
zur Rettung; wir können gleichzeitig db-Lookups haben, während gleichzeitig die Anzahl der gleichzeitig Lookups Capping:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
Beachten Sie auch, dass die Sink.foreach
bekam einfacher, es nimmt nicht mehr in einem Future[FullName]
sondern nur ein FullName
statt.
Ungeordnete Async Karte
Wenn Sie kümmern sich nicht Ordnung der UserIDs auf vollen Namen über die Aufrechterhaltung Sie Flow.mapAsyncUnordered
verwenden können. Dies wäre hilfreich, wenn Sie nur die vollständigen Namen drucken würden, aber nicht wissen, in welcher Reihenfolge sie auf der Konsole ankommen.
Ist 'mapAsync' ähnlich dem Anwenden einer asynchronen Grenze auf eine bestimmte Stufe? Gemäß der Dokumentation wird das Markieren einer asynchronen Grenze jede Stufe in einem Akteur durchlaufen und sich nur fragen, ob es gleich ist. – jarvis11