2016-07-06 7 views
4

Hallo RxJava Meister zu ändern,RxJava Faden nach concat Karte

In meinem aktuellen Android-Projekt, traf ich einige Deadlock Probleme während mit RxJava und SQLite zu spielen. Mein Problem ist:

  1. ich eine Transaktion an einem Faden
  2. Anruf ein Web-Service starten und ein paar Sachen in der Datenbank
  3. speichern concat eine andere beobachtbare Funktion
  4. try Karte andere Sachen auf die Datenbank zu schreiben ---> eine Sackgasse erhalten

Hier ist mein Code:

//define a scheduler for managing transaction in the same thread 
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 


Observable.just(null) 
      /* Go to known thread to open db transaction */ 
      .observeOn(mScheduler) 
      .doOnNext(o -> myStore.startTransaction()) 
      /* Do some treatments that change thread */ 
      .someWebServiceCallWithRetrofit() 
      /* Return to known thread to save items in db */ 
      .observeOn(mScheduler) 
      .flatMap(items -> saveItems(items)) 
      .subscribe(); 

public Observable<Node> saveItems(List<Item> items) { 
    Observable.from(items) 
      .doOnNext(item -> myStore.saveItem(item)) //write into the database OK 
      .concatMap(tab -> saveSubItems(item)); 
} 

public Observable<Node> saveSubItems(Item item) { 
    return Observable.from(item.getSubItems()) 
      .doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different 
} 

Warum ändert plötzlich RxJava Thread? Selbst wenn ich spezifiziert habe, dass ich möchte, dass er auf meinem eigenen Scheduler beobachtet. Ich habe eine dreckige Korrektur gemacht, indem ich noch ein weiteres observeOn vor saveSubItem hinzugefügt habe, aber das ist wahrscheinlich nicht die richtige Lösung.

Ich weiß, dass wenn Sie einen Webservice mit Retrofit aufrufen, die Antwort an einen neuen Thread weitergeleitet wird (deshalb habe ich meinen eigenen Scheduler erstellt, um wieder in den Thread zu kommen, den ich mit meiner SQL-Transaktion gestartet habe). Aber ich verstehe wirklich nicht, wie RxJava die Threads verwaltet.

Vielen Dank für Ihre Hilfe.

Antwort

0

Nach meinem Wissen doOnNext Methode wird in anderen Thread aufgerufen, als der Code davor, weil es asynchron vom Rest der Sequenz läuft.

Beispiel: Sie können mehrere Ruheanrufe tätigen, speichern Sie sie in der Datenbank und innerhalb doOnNext(...) informieren Sie eine Ansicht/Presenter/Controller eines Progres. Sie können dies vor dem Speichern in der Datenbank oder/und nach dem Speichern in der Datenbank tun. Was ich vorschlagen würde ist "FlatMapping" einen Code.

So ist die saveItems Methode würde wie folgt aussehen (wenn myStore.saveSubItems gibt ein Ergebnis zurück):

public Observable<Node> saveSubItems(Item item) { 
return Observable.from(item.getSubItems()) 
     .flatMap(subItem -> myStore.saveSubItems(subItem)) 
} 

Mit Garantien „flatMapping“, dass der Betrieb auf dem gleichen Thread wie die vorherige Sequenz ausgeführt wird, und die Sequenz geht dann weiter flaMap Funktion endet.

1

Die Side-Effect-Operatoren (wie auch flatMap) werden synchron auf jedem Thread ausgeführt, der sie aufruft. Versuchen Sie etwas wie

Observable.just(null)    
      .doOnNext(o -> myStore.startTransaction()) 
      .subscribeOn(mScheduler)  // Go to known thread to open db transaction 
      /* Do some treatments that change thread */ 
      .someWebServiceCallWithRetrofit()      
      .flatMap(items -> saveItems(items)) 
      .subscribeOn(mScheduler) // Return to known thread to save items in db 
      .observeOn(mScheduler) // Irrelevant since we don't observe anything 
      .subscribe();