2017-07-27 3 views
1

Sagen, ich möchte eine Observable in RxJava machen, die eine Feedback-Kopplung wie auf dem Bild unten hat.RxJava: Erstellen Sie einen eigenständigen Stream

Self-dependent stream

Ich habe es geschafft, dass bei der Verwendung von Probanden zu erreichen, wie folgt aus:

// Observable<Integer> source = Observable.range(0, 6); 

public Observable<Integer> getFeedbackSum(Observable<Integer> source) { 
    UnicastSubject<Integer> feedback = UnicastSubject.create(); 
    Observable<Integer> feedbackSum = Observable.zip(source, feedback.startWith(0), Pair::create) 
     .map(pair -> pair.first + pair.second); 

    feedbackSum.subscribe(feedback); 
    return feedbackSum; 
} 

Es ziemlich hässlich aussieht. Gibt es einen besseren Weg?

+3

Es gibt einen Operator dafür: ['scan'] (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#scan (R,% 20io.reactivex. Funktionen.BiFunktion)). – akarnokd

+0

Ja, total übersehen. Danke, @akarnokd! – 4emodan

Antwort

1

Es ist ein Betreiber dafür: scan:

public final <R> Observable<R> scan(R initialValue, BiFunction<R,? super T,R> accumulator)

Observable.range(0, 6) 
    .scan(0, (a, b) -> a + b) 
    .test() 
    .assertResut(0, 1, 3, 6, 10, 15, 21); 

Wenn Ihr Akku-Typ nicht unveränderlich ist, können Sie scanWith:

public final <R> Observable<R> scanWith(Callable<R> seedSupplier, BiFunction<R,? super T,R> accumulator)

Verwandte Themen