2017-02-16 5 views
0

Ich bin relativ neu auf RxJava2 und ich bekomme einige seltsame Verhaltensweisen, so ist es wahrscheinlich, dass ich das Tool auf dem falschen Weg benutze.RxJava2 flatMap erstellt doppelte Ereignisse

Es ist ein ziemlich großes Projekt, aber ich habe das Snippet unten als Mindest reproduzierbaren Code getrennt:

Observable 
    .interval(333, TimeUnit.MILLISECONDS) 
    .flatMap(new Function<Long, ObservableSource<Integer>>() { 
    private Subject<Integer> s = PublishSubject.create(); 
    private int val = 0; 

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception { 
     val++; 
     s.onNext(val); 
     return s; 
     } 
    }) 
    .subscribe(new Consumer<Integer>() { 
    @Override public void accept(Integer integer) throws Exception { 
     Log.w("value: %s", integer); 
    } 
    }); 

Dieser Code simuliert von meinem rx-Stream-Ereignisse eine .interval und flatMap mit erhalten diese Ereignisse " mach etwas Verarbeitung "und verwendet eine Subject, um Ergebnisse in den Stream zu drücken.

Der Stream ist ein fortlaufender Prozess mit mehreren verschiedenen Ereignissen. Dieser Mindestcode ist albern, weil ich nur auf den apply Callback dränge, aber im wirklichen Fall gibt es mehrere mögliche Momente, in denen ein Push passieren kann und die Anzahl der Ereignisse, die während apply empfangen werden, ist nicht die gleiche Menge das wird über den Betreff gesendet.

Was ich mit diesem Code zu sehen erwarten:

value: 2 // 1 got skipped because onNext is called before there's a subscriber. 
value: 3 
value: 4 
value: 5 
value: 6 ... etc 

, was ich habe eigentlich ist:

value: 2 
value: 3 
value: 3 // 3 twice 
value: 4 
value: 4 
value: 4 // 4 repeated 3 times 
value: 5 
value: 5 
value: 5 
value: 5 // 5 repeated 4 times 
value: 6 
value: 6 
value: 6 
value: 6 
value: 6 // 6 repeated 5 times 
... etc 

Ich habe auch eine Observable<Integer> o = s.share(); und Rückkehr, es zu haben versucht, oder direkt zurück s.share(); mit den gleichen Ergebnissen.

Ich verstehe irgendwie, warum das passiert. Die ObservableSource wird wieder abonniert und wieder n wieder, so dass es mehr Ereignisse auf jeder Schleife gibt.

Die Frage:

Wie kann ich mein erwartetes Verhalten erreichen?

(im Fall war mein erwartetes Verhalten nicht klar, bitte mehr auf den Kommentaren fragen)

+0

Ich würde versuchen, 'private Betreff verschieben s = PublishSubject.create();' auf einem anderen Bereich, wenn ich in dir wäre – Blackbelt

+0

@Blackbelt wie ich schon sagte. Das ist ein minimaler reproduzierbarer Code. Auf dem vollständigen Code hat die 'Funktion' ihre eigene Klasse. – Budius

+0

Ich kann beurteilen, was ich sehe und ich kann nicht erraten, was du geschrieben hast. – Blackbelt

Antwort

1

Ihre PublishSubject abonnierte mehrfach, einmal aus Intervall pro Stück().

Bearbeiten: Sie müssen jedes Mal eine neue PublishSubject übergeben (wechseln Sie zu BehaviorSubject, wenn Sie die erste/letzte Emission beibehalten möchten); Übergeben Sie das an den lang andauernden Prozess, und stellen Sie sicher, dass sein onComplete ordnungsgemäß aufgerufen wird, wenn der langwierige Prozess beendet wird.

+0

Das habe ich schon gesagt. Meine Frage ist, wie ich das Verhalten erreichen kann, das ich brauche. – Budius

+0

Bearbeitete Antwort mit Empfehlungen. –

+0

Ich denke auch, dass die Verwendung von PublishSubject ein wenig umständlich ist. Überlegen Sie, Ihren lang andauernden Prozess enger mit Rx zu integrieren. –

1

bearbeiten

Nach jüngsten Äußerungen ich mit dieser Art von einer Lösung kommen könnte:

class MyBluetoothClient { 
    private PublishSubject<BTLEEvent> statusPublishSubject = PublishSubject.create() 

    public Observable<BTLEEvent> getEventObservable() { 
    return statusPublishSubject 
    } 

    private void publishEvent(BTLEEvent event) { 
    statusPublishSubject.onNext(event) 
    } 

    public void doStuff1() { 
    // do something that returns: 
    publishEvent(BTLEEvent.someEvent1) 
    } 

    public void doStuff2() { 
    // do something else that eventually yields 
    publishEvent(BTLEEvent.someEvent2) 
    } 
} 

