2016-10-28 2 views
1

Es gibt Methode updateFromRemote():RxJava für boolean (Bedingung) warten

public class WorkshiftSettingsDaoImpl implements WorkshiftSettingsDao { 

    private boolean isUpdating = false; 

    public Observable<WorkshiftSettings> updateFromRemote() { 
     return remoteDataStore.get() 
       .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
       .doOnSubscribe(this::setUpdatingStarted) 
       .doOnUnsubscribe(this::setUpdatingFinished) 
       .flatMap(workshiftSettings -> localDataStore.put(workshiftSettings)); 
    } 

    private void setUpdatingStarted() { 
     if(isUpdating) throw new RuntimeException("already updating"); 
     isUpdating = true; 
    } 

    private void setUpdatingFinished() { 
     if(!isUpdating) throw new RuntimeException("already finished"); 
     isUpdating = false; 
    } 

} 

Wie kann ich dieses Verhalten implementieren:

wenn isUpdating == true dann warten, bis es zu false und ausführen präsentiert Kette geändert werden.

wenn isUpdating == false dann nur die vorgestellte Kette ausführen.

Es ist meine Lösung:

public Observable<WorkshiftSettings> updateFromRemote() { 
    Observable<WorkshiftSettings> updateRemoveDataObservable = remoteDataStore.get() 
      .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
      .doOnSubscribe(this::setUpdatingStarted) 
      .doOnUnsubscribe(this::setUpdatingFinished) 
      .flatMap(workshiftSettings -> localDataStore.put(workshiftSettings)); 

    return Observable.fromCallable(() -> { 
     while (isUpdating) { 
      Thread.sleep(1000); 
     } 
     return null; 
    }).concatMap(o -> updateRemoveDataObservable); 
} 

Aber ich denke, es ist etwas falsch :)

Jede bessere Idee?

+0

Ist es eine harte Anforderung, die Sie auf handeln lesen die Änderung von isUpdating, oder ist es möglich, eine setUpdating-Methode hinzuzufügen? Wenn Sie das können, können Sie 'onNext' auf einem' Subject'-Objekt von setUpdating aufrufen und den Betreff mit Ihrem Observable zusammenführen. –

+0

'isUpdating' geändert durch' setUpdatingStarted() '/' setUpdatingFinished() '. Ich habe meine Frage bearbeitet, um sie einzuschließen. – Alexandr

Antwort

3

Nicht sicher, habe ich richtig Ihren Fluss bekommen. Aber ich denke, Sie haben einen "Ort", wo isUpdating geändert wird. Wenn ja, können Sie einfach final BehaviorSubject<Boolean> isUpdatingSubject = BehaviorSubject.<Boolean>create() erstellen und in diesem "Ort", statt boolean Variable zu ändern, was Sie tun: isUpdatingSubject.onNext(false)

Und

public Observable<WorkshiftSettings> updateFromRemote() { 
    return isUpdatingSubject 
      .distinctUntilChanged() 
      .filter(new Func1<Boolean, Boolean>() { 
       @Override 
       public Boolean call(BookingErrorActivity isUpdating) { 
        return !isUpdating; 
       } 
      }) 
      .flatMap(new Func1<Object, Observable<WorkshiftSettings>>() { 
       @Override 
       public Observable<WorkshiftSettings> call(Object o) { 
        return remoteDataStore.get() 
          .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
          .doOnSubscribe(this::setUpdatingStarted) 
          .doOnUnsubscribe(this::setUpdatingFinished) 
          .flatMap(workshiftSettings -> localDataStore.put(workshiftSettings)); 
       } 
      }); 

} 

Aber carefule: mit einer solchen Lösung dieses beobachtbare wird jedes Mal emittieren, wenn isUpdatingSubjecttrue ausstrahlt, so kann es manchmal nicht cool sein. von ihm zu verhindern, dass Sie .first() vor flatMap()

PS Sie mehr über BehaviorSubject

PSS Leider verwendet, verwenden kann ich zu Java7 Syntax

+0

Danke für eine Antwort, aber ich meinte etwas anderes. 'isUpdating' wurde von' WorkshiftSettingsDaoImpl' selbst geändert. Siehe aktualisierte Frage. – Alexandr

+0

Okay, ich denke du brauchst '.debounce (1000, TimeUnit.MILLISECONDS, uiScheduler)' –

+1

Okay, ich denke du brauchst etwas wie: '.debounce (1000, TimeUnit.MILLISECONDS, uiScheduler) .withLatestFrom (isUpdatingSubject, [secondParam]) .filter ([isFalse]) ' –