Sagen, ich habe eine API, die, basierend auf einigen Abfragekriterien, finden oder ein Widget erstellen:Asynchron, zusammensetzbare Rückgabewert für Vektor/Stream-Daten in Java 9
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
Die (synchron) Client-Code aussieht Jetzt
try {
Widget w = getMatchingWidget(criteria);
processWidget(w);
} catch (Throwable t) {
handleError(t);
}
sagen Befund oder ein Widget Konstruktion ist unvorhersehbar teuer, und ich möchte nicht, Kunden, während darauf gewartet blockieren: wie. So ändern ich es:
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
Clients können dann schreiben entweder:
CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })
oder:
getMatchingWidget(criteria).whenComplete((t, w) -> {
if (t != null) { handleError(t); }
else { processWidget(t); }
});
Nun lassen Sie uns sagen, anstatt die synchrone API 0 bis n-Widgets zurückkehren :
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
Naiv, ich schreiben konnte:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
Dies ist jedoch nicht wirklich den Code nicht blockierend machen, drückt er einfach die Blockierung um - entweder die Future
blockiert, bis alle Widgets
zur Verfügung stehen, oder die Code, der über die Stream
Blöcke, die auf jede Widget
warten, iteriert. Was ich will, ist etwas, das ich jedes Widget verarbeiten lassen, wie sie ankommen, so etwas wie:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
Aber dies ist nicht die Fehlerbehandlung zur Verfügung stellen, und auch wenn ich eine zusätzliches Consumer<Throwable> errorHandler
hinzufügen, wird es mich nicht B. meinen Widget-Abruf mit anderen Abfragen zusammenstellen oder die Ergebnisse transformieren.
Also ich bin auf der Suche nach etwas zusammensetzbare Sache, die die Eigenschaften eines Stream
(Iterability, Transformability) mit den Eigenschaften eines CompletableFuture
(asynchrone Ergebnis und Fehlerbehandlung) kombiniert. (Und während wir dabei sind, könnte der Gegendruck gut sein.)
Ist dies ein java.util.concurrent.Flow.Publisher? Ein io.reactivex.Observable? Etwas komplizierter? Etwas einfacher?
Ich würde Dinge in eine Warteschlange schieben und Dinge aus der Warteschlange ziehen. –