2017-11-22 1 views
2

der Suche nach Rxjava Betreiber zu verschmelzen Quellen in einen Strom haben derzeit dieseder Suche nach Rxjava Betreiber Quellen in einen Strom

Disposable observable = Observable.concat(
       service.loadPopCells().toObservable(), 
       service.loadChallangeData().toObservable(), 
       service.loadUserCell().toObservable() 
     )    
    .subscribe(data->sendtoViewmodel(data)); // called 3 times 

ich habe 3 Strom, usw. abonnieren genannt wird drei Mal, aber ich will fusionieren es einmal mit allen Daten aufgerufen werden

suchen so etwas wie dieses

Disposable observable = Observable.concat(
       service.loadPopCells().toObservable(), 
       service.loadChallangeData().toObservable(), 
       service.loadUserCell().toObservable() 
     ) 
     // i want to achieve something like this 
     .mapallresult(data,data2,data3){ 
     private List<SimpleCell> shots = new ArrayList<>(); 
     shots.add(data); 
     shots.add(data2); 
     shots.add(data2); 
     return shots; } 
     /// 
     .subscribe(dataList->sendtoViewmodel(dataList); // called once 
+0

Ich denke, Sie suchen nach 'zip' - es dauert, in Ihrem Beispiel, 3 Werte und eine' Function3', die diese zu einem Ergebnis kombiniert. –

+0

Sie müssen den ZIP-Operator verwenden, dies ist ein Implementierungsbeispiel: https://github.com/Fakher-Hakim/Rx-Android-Samples/blob/master/app/src/main/java/com/fakher/rx/ ui/fragments/ZipOperatorFragment.java – Fakher

Antwort

2

zip Betreiber Ihnen helfen:

Observable.zip(
     service.loadPopCells().toObservable(), 
     service.loadChallangeData().toObservable(), 
     service.loadUserCell().toObservable(), 
     (data1, data2, data3) -> Arrays.asList(data1, data2, data3)) 
     .subscribe(dataList -> sendtoViewmodel(dataList)); 
    } 

Oder noch kürzer:

Observable.zip(
    service.loadPopCells().toObservable(), 
    service.loadChallangeData().toObservable(), 
    service.loadUserCell().toObservable(), 
    Arrays::asList) 
    .subscribe(this::sendtoViewmodel); 
1

ich denke, bekommen, was Sie brauchen, ist die Zip-Operator , Alle beobachtbaren Betrieb synchron oder Asynchron-laufen und dann die Ergebnisse zusammenkommen

Sehen Sie sich diese Beispiele

/** 
    * In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline 
    */ 
    @Test 
    public void testZip() { 
     long start = System.currentTimeMillis(); 
     Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2) 
       .concat(s3)) 
       .subscribe(result -> showResult("Sync in:", start, result)); 
    } 


    public void showResult(String transactionType, long start, String result) { 
     System.out.println(result + " " + 
       transactionType + String.valueOf(System.currentTimeMillis() - start)); 
    } 

    public Observable<String> obString() { 
     return Observable.just("") 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
          .getName()); 
       }) 
       .map(val -> "Hello"); 
    } 

    public Observable<String> obString1() { 
     return Observable.just("") 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
          .getName()); 
       }) 
       .map(val -> " World"); 
    } 

    public Observable<String> obString2() { 
     return Observable.just("") 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
          .getName()); 
       }) 
       .map(val -> "!"); 
    } 

Oder das gleiche Beispiel async

private Scheduler scheduler; 
private Scheduler scheduler1; 
private Scheduler scheduler2; 

/** 
* Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel. 
* By default Rx is not async, only if you explicitly use subscribeOn. 
*/ 
@Test 
public void testAsyncZip() { 
    scheduler = Schedulers.newThread(); 
    scheduler1 = Schedulers.newThread(); 
    scheduler2 = Schedulers.newThread(); 
    long start = System.currentTimeMillis(); 
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
      .concat(s3)) 
      .subscribe(result -> showResult("Async in:", start, result)); 
} 

private Observable<String> obAsyncString() { 
    return Observable.just("") 
      .observeOn(scheduler) 
      .doOnNext(val -> { 
       System.out.println("Thread " + Thread.currentThread() 
         .getName()); 
      }) 
      .map(val -> "Hello"); 
} 

private Observable<String> obAsyncString1() { 
    return Observable.just("") 
      .observeOn(scheduler1) 
      .doOnNext(val -> { 
       System.out.println("Thread " + Thread.currentThread() 
         .getName()); 
      }) 
      .map(val -> " World"); 
} 

private Observable<String> obAsyncString2() { 
    return Observable.just("") 
      .observeOn(scheduler2) 
      .doOnNext(val -> { 
       System.out.println("Thread " + Thread.currentThread() 
         .getName()); 
      }) 
      .map(val -> "!"); 
} 
+0

Ihre "(s, s2, s3)" sind keine Observablen, daher können Sie sie nicht konkludieren. –

Verwandte Themen