2015-09-22 5 views
7

Angenommen, ich habe folgenden RxJava Code (die eine DB zugreift, aber der genaue Anwendungsfall ist irrelevant):Ist Thread in RxJava sicher abgemeldet?

public Observable<List<DbPlaceDto>> getPlaceByStringId(final List<String> stringIds) { 
    return Observable.create(new Observable.OnSubscribe<List<DbPlaceDto>>() { 
     @Override 
     public void call(Subscriber<? super List<DbPlaceDto>> subscriber) { 
      try { 
       Cursor c = getPlacseDb(stringIds); 

       List<DbPlaceDto> dbPlaceDtoList = new ArrayList<>(); 
       while (c.moveToNext()) { 
        dbPlaceDtoList.add(getDbPlaceDto(c)); 
       } 
       c.close(); 

       if (!subscriber.isUnsubscribed()) { 
        subscriber.onNext(dbPlaceDtoList); 
        subscriber.onCompleted(); 
       } 
      } catch (Exception e) { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(e); 
       } 
      } 
     } 
    }); 
} 

dieser Code erhalten, habe ich folgende Fragen:

  1. Wenn jemand Melden Sie sich von der Observable ab, die von dieser Methode zurückgegeben wurde (nach einer vorherigen Subskription), ist diese Operation Thread-sicher? Also sind meine 'isUnsubscribed()' Checks in diesem Sinne korrekt, unabhängig von der Planung?

  2. Gibt es einen saubereren Weg mit weniger Code, um auf Abmeldestatus zu prüfen, als ich hier verwende? Ich konnte im Rahmen nichts finden. Ich dachte, SafeSubscriber löst das Problem, keine Ereignisse weiterzuleiten, wenn der Abonnent abgemeldet ist, aber anscheinend nicht.

Antwort

8

ist, dass der Betrieb threadsicher?

Ja. Sie erhalten einen rx.Subscriber, der (eventuell) einen flüchtigen Booleschen Wert überprüft, der auf "true" gesetzt ist, wenn das Abonnement des Abonnenten abgemeldet wird.

sauberer Weg mit weniger Standardcode für unsubscribed Staaten zu überprüfen

Die SyncOnSubscribe und die AsyncOnSubscribe (erhältlich als @Experimental api ab Release 1.0.15) wurde für diesen Anwendungsfall erstellt. Sie funktionieren als sichere Alternative zum Anruf Observable.create. Hier ist ein (konstruiertes) Beispiel für den synchronen Fall.

public static class FooState { 
    public Integer next() { 
     return 1; 
    } 
    public void shutdown() { 

    } 
    public FooState nextState() { 
     return new FooState(); 
    } 
} 
public static void main(String[] args) { 
    OnSubscribe<Integer> sos = SyncOnSubscribe.createStateful(FooState::new, 
      (state, o) -> { 
       o.onNext(state.next()); 
       return state.nextState(); 
      }, 
      state -> state.shutdown()); 
    Observable<Integer> obs = Observable.create(sos); 
} 

Beachten Sie, dass die SyncOnSubscribe nächste Funktion nicht observer.onNext mehr nennen darf als einmal pro Iteration noch kann es in diesem Beobachter nennen gleichzeitig. Hier sind ein paar Links zu den SyncOnSubscribeimplementation und tests auf dem Kopf der 1.x Zweig. Die primäre Verwendung besteht darin, das Schreiben von Observablen zu vereinfachen, die Daten synchron und on-next downstream iterieren oder analysieren, dies jedoch in einem Framework, das Rückdruck unterstützt und überprüft, ob es nicht abonniert ist. Im Wesentlichen würden Sie eine next-Funktion erstellen, die jedes Mal aufgerufen wird, wenn die Downstream-Operatoren ein neues Datenelement onNexted benötigen. Ihre nächste Funktion kann onNext entweder 0 oder 1 Mal aufrufen.

Die AsyncOnSubscribe ist so konzipiert, dass sie gut mit Gegendruck für beobachtbare Quellen funktioniert, die asynchron arbeiten (z. B. Off-Box-Anrufe). Zu den Argumenten für Ihre nächste Funktion gehören die Anzahl der Anfragen und die bereitgestellte beobachtbare Information sollte eine beobachtbare Information liefern, die Daten bis zu dem angeforderten Betrag erfüllt. Ein Beispiel für dieses Verhalten wären paginierte Abfragen von einer externen Datenquelle.

Bisher war es eine sichere Übung, Ihre OnSubscribe in eine Iterable umzuwandeln und Observable.from(Iterable) zu verwenden. Diese Implementierung erhält einen Iterator und überprüft subscriber.isUnsubscribed() für Sie.

+0

Danke, Sie haben tatsächlich eine andere Frage von mir bezüglich der Erstellung von "benutzerdefinierten" Observablen mit angemessener Gegendruckunterstützung beantwortet! Ich habe SyncSubscriber ausgecheckt und es sieht wirklich gut aus.In vielen Fällen würde ich eine semantische Umwandlung einer Operation in ein Iterable ein bisschen peinlich sehen, aber es ist immer noch gut zu wissen, dass wir auf diese Weise eine einfache Gegendruck-Unterstützung bekommen können. Ich sehe aber, dass diese Klasse immer noch als @Experimental markiert ist, wann denkst du, dass es grob gesagt produktionsbereit sein kann? –

+0

Gut zu hören! Der nächste Schritt ist, es in eine Veröffentlichung zu bringen (die bald in Version 1.0.15 erscheinen sollte). Danach wird es im Allgemeinen in den Status "@ Beta" oder direkt in den öffentlichen Status befördert, wenn wir Vertrauen gewinnen. – Aaron