2017-09-30 4 views
5

Wieder vergleiche ich RxJava mit Java 9 Flow. Ich sehe, dass Flow standardmäßig asynchron ist, und ich frage mich, ob es eine Möglichkeit gibt, es synchron laufen zu lassen.Run Flow im Hauptthread

Manchmal wollen wir es nicht nur für Nio, sondern auch für Zuckersyntax verwenden und haben einen homogeneren Code.

In RxJava ist es standardmäßig synchron, und Sie können es asynchron mit observerOn und subscribeOn in Ihrer Pipeline laufen lassen.

Es gibt einen Operator in Flow, damit er im Hauptthread ausgeführt wird.

Grüße.

Antwort

5

Sie können Ihre benutzerdefinierte Publisher wie in Flow für die Verwendung einer synchronen Ausführung definiert definieren.

Ein sehr einfacher Herausgeber, der nur einen einzigen TRUE-Artikel an einen einzelnen Abonnenten ausgibt (wenn angefordert). Da der Abonnent nur ein einzelnes Element empfängt, verwendet diese Klasse keine Puffer- und Sortiersteuerung.

class OneShotPublisher implements Publisher<Boolean> { 
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based 
    private boolean subscribed; // true after first subscribe 
    public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { 
    if (subscribed) 
     subscriber.onError(new IllegalStateException()); // only one allowed 
    else { 
     subscribed = true; 
     subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); 
    } 
    } 
    static class OneShotSubscription implements Subscription { 
    private final Subscriber<? super Boolean> subscriber; 
    private final ExecutorService executor; 
    private Future<?> future; // to allow cancellation 
    private boolean completed; 
    OneShotSubscription(Subscriber<? super Boolean> subscriber, 
         ExecutorService executor) { 
     this.subscriber = subscriber; 
     this.executor = executor; 
    } 
    public synchronized void request(long n) { 
     if (n != 0 && !completed) { 
     completed = true; 
     if (n < 0) { 
      IllegalArgumentException ex = new IllegalArgumentException(); 
      executor.execute(() -> subscriber.onError(ex)); 
     } else { 
      future = executor.submit(() -> { 
      subscriber.onNext(Boolean.TRUE); 
      subscriber.onComplete(); 
      }); 
     } 
     } 
    } 
    public synchronized void cancel() { 
     completed = true; 
     if (future != null) future.cancel(false); 
    } 
    } 
} 
+2

Danke für den aufwendigen Beispielcode. – paul

3

Es gibt keine Betreiber so zu tun, aber die API ermöglicht es Ihnen, die Art und Weise Artikel veröffentlicht zu steuern. Sie können daher die Teilnehmermethoden direkt aus dem aktuellen Thread aufrufen.

class SynchronousPublisher implements Publisher<Data> { 
     public synchronized void subscribe(Subscriber<? super Data> subscriber) { 
      subscriber.onSubscribe(new SynchronousSubscription(subscriber)); 
     } 
} 
static class SynchronousSubscription implements Subscription { 
     private final Subscriber<? super Data> subscriber; 

     SynchronousSubscription(Subscriber<? super Data> subscriber) { 
      this.subscriber = subscriber; 
     } 
     public synchronized void request(long n) { 
      ... // prepare item    
      subscriber.onNext(someItem);  
     } 

     ... 
    } 
} 
+0

Danke, das ist auch eine gute Workaround, immer noch ziemlich ausführlich, aber hey! das ist Java :) – paul

+1

@paul Beachten Sie, dass Java dies als RS-Spezifikation (keine Implementierung) bereitstellt. Es ist keine Alternative zu RxJava. Nur das Minimum benötigt, um als spec. – manouti

2

Es hängt davon ab, was Sie meinen, indem Sie auf dem Hauptthread laufen.

Wenn Sie die Ausführung eines beliebigen Flow in einem bestimmten Thread erzwingen möchten, gibt es keine Standardmethode dafür, es sei denn, der Flow ist in einer Bibliothek implementiert, mit der Sie die asynchronen Teile überschreiben können. In RxJava-Begriffen sind dies die Scheduler s, die von der Schedulers-Dienstprogrammklasse bereitgestellt werden.

Wenn Sie einen Flow in einem Hauptthread beobachten möchten, müssen Sie einen Blockierwarteschlangenkonsumenten auf Flow.Subscriber schreiben, der den Thread blockiert, bis die Warteschlange Elemente enthält. Dies kann kompliziert werden, daher werde ich Sie auf die blockingSubscribe Implementierung in Reactive4JavaFlow verweisen.

Wenn Sie den Java-Haupt-Thread als Executor/Scheduler verwenden möchten, ist das noch komplizierter und benötigt ähnliche Blockierungsmechanismen sowie einige Ideen von einem Threadpool-Executor. Reactive4JavaFlow hat zufällig einen solchen Scheduler, den Sie als Executor verwenden können: new SubmissionPublisher<>(128, blockingScheduler::schedule).

Verwandte Themen