2017-10-03 1 views
0

Ich habe eine Observable, die Elemente emittiert und sie auf den Server hochladen. HierRxJava 2 Observable mit flatMapCompletable wird nicht abgeschlossen

ist der Code:

repository 
      .getItems() 
      .doOnComplete(() -> Log.d(TAG, "No items left.")) 
      .flatMapCompletable(item -> 
        repository 
          .uploadItems(item) 
          .onErrorComplete() 
          .andThen(
            deleteTemporaryItem() 
              .onErrorComplete() 
          ) 
      ); 

getItems Methode Elemente nacheinander emittiert und dann abgeschlossen ist, uploadItems Methode, um sie auf den Server hochladen. Das Problem ist, wenn es keine Elemente gibt, die alle Kette onComplete-Ereignis gerade gut funktioniert und alle meine Abonnenten erhalten dieses Ereignis und fortfahren, aber wenn es einige Elemente und alle von ihnen hochgeladen wurden OnComplete Ereignisse geht nicht weiter als .doOnComplete (() -> Log.d (TAG, "Keine Artikel übrig.")) Methode und alle Abonnenten erhalten dieses Ereignis nicht. Ich habe onErrorComplete hinzugefügt, um sicherzustellen, dass alle Methoden nach uploadItems abgeschlossen sind, und ich sehe auch in Protokollen, dass alle von ihnen abgeschlossen wurden, aber onComplete-Ereignis von repository.getItems() nicht an alle Abonnenten geht.

Könnte jemand bitte helfen, herauszufinden, was der Grund für dieses Verhalten sein könnte?

Vielen Dank im Voraus!

+0

Sorry, ich verstehe nicht, was das Problem ist. Könnten Sie bitte mehr Informationen über Ihr Problem und Methoden-Signaturen der verwendeten Methoden geben? –

+0

@HansWurst Hallo! Ich habe gerade die Frage bearbeitet! Danke für Ihre Aufmerksamkeit! –

+0

Ich denke, es gibt ein Missverständnis. DoOnComplete wird aufgerufen, wenn das Observable beendet ist. Das Observable wird nur beendet, wenn alle von getItems() emulierten Werte verarbeitet wurden. Wenn alle Elemente mit Completeable.Complete abgeschlossen sind, wird das Observable abgeschlossen. Wenn ein Element, das flatMapped blockiert (Larg-Datei zum Hochladen), sieht es so aus, als ob nichts verarbeitet wird. –

Antwort

0

haben Sie einen Blick auf dieses Beispiel:

ich das Einzelteil durch jeden Schritt passieren, so wird die Zeichnung für jedes Element mitgeteilt werden, die verarbeitet worden ist. Die Verarbeitungspipeline beinhaltet das Hochladen und Löschen der Datei.

Bitte versuchen Sie, die Implementierung zu ändern und ein Protokoll der Ausgabe zu veröffentlichen.

@Test 
void name() throws Exception { 
    Flowable<Integer> completed_work = Flowable.just(1, 2, 3) 
      .map(integer -> integer * 1000) 
      .flatMapSingle(integer -> 
        Completable.fromAction(() -> { 
         Thread.sleep(integer); 
         // do upload stuff here 
        }) 
          .doOnComplete(() -> System.out.println("Uploaded file ....")) 
          //.timeout(10, TimeUnit.SECONDS) 
          .retry(3) 
          .andThen(
            Completable.fromAction(() -> { 
             // do delete stuff... 
            }) 
              .retry(2) 
              //.timeout(10, TimeUnit.SECONDS) 
              .doOnComplete(() -> System.out.println("Deleted file ...")) 
          ) 
          .toSingle(() -> integer) 
      ) 
      .doOnComplete(() -> System.out.println("Completed work")); 


    completed_work.test() 
      .await() 
      .assertResult(1000, 2000, 3000); 
} 
Verwandte Themen