2017-07-05 5 views
1

Ich bin neu in Rxjava und ich habe Probleme mit Flowable. Wenn onComplete Methode aufgerufen flatMap, onNext Methode in Teilnehmer nicht angerufen hat. Was ist hier falsch ?. Haben Sie einen Vorschlag?RxJava2 Flowable nicht zu OnNext Methode in Abonnenten

 ResourceSubscriber<List<Song>> subscriber = new ResourceSubscriber<List<Song>>() { 
     @Override 
     public void onStart() { 
      Log.e(TAG, "onStart"); 
      mView.showProgress(); 
     } 

     @Override 
     public void onNext(List<Song> songs) { 
      mView.onLocalMusicLoaded(songs); 
      mView.emptyView(songs.isEmpty()); 
      Log.e(TAG, "onNext"); 
      //Not call here after onComplete called 
     } 

     @Override 
     public void onError(Throwable t) { 
      mView.hideProgress(); 
      Log.e(TAG, "onError: ", t); 
     } 

     @Override 
     public void onComplete() { 
      mView.hideProgress(); 
      Log.e(TAG, "onComplete"); 
     } 
    }; 


    Disposable disposable = Flowable.just(cursor) 
      .flatMap(new Function<Cursor, Publisher<List<Song>>>() { 
       @Override 
       public Publisher<List<Song>> apply(@NonNull Cursor cursor) throws Exception { 
        final List<Song> songs = new ArrayList<>(); 
        if (cursor != null && cursor.getCount() > 0) { 
         cursor.moveToFirst(); 
         do { 
          Song song = cursorToMusic(cursor); 
          songs.add(song); 
         } while (cursor.moveToNext()); 
        } 
        return Flowable.create(new FlowableOnSubscribe<List<Song>>() { 
         @Override 
         public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception { 
          e.onNext(songs); 
          e.onComplete(); 
         } 
        }, BackpressureStrategy.LATEST); 
       } 
      }) 
      .doOnNext(new Consumer<List<Song>>() { 
       @Override 
       public void accept(@NonNull List<Song> songs) throws Exception { 
        Log.e(TAG, "onLoadFinished doOnNext: " + songs.size()); 
        Collections.sort(songs, new Comparator<Song>() { 
         @Override 
         public int compare(Song left, Song right) { 
          return left.getDisplayName().compareTo(right.getDisplayName()); 
         } 
        }); 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribeWith(subscriber); 

    mCompositeDisposable.add(disposable); 

Antwort

0

Vom Javadoc von ResourceSubscriber:

Der Standard onStart() Anfragen Long.MAX_VALUE standardmäßig. Überschreiben Sie die Methode, um einen benutzerdefinierten positiven Betrag anzufordern. Verwenden Sie das geschützte Objekt request(long), um weitere Elemente anzufordern, und dispose(), um die Sequenz innerhalb einer onNext Implementierung abzubrechen.

@Override 
    public void onStart() { 
     Log.e(TAG, "onStart"); 
     mView.showProgress(); 
     request(Long.MAX_VALUE); 
    } 

danach auch Sie erfolgreich die Flowable.just Operator, warum Sie es über Flowable.create replizieren hat die skalare sogs Liste zu emittieren?

+0

Vielen Dank. Weil es nur ein Codebeispiel ist –

Verwandte Themen