2017-11-22 2 views
0

Ich bin mir nicht sicher, dass ich repeatUntil verstehe. Ich wollte so etwas haben:RxJava2, repeatUntil und whileDo

  1. beobachtbaren Erstellen von nur
  2. Zustand prüfen und wenn es alle downstreams wahr laufen ist, sonst nur komplette
  3. Wiederholen Sie den zweiten Punkt

Hier ist mein Test Observable, die in IndexOutOfBoundsException läuft

List<Integer> values = new ArrayList<Integer>(); 

    Observable<Integer> observable = Observable.just(values) 
      .repeatUntil(() -> values.isEmpty()) 
      .map(integers -> values.remove(0)); 
    observable.subscribe(integer -> System.out.println(integer), 
      throwable -> System.out.println(throwable)); 

Es scheint mir, repeatUntil wird zum ersten Mal am Ende ausgeführt werden.

Bin ich falsch? Und wenn ich recht habe, wie kann ich den Zustand am Anfang überprüfen?

Ich sah auch, dass es whileDo-Operator in rxJava1 gab. Wo ist es jetzt?

UPD

I unter Verwendung von Takewhile vor repeatUntil das Problem irgendwie lösen könnte, aber vielleicht ist es eine bessere Lösung?

List<Integer> values = new ArrayList<Integer>(); 
    /* 
    values.add(0); 
    values.add(1); 
    values.add(2); 
    values.add(3); 
    values.add(4); 
    */ 

    Observable<Integer> observable = Observable.just(values) 
      .takeWhile(integers -> values.size() > 0) 
      .map(integers -> values.remove(0)) 
      .repeatUntil(() -> values.isEmpty()); 
    observable.subscribe(integer -> System.out.println("Test " + integer), 
      throwable -> System.out.println(throwable), 
      () -> System.out.println("Completed")); 

Antwort

2

RepeatUntil ruft die Funktion, wenn seine Upstream abgeschlossen ist, das ist, nachdem er den leeren ArrayList signalisiert hat, so dass Sie die Ausnahme, weil die remove, dass leere ArrayList finden.

Der Operator whileDo lebt im Projekt RxJava 2 Extensions.

List<Integer> values = new ArrayList<Integer>(); 

StatementFlowable.whileDo(
    Observable.just(values), 
    () -> values.isEmpty() 
) 
.map(integers -> values.remove(0)) 
.subscribe(integer -> System.out.println(integer), 
     throwable -> System.out.println(throwable)) 
; 
+0

Vielen Dank für Ihre Antwort. Ich habe meine Frage aktualisiert. – Tima

+0

Und was geschah während whileDo Betreiber von der rxJava1? Oder war es immer in deiner Bibliothek? – Tima

+0

Das 'whileDo' war fast 4 Jahre in der [RxJavaComputationExpressions] (https://github.com/ReactiveX/RxJavaComputationExpressions) -Bibliothek. Der Release-Prozess für diese Bibliothek wurde nicht ordnungsgemäß eingerichtet und es war viel einfacher, das Upgrade auf Version 2 in meinem eigenen Repository durchzuführen, das andere ältere Addon-Bibliotheksinhalte enthält. – akarnokd