2017-09-05 2 views
0

ich RX bin neu und habe mit dem nächsten stecken:RXJava2 „nicht enden wollende“ beobachtbare

ich SQLBrite bin mit und wenn Abfrage-Methode aufgerufen wird, gibt mir eine „unendliche“ beobachtbare zurück, die an die DB hört Änderungen. Wenn einige Objekte ausgesendet werden, muss ich einige asynchrone Anrufe in einer Schleife machen und die erhaltenen Objekte zurückgeben. Der Beispielcode:

public Observable<List<Entity>> execute() { 
    return someManager.getAccount() 
      .take(1) // I do not need RX to trigger updates when account changed, so use take(1) 
      .flatMap(accountOptional -> { 
       if (accountOptional.isPresent()) { 
        SomeAccount account = accountOptional.get(); 
        return someManager.getEntityDataSource() 
          .query(new EntitySpecification(account.getId())); 
       } 
       return Observable.just(new ArrayList<Entity>()); // No account - no entities 
      }) 
      .flatMapIterable(entities -> entities) // Here I have a list of entities i I need to make async calls to fill entities with some iiner data (some other entities) 
      .flatMap(this::loadInnerData) 
      .toList() 
      .toObservable(); 
} 


private Observable<Entity> loadInnerData(Entity entity) { 
    return do some work with entity; 
    } 

Das Problem kommt, wenn ich ToList() verwenden - es wartet, bis beobachtbare endet es die Aufgabe - aber es wird nicht durchgeführt werden, da dies zu beobachten ist mit dem db hören. Wie kann ich die Fähigkeit erreichen zu hören, nicht aufhören zu hören, die "unendlichen" Observable und asynchrone Schleifenaufrufe mit RX (wie es mit loadInnerData() getan)?

Antwort

0

Vielleicht mit

return Observable.create(new Action1<Emitter<String>>() { 
     @Override 
     public void call(final Emitter<String> stringEmitter) { 
      // here u can use onNext/onError 
          stringEmitter.onNext(s); 

          stringEmitter.onError(e); 
         } 
        }); 

      stringEmitter.setCancellation(new Cancellable() { 
       @Override 
       public void cancel() throws Exception { 
        subscription.unsubscribe(); 
       } 
      }); 
     } 
    }, Emitter.BackpressureMode.BUFFER); 
versuchen
Verwandte Themen