Ich bin relativ neu auf RxJava2 und ich bekomme einige seltsame Verhaltensweisen, so ist es wahrscheinlich, dass ich das Tool auf dem falschen Weg benutze.RxJava2 flatMap erstellt doppelte Ereignisse
Es ist ein ziemlich großes Projekt, aber ich habe das Snippet unten als Mindest reproduzierbaren Code getrennt:
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = PublishSubject.create();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
Dieser Code simuliert von meinem rx-Stream-Ereignisse eine .interval
und flatMap
mit erhalten diese Ereignisse " mach etwas Verarbeitung "und verwendet eine Subject
, um Ergebnisse in den Stream zu drücken.
Der Stream ist ein fortlaufender Prozess mit mehreren verschiedenen Ereignissen. Dieser Mindestcode ist albern, weil ich nur auf den apply
Callback dränge, aber im wirklichen Fall gibt es mehrere mögliche Momente, in denen ein Push passieren kann und die Anzahl der Ereignisse, die während apply
empfangen werden, ist nicht die gleiche Menge das wird über den Betreff gesendet.
Was ich mit diesem Code zu sehen erwarten:
value: 2 // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc
, was ich habe eigentlich ist:
value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
... etc
Ich habe auch eine Observable<Integer> o = s.share();
und Rückkehr, es zu haben versucht, oder direkt zurück s.share();
mit den gleichen Ergebnissen.
Ich verstehe irgendwie, warum das passiert. Die ObservableSource
wird wieder abonniert und wieder n wieder, so dass es mehr Ereignisse auf jeder Schleife gibt.
Die Frage:
Wie kann ich mein erwartetes Verhalten erreichen?
(im Fall war mein erwartetes Verhalten nicht klar, bitte mehr auf den Kommentaren fragen)
Ich würde versuchen, 'private Betreff verschieben s = PublishSubject.create();' auf einem anderen Bereich, wenn ich in dir wäre –
Blackbelt
@Blackbelt wie ich schon sagte. Das ist ein minimaler reproduzierbarer Code. Auf dem vollständigen Code hat die 'Funktion' ihre eigene Klasse. – Budius
Ich kann beurteilen, was ich sehe und ich kann nicht erraten, was du geschrieben hast. – Blackbelt