Und Sie es auf diese Weise verwenden:

MyBluetoothClient client = MyBluetoothClient() 
client 
    .getEventObservable() 
    .subscribe(/* */) 

/// 

client.doStuff1() 

/// 

client.doStuff2 

Original-Antwort

Wird dies tun?

Observable 
    .interval(333, TimeUnit.MILLISECONDS) 
    .flatMap(new Function<Long, ObservableSource<Integer>>() { 
    private int val = 0; 

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception { 
     val++; 
     return Observable.just(val); 
     } 
    }) 
    .subscribe(new Consumer<Integer>() { 
    @Override public void accept(Integer integer) throws Exception { 
     Log.w("value: %s", integer); 
    } 
    }); 
+0

Danke für Ihre Antwort. Für diesen sehr kleinen Beispielcode würde ich es schreiben. Wenn Sie jedoch die vollständige Frage lesen, kann das Subjekt jederzeit neue Ereignisse basierend auf anderen Bedingungen auslösen. Meine erste Sache am Morgen zurück im Büro werde ich versuchen Tassos Vorschlag der Verwendung einer switchMap statt – Budius

+0

Ich sehe. Es ist schwer für mich, eine bessere Antwort zu bekommen, die so wenig gegeben wird wie Sie. Beachten Sie, dass Sie jede Art von Observable wie Observable.range (0, 5) .map (...). Take (4) ... usw. zurückgeben können. Gib mir etwas mehr und ich werde versuchen, eine bessere Antwort zu finden;) – ULazdins

+0

Eine konkretere Anwendungsfälle habe ich (von mehreren anderen): Hören von Bluetooth-LE-Scans, Parsen der Werbung Rahmen (Karte), filtern erwartete Bilder, und (in diesem Fall) trennen Sie die Lesungen in "Enter" -Ereignis, "Update rssi" -Ereignis und verwenden Sie eine Zeitüberschreitung, um ein "Exit" -Ereignis zu generieren. Daher speichert die Klasse den Zeitstempel jedes Ereignisses, wenn sie in einer Karte ankommen, und führt jede Sekunde eine Überprüfung aus, um die "Exit" -Geräte zu verlassen. – Budius

0

Also hier ist die Antwort, die ich kam. Ich werde @Tassos Antwort als richtig markieren, da er mich auf den richtigen Weg gezeigt hat.

Zuerst brauche ich eine CachedSubject (ein Subjekt, das Elemente zwischenspeichert, während es keine Beobachter gibt und sie versendet, sobald ein Beobachter sich verbindet), dies ist notwendig, um sicherzustellen, dass Emissionen aus dem apply wirklich durchkommen. Die Klasse wickelt meist eine PublishSubject.

class CachedSubject<T> extends Subject<T> { 

     private PublishSubject<T> publishSubject = PublishSubject.create(); 
     private Queue<T> cache = new ConcurrentLinkedQueue<>(); 

     @Override public boolean hasObservers() { 
      return publishSubject.hasObservers(); 
     } 

     @Override public boolean hasThrowable() { 
      return publishSubject.hasThrowable(); 
     } 

     @Override public boolean hasComplete() { 
      return publishSubject.hasComplete(); 
     } 

     @Override public Throwable getThrowable() { 
      return publishSubject.getThrowable(); 
     } 

     @Override protected void subscribeActual(Observer<? super T> observer) { 
      while (cache.size() > 0) { 
       observer.onNext(cache.remove()); 
      } 
      publishSubject.subscribeActual(observer); 
     } 

     @Override public void onSubscribe(Disposable d) { 
      publishSubject.onSubscribe(d); 
     } 

     @Override public void onNext(T t) { 
      if (hasObservers()) { 
       publishSubject.onNext(t); 
      } else { 
       cache.add(t); 
      } 
     } 

     @Override public void onError(Throwable e) { 
      publishSubject.onError(e); 
     } 

     @Override public void onComplete() { 
      publishSubject.onComplete(); 
     } 
    } 

dann verwende ich diese Klasse mit einem switchMap:

Observable 
    .interval(1000, TimeUnit.MILLISECONDS) 
    .switchMap(new Function<Long, ObservableSource<Integer>>() { 

     private Subject<Integer> s = new CachedSubject<>(); 
     private int val = 0; 

     @Override public ObservableSource<Integer> apply(Long aLong) throws Exception { 
     val++; 
     s.onNext(val); 
     return s; 
     } 
    }) 
    .subscribe(new Consumer<Integer>() { 
     @Override public void accept(Integer integer) throws Exception { 
     Log.w("value: %s", integer); 
     } 
    }); 

Diese mich effektiv ermöglicht eine beliebige Anzahl von Veranstaltungen auf dem apply<T t> Methode zu erhalten und haben nur 1 Consumer diesen abonniert, empfängt alle Ereignisse davon.

Verwandte Themen