2015-05-12 10 views
7

Ich möchte asynchron zwei Netzwerkaufrufe durchführen - ich verwende Retrofit + RxJava, um dies zu erreichen. Diese Logik stammt aus einer einfachen Runner-Klasse, um die Lösung zu testen. HINWEIS: Dies betrifft hauptsächlich RxJava auf der Serverseite.Korrekte Verwendung von Retrofit + RxJava's combateLatest

Mein Code sieht wie folgt aus:

public static void main(String[] args) throws Exception { 
    Api api = ...; 

    Observable.combineLatest(
     api.getStates(), 
     api.getCmsContent(), 
     new Func2<List<States>, CmsContent, String>() { 
     @Override public String call(List<State> states, CmsContent content) { 
      ... 
      return "PLACEHOLDER"; 
     } 
     }) 
     .observeOn(Schedulers.immediate()) 
     .subscribeOn(Schedulers.immediate()) 
     .subscribe(new Observer<String>() { 
     @Override public void onCompleted() { 
      System.out.println("COMPLETED"); 
     } 

     @Override public void onError(Throwable e) { 
      System.out.println("ERROR: " + e.getMessage()); 
     } 

     @Override public void onNext(String s) { 
      // I don't care what's returned here 
     } 
     }); 
} 

Drei Fragen:

  1. Ist Observable.combineLatest der beste Operator zu verwenden, wenn Sie mehrere REST Anrufe asynchron ausführen wollen und gehen, wenn alle Anrufe beendet haben ?
  2. Meine Func2 Implementierung gibt derzeit ein String. Nachdem die 2 API-Aufrufe ausgeführt wurden, werden die Ergebnisse innerhalb der Func2#call()-Methode verarbeitet. Es ist mir egal, was zurückgegeben wird - es muss einen besseren Weg geben, damit umzugehen - bin ich richtig?
  3. Die API-Aufrufe werden korrekt mit dem obigen Code ausgeführt. Aber die main Methode wird nicht mit dem richtigen Process finished with exit code 0 abgeschlossen, wenn ich das Programm ausführen. Was könnte dazu führen, dass der Code hängen bleibt?

UPDATE - 2015-05-14

auf Empfehlung Basierend habe ich die Logik wie folgt geändert:

public static void main(String[] args) throws Exception { 
    Api api = ...; 

    Observable.zip(
     api.getStates(), 
     api.getCmsContent(), 
     new Func2<List<States>, CmsContent, Boolean>() { 
     @Override public Boolean call(List<State> states, CmsContent content) { 
      // process data 
      return true; 
     } 
     }) 
     .subscribeOn(Schedulers.io()) 
     .toBlocking() 
     .first(); 
} 

Diese Lösung sieht aus wie ich war Auf der Suche nach. Ich werde es einige Zeit benutzen, um zu sehen, ob ich irgendwelche Probleme habe. Kein

Antwort

1

1) die beste ist zip() zu verwenden. Letztes Kombinieren ist gut, wenn eine der zwei (oder mehr) Apis "langsamere" unterschiedliche Ergebnisse zurückgibt/es hat die Essenz des Caching.

2) Fun2 erleichtert das Zusammenführen der Ergebnisse. Es ist besser (in der Architektur), das Ergebnis entweder in onNext() oder onError() zu verarbeiten. Sie können eine einfache Klasse Pair<T,Y> verwenden, um die Ergebnisse von Func2 an onNext() zu übergeben.

3) Nichts ist falsch. Das Ergebnis sollte wie gesagt in onNext() und nicht in onComplete verarbeitet werden. Laut Retrofit's source code werden die Ergebnisse nur (natürlich korrekt) in onNext() übergeben.

Hoffen Sie diese Hilfe.

5

1) Wenn Sie wissen, dass Sie einen einzelnen Wert auf beiden Pfaden haben, ist es so gut wie zip.

2) Was möchten Sie tun? Sie erhalten das Wertepaar in Ihrem Func2 und wenn Sie nicht wirklich interessiert, was mit onNext reist, geben Sie einen Wert Ihrer Wahl zurück.

3) Schedulers.immediate() ist in keinster Weise ein echter Scheduler und ist ziemlich anfällig für Deadlock-Szenarien im selben Pool. Du brauchst es wirklich nicht zu benutzen. Wenn Sie den Hauptthread blockieren möchten, bis die asynchrone Arbeit beendet ist, verwenden Sie beispielsweise toBlocking().first().

+0

Danke, ich habe meine Codebeispiel mit Ihren Empfehlungen aktualisiert. – Kasa

1

Ich weiß, dass ich über ein Jahr zu spät auf das bin, aber die Bearbeitung durch die OP auf 2015.05.14 geschrieben erfüllt nicht seine ursprüngliche Forderung:

I-2-Netzwerk Anrufe ausführen möchten asynchron

  1. Observable getStates und getCmsContent werden nicht gleichzeitig ausgeführt werden, wenn sie einzeln auf separaten Threads abonnieren. Dies ist ein entscheidender Punkt in seinem Beitrag weggelassen und keiner der vorherigen Antwort rief es auf.

    Observable.fromCallable(() -> doStuff()) 
        .subscribeOn(Schedulers.computation()); 
    

@akarnokd Wie in Fall gesagt, beide Ströme Einzelwerte haben, und zipcombineLatest ähnlich verhalten. Die Merge-Funktion wird so lange blockiert, bis sowohl getStates als auch getCmsContent zurückkehren, aber wie oben gezeigt, werden alle gleichzeitig auf separaten Threads ausgeführt.

  1. Eine andere Lösung auf der Fähigkeit abhängt, die List<States> und CmsContent zu fusionieren, wie sie ankommen. Angesichts seines Codes gibt es eindeutig einen "Datenhalter" irgendeiner Art (nicht gezeigt), weil das, was er zurückgibt, ein ist, nicht die zusammengeführten Daten. Im Folgenden wird forEach gleichzeitig ausgeführt.

    Observable.just(api.getStates(), api.getCmsContent()) 
    // subscribe on separate thread as shown previously 
    .flatMap(this::buildObservable) 
    .toBlocking() 
    // executes concurrently 
    .forEach(item -> { 
        // merge into "data holder"   
    }); 
    

Natürlich ist dieser Code hat das Problem der nicht stark typisiert ist, so dass ist eine Wahl zu treffen.

Verwandte Themen