2016-10-08 3 views
3

Ich bin relativ neu zu RxSwift, aber ich freue mich darauf, es mehr in meinen Projekten zu verwenden, und ich würde gerne etwas Feedback zu einem Operator hören, den ich gerade geschrieben habe.Implementieren eines entprellten Puffers mit RxSwift, ist das korrekt?

Die fehlende Funktionalität ist ein entprellter Puffer: Ein Puffer, der sich genau wie der Operator debounce verhält, aber statt nur den letzten Wert ausgibt, sollte er alle gesammelten Werte seit der letzten Emission ausgeben.

In RxJava dies ist leicht erreichbar durch einen Puffer mit einem anderen beobachtbaren als "Schließungswähler" mit:

// From: https://github.com/ReactiveX/RxJava/wiki/Backpressure 
// 
// we have to multicast the original bursty Observable so we can use it 
// both as our source and as the source for our buffer closing selector: 
Observable<Integer> burstyMulticast = bursty.publish().refCount(); 
// burstyDebounced will be our buffer closing selector: 
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS); 
// and this, finally, is the Observable of buffers we're interested in: 
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced); 

In RxSwift obwohl diese Version des Puffers Operators existiert nicht (I denke, dieses Problem ist verwandt: https://github.com/ReactiveX/RxSwift/issues/590), also habe ich versucht, das Problem selbst zu lösen.


Mein erster Ansatz war gerade den Bau der entprellten Puffer:

extension ObservableType { 
    func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> { 
     var valueBuffer: [E] = [] 

     let observable = self.do(onNext: { (value) in 
      valueBuffer.append(value) 
     }, onError: { (error) in 
      valueBuffer = [] 
     }, onCompleted: { 
      valueBuffer = [] 
     }, onSubscribe: { 
      valueBuffer = [] 
     }, onDispose: { 
      valueBuffer = [] 
     }).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in 
      let emitValues = valueBuffer 
      valueBuffer = [] 
      return Observable<[E]>.just(emitValues) 
     } 

     return observable 
    } 
} 

Mein zweiter Ansatz der Puffer wurde Gebäude, das jede Schließzustand (wie die RxJava Version):

extension ObservableType { 
    func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> { 
     var valueBuffer: [E] = [] 

     return Observable.create { observer in 
      let selectorSubscription = selector.subscribe(onNext: { (value) in 
       let emitValues = valueBuffer 
       valueBuffer = [] 
       observer.on(.next(emitValues)) 
      }, onError: { (error) in 
       valueBuffer = [] 
       observer.on(.error(error)) 
      }, onCompleted: { 
       valueBuffer = [] 
       observer.on(.completed) 
      }, onDisposed: { 
       valueBuffer = [] 
      }) 

      let subscription = self.subscribe(onNext: { (value) in 
       valueBuffer.append(value) 
      }, onError: { (error) in 
       observer.on(.error(error)) 
       selectorSubscription.dispose() 
      }, onCompleted: { 
       observer.on(.completed) 
       selectorSubscription.dispose() 
      }, onDisposed: { 
       observer.on(.completed) 
       selectorSubscription.dispose() 
      }) 
      return subscription 
     } 
    } 
} 

Ich habe t getestet Diese beiden Operatoren scheinen zu funktionieren und testeten auch verschiedene Kombinationen von onError-, onDispose- und onCompleted-Ereignissen.

Aber ich würde immer noch gerne etwas Feedback von erfahreneren Leuten hören, wenn das zumindest eine akzeptable Lösung ohne Lecks ist, und wenn ich irgendwelche RX-Verträge verletze.

Ich habe auch einen pasterbin mit einigen Testcode: http://pastebin.com/1iAbUPf8

+1

Ich würde vorschlagen, dass Sie eine PR auf vorschlagen [RxSwiftExt] (https://github.com/RxSwiftCommunity/RxSwiftExt) und auch den [RxSwift Slack Kanal] check out (http: //rxswift-slack.herokuapp .com /). – solidcell

+0

Danke, Slack-Kanal ist eine gute Idee, und ich werde darüber nachdenken, PR zu erstellen. – michaelk

Antwort

1

Hier ist mein für buffer(bufferOpenings, bufferClosingSelector). Es kann eine weitere Überprüfung erfordern.

extension ObservableType { 

    func buffer<R>(bufferOpenings: Observable<R>, bufferClosingSelector: (R)->Observable<R>) -> Observable<[E]> { 
     var valueBuffer: [E]? = nil 

     let operatorObservable = Observable<[E]>.create({ observer in 
      let subject = PublishSubject<[E]>() 

      let closingsSub = bufferOpenings 
       .doOnNext({ _ in 
        valueBuffer = [] 
       }) 
       .flatMap({ opening in 
        return bufferClosingSelector(opening) 
       }) 
       .subscribeNext({ _ in 
        if let vb = valueBuffer { 
         subject.onNext(vb) 
        } 
        valueBuffer = nil 
       } 
      ) 

      let bufferSub = self.subscribe(
       onNext: { value in 
        valueBuffer?.append(value) 
       }, 
       onError: { error in 
        subject.onError(error) 
       }, 
       onCompleted: { 
        subject.onCompleted() 
       }, 
       onDisposed: { 
       } 
      ) 

      let subjectSub = subject.subscribe(
       onNext: { (value) in 
        observer.onNext(value) 
       }, 
       onError: { (error) in 
        observer.onError(error) 
       }, 
       onCompleted: { 
        observer.onCompleted() 
       }, 
       onDisposed: { 
       } 
      ) 

      let combinedDisposable = CompositeDisposable() 

      combinedDisposable.addDisposable(closingsSub) 
      combinedDisposable.addDisposable(bufferSub) 
      combinedDisposable.addDisposable(subjectSub) 

      return combinedDisposable 

     }) 

     return operatorObservable 
    } 

} 
Verwandte Themen