2017-10-19 4 views
5

Sagen, ich habe eine API, die, basierend auf einigen Abfragekriterien, finden oder ein Widget erstellen:Asynchron, zusammensetzbare Rückgabewert für Vektor/Stream-Daten in Java 9

Widget getMatchingWidget(WidgetCriteria c) throws Throwable 

Die (synchron) Client-Code aussieht Jetzt

try { 
    Widget w = getMatchingWidget(criteria); 
    processWidget(w); 
} catch (Throwable t) { 
    handleError(t); 
} 

sagen Befund oder ein Widget Konstruktion ist unvorhersehbar teuer, und ich möchte nicht, Kunden, während darauf gewartet blockieren: wie. So ändern ich es:

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c) 

Clients können dann schreiben entweder:

CompletableFuture<Widget> f = getMatchingWidget(criteria); 
f.thenAccept(this::processWidget) 
f.exceptionally(t -> { handleError(t); return null; }) 

oder:

getMatchingWidget(criteria).whenComplete((t, w) -> { 
    if (t != null) { handleError(t); } 
    else { processWidget(t); } 
}); 

Nun lassen Sie uns sagen, anstatt die synchrone API 0 bis n-Widgets zurückkehren :

Stream<Widget> getMatchingWidgets(WidgetCriteria c) 

Naiv, ich schreiben konnte:

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c) 

Dies ist jedoch nicht wirklich den Code nicht blockierend machen, drückt er einfach die Blockierung um - entweder die Future blockiert, bis alle Widgets zur Verfügung stehen, oder die Code, der über die Stream Blöcke, die auf jede Widget warten, iteriert. Was ich will, ist etwas, das ich jedes Widget verarbeiten lassen, wie sie ankommen, so etwas wie:

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor) 

Aber dies ist nicht die Fehlerbehandlung zur Verfügung stellen, und auch wenn ich eine zusätzliches Consumer<Throwable> errorHandler hinzufügen, wird es mich nicht B. meinen Widget-Abruf mit anderen Abfragen zusammenstellen oder die Ergebnisse transformieren.

Also ich bin auf der Suche nach etwas zusammensetzbare Sache, die die Eigenschaften eines Stream (Iterability, Transformability) mit den Eigenschaften eines CompletableFuture (asynchrone Ergebnis und Fehlerbehandlung) kombiniert. (Und während wir dabei sind, könnte der Gegendruck gut sein.)

Ist dies ein java.util.concurrent.Flow.Publisher? Ein io.reactivex.Observable? Etwas komplizierter? Etwas einfacher?

+2

Ich würde Dinge in eine Warteschlange schieben und Dinge aus der Warteschlange ziehen. –

Antwort

6

Ihr Anwendungsfall fällt sehr natürlich in die Welt, die RxJava anspricht. Wenn wir einen beobachtbaren:

Observable<Widget> getMatchingWidgets(wc); 

, die Widgets null oder mehr auf der Grundlage der Kriterien erzeugt, dann können Sie jedes Widget verarbeiten, wie es mit angezeigt:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

Die beobachtbaren Kette wird auf die backgroundScheduler laufen Dies ist oft ein Wrapper für einen Thread-Pool-Executor-Service.Wenn Sie die endgültige Verarbeitung jedes Widget in Ihrer Benutzeroberfläche tun müssen, können Sie den observeOn() Betreiber zu wechseln, um den UI-Scheduler vor der Verarbeitung verwenden:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .observeOn(uiScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

Für mich ist die Eleganz des RxJava Ansatzes ist, dass es handhabt so viele der Schrauben und Muttern des Pipeline-Managements in einer fließenden Art und Weise. Wenn Sie diese Beobachterkette betrachten, wissen Sie genau, was passiert und wo.