2017-04-16 4 views
5

Ich habe den folgenden Code:RxJava: beobachtbare und Standard-Thread

Observable.create(new ObservableOnSubscribe<String>() { 
      @Override 
      public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception { 
       Thread thread = new Thread(new Runnable() { 
        @Override 
        public void run() { 
         s.onNext("1"); 
         s.onComplete(); 
        } 
       }); 
       thread.setName("background-thread-1"); 
       thread.start(); 
      } 
     }).map(new Function<String, String>() { 
      @Override 
      public String apply(@NonNull String s) throws Exception { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("map: thread=" + threadName); 
       return "map-" + s; 
      } 
     }).subscribe(new Observer<String>() { 
      @Override 
      public void onSubscribe(Disposable d) {} 

      @Override 
      public void onNext(String s) { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onNext: thread=" + threadName + ", value=" + s); 
      } 

      @Override 
      public void onError(Throwable e) {} 

      @Override 
      public void onComplete() { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onComplete: thread=" + threadName); 
      } 
     }); 

Und hier ist der Ausgang:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1 

Wichtiges Detail: Ich bin die subscribe Methode von einem anderen Thread aufrufen (main Faden in Android).

sieht also wie Observable Klasse synchron und ist standardmäßig und führt alles (Operatoren wie map + anmelde Abonnenten) auf dem gleichen Thread, die Ereignisse emittiert (s.onNext), nicht wahr? Ich frage mich ... ist es beabsichtigtes Verhalten oder ich habe gerade etwas falsch verstanden? Eigentlich habe ich erwartet, dass mindestens onNext und onComplete Callbacks auf dem Thread des Aufrufers aufgerufen werden, nicht auf dem einen, der Ereignisse ausgibt. Verstehe ich richtig, dass in diesem speziellen Fall der tatsächliche Thread des Aufrufers keine Rolle spielt? Zumindest wenn Ereignisse asynchron generiert werden.

Eine weitere Sorge - was passiert, wenn ich einige Observable als Parameter von einer externen Quelle (dh ich erstelle es nicht allein) ... es gibt keine Möglichkeit für mich als Benutzer zu prüfen, ob es ist synchron oder asynchron und ich muss nur explizit angeben, wo ich Callbacks über subscribeOn und observeOn Methoden erhalten möchte, oder?

Danke!

Antwort

4

RxJava ist unpioniert über Nebenläufigkeit. Wenn Sie keinen anderen Mechanismus wie "observeOn/subscribeOn" verwenden, werden im abonnierenden Thread Werte generiert. Bitte verwenden Sie keine Low-Level-Konstrukte wie Thread in Operatoren, Sie könnten den Vertrag brechen.

Aufgrund der Verwendung von Thread wird der onNext von dem aufrufenden Thread ('background-thread-1') aufgerufen. Das Abonnement erfolgt auf dem aufrufenden (UI-Thread). Jeder Operator in der Kette wird von 'background-thread-1'-calling-Thread aufgerufen. Das Abonnement onNext wird auch von 'background-thread-1' aufgerufen.

Wenn Sie Werte produzieren möchten, die nicht im aufrufenden Thread sind, verwenden Sie: subscribeOn. Wenn Sie den Thread wieder zur Hauptverwendung zurückschalten möchten, beobachten Sie irgendwo in der Kette. Höchstwahrscheinlich vor dem Abonnieren.

Beispiel:

Observable.just(1,2,3) // creation of observable happens on Computational-Threads 
      .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Last will overwrite 
      .map(integer -> integer) // map happens on Computational-Threads 
      .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread 
      .subscribe(integer -> { 
       // called from mainThread 
      }); 

Hier ist ein guter explanitation. http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

Verwandte Themen