2017-03-10 4 views
1

Ich habe eine Ereignissequenz eingerichtet mit RxJava Observablen. Grundsätzlich füge ich verschiedene Ereignisse zusammen, die mit Observable.just(Events.*) mit verschiedenen Verzögerungen erstellt wurden, die mit observable.delay(time, timeUnit, scheduler) Funktion eingerichtet werden. Dann poste ich sie auf PublishSubject (events im Code unten) und abonniere das PublishSubject, um die Reihenfolge zu beobachten (observeEvents() Funktion in dem Code unten). Früher funktionierte es gut, aber in letzter Zeit sehe ich ein sehr seltsames Verhalten auf meinem Gerät (OnePlus One mit Android 5.0.2) (und sehe es nicht im Emulator). Grundsätzlich werden die Ereignisse durcheinander gebracht, Ereignisse mit höherer Verzögerung können vor Ereignissen mit kleinerer Verzögerung auftreten, Ereignisse mit geringer Verzögerung können am Ende der Warteschlange auftreten, manchmal kommen alle Ereignisse in der richtigen Reihenfolge. Die ersten 3 Ereignisse werden besonders oft gemischt. Manchmal werden einige Ereignisse überhaupt nicht beobachtet. Was könnte hier passieren?RxJava verzögerte Observables Feuer mit unerwarteten Verzögerungen

Der Code ist in Kotlin:

var computationScheduler = Schedulers.computation() 

private val events: PublishSubject<Events> = PublishSubject.create() 
private val userActionSubject: PublishSubject<Events> = PublishSubject.create() 

Observable.merge(
      event0(), 
      event1(), 
      event2(), 
      userActionOrEvent3(), 
      userActionOrEvent4()) 
      .subscribe({ 
       // Weird timings are observed here already 
       events.onNext(it) 
      }, { e -> 
       events.onError(e) 
      })) 

private fun userActionOrEvent4(): Observable<Events> { 
    return Observable.amb(Observable.just(Events.Event4) 
      .delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler)) 
      .take(1) 
} 

private fun userActionOrEvent3(): Observable<Events> { 
    return Observable.amb(Observable.just(Events.Event3) 
      .delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler)) 
      .take(1) 
} 

private fun event2() = Observable.just(Events.Event2) 
     .delay(1800, TimeUnit.MILLISECONDS, computationScheduler) 

private fun event1() = Observable.just(Events.Event1) 
     .delay(200, TimeUnit.MILLISECONDS, computationScheduler) 

private fun event0() = Observable.just(Events.Event0) 
     .subscribeOn(computationScheduler) 

open fun observeEvents(): Observable<Events> = events.asObservable().observeOn(AndroidSchedulers.mainThread()) 

open fun onUserAction() { 
    userActionSubject.onNext(Events.Action) 
} 

Antwort

0

stellte sich heraus, verwenden wird erklären, Problem wurde von computationScheduler verursacht, als ich Schedulers.computation() zu Schedulers.newThread() Ereignisse änderte, fing an, zu erwarteten Zeiten zu schießen.

Verwandte Themen