2016-04-03 15 views
1

Hallo ich bin ein Neuling mit RXJava. Ich suche nach einer beobachtbaren Lösung, die fortfahren und die Ausstrahlung von Elementen entsprechend der empfangenen Elemente unterbrechen würde.RXJava PausableBuffer

Können sagen, unsere Bedingung dieser Integer Prädikats ist:

Func1<Integer, Boolean> isOdd = number -> number%2==1; 

Wenn wir Zahlen addieren wie zum Thema: myNumberSubject.onNext(someInt); alle Zahlen, die vor einer ungeraden Zahl hinzugefügt werden, hinzugefügt wird in einem Puffer gelagert sind, sondern diejenigen, die Die erste ungerade Zahl wird hinzugefügt, alle Zahlen im Puffer werden in einem Burst ausgegeben (einschließlich des Gegenstandes, der ungerade war).

Danach wird jede Zahl einzeln ausgegeben, solange sie ungerade ist. Wenn eine gerade Zahl hinzugefügt wird, wird sie erneut in den Puffer gesetzt.

Ich konnte zwar keine echten Arbeitsbeispiele finden, aber dieses Beispiel von pausableBuffer könnte genau das Potenzial haben, das ich versuche zu erreichen. http://rxmarbles.com/#pausableBuffered Ich hoffe, es gibt eine bereits existierende RXJava-Lösung, die den Trick machen würde. Hier ist meine eigene Hack-Job-Lösung.

public class PausableBuffer<R> { 

    private boolean isPaused; 
    private List<R> buffer; 
    private ReplaySubject<R> regulatedSubject; 

    private PausableBuffer(){ 
     regulatedSubject = ReplaySubject.create(); 
     buffer=new LinkedList<>(); 
    } 

    public static <R>Observable<R> create(Observable<R> observable, Func1<R, Boolean> continueCondition){ 
     PausableBuffer<R> pausableBuffer = new PausableBuffer<>(); 
     observable.subscribe(value -> { 
      synchronized(pausableBuffer) { 
       if(pausableBuffer.isPaused){ 
        pausableBuffer.buffer.add(value); 
        if(continueCondition.call(value)){ 
         pausableBuffer.isPaused=false; 
         for (R r : pausableBuffer.buffer) { 
          pausableBuffer.regulatedSubject.onNext(r); 
         } 
         pausableBuffer.buffer.clear(); 
        } 
       }else{ 
        if(continueCondition.call(value)){ 
         pausableBuffer.regulatedSubject.onNext(value); 
        }else{ 
         pausableBuffer.isPaused=true; 
         pausableBuffer.buffer.add(value); 
        } 
       } 
      } 
     }); 
     return pausableBuffer.regulatedSubject.asObservable(); 
    } 

    public static void main(String[] args) { 
     BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(); 
     Observable<Integer> observable = PausableBuffer.<Integer>create(
       behaviorSubject.asObservable(),      
       intValue -> intValue==5 || 6<intValue);//continueCondition 
     observable.subscribe(v -> System.out.print(v+", ")); 
     for (int i = 0; i <= 8; i++) { 
      System.out.print("adding " + i + " : "); 
      behaviorSubject.onNext(i); 
      System.out.println(); 
     } 
    } 
} 

druckt:

  • 0 Zugabe:
  • 1 Zugabe:
  • 2 Zusatz:
  • 3 Hinzufügen:
  • 4 Hinzufügen:
  • Zugabe von 5: 0 , 1, 2, 3, 4, 5,
  • Zugabe von 6:
  • Zugabe von 7: 6, 7,
  • Zugabe von 8: 8,
+1

Sie suchen wahrscheinlich nach etwas wie diesem benutzerdefinierten 'BufferUntil'-Operator http://StackOverflow.com/a/35890924/4096987 – AndroidEx

Antwort

0

Zweite Version:

public final class ContinueWhile<T> implements Observable.Operator<T, T> { 

final Func1<T, Boolean> continuePredicate; 

private ContinueWhile(Func1<T, Boolean> continuePredicate) { 
    this.continuePredicate = continuePredicate; 
} 

public static <T>ContinueWhile<T> create(Func1<T, Boolean> whileTrue){ 
    return new ContinueWhile<>(whileTrue); 
} 

@Override 
public Subscriber<? super T> call(Subscriber<? super T> child) { 
    ContinueWhileSubscriber parent = new ContinueWhileSubscriber(child); 
    child.add(parent); 
    return parent; 
} 

final class ContinueWhileSubscriber extends Subscriber<T> { 

    final Subscriber<? super T> actual; 
    Deque<T> buffer = new ConcurrentLinkedDeque<>(); 

    public ContinueWhileSubscriber(Subscriber<? super T> actual) { 
     this.actual = actual; 
    } 

    @Override 
    public void onNext(T t) { 
     buffer.add(t); 
     if (continuePredicate.call(t)) { 
      while(!buffer.isEmpty()) 
       actual.onNext(buffer.poll()); 
     } 
    } 

    @Override 
    public void onError(Throwable e) { 
     buffer = null; 
     actual.onError(e); 
    } 

    @Override 
    public void onCompleted() { 
     while (!buffer.isEmpty()) 
      actual.onNext(buffer.poll()); 
     buffer=null; 
     actual.onCompleted(); 
    } 
} 
} 




public static void main(String[] args) { 
    BehaviorSubject<Integer> subject = BehaviorSubject.create(); 
    subject.asObservable() 
      .doOnNext(v -> System.out.print("next ")) 
      .lift(ContinueWhile.create(i -> i%3==0)) 
      .subscribe(v -> System.out.print(v + ", ")); 
    for (int i = 0; i < 10; i++) { 
     subject.onNext(i); 
    } 
} 
} 

Dank AndroidEx.