Hallo RxJava Meister zu ändern,RxJava Faden nach concat Karte
In meinem aktuellen Android-Projekt, traf ich einige Deadlock Probleme während mit RxJava und SQLite zu spielen. Mein Problem ist:
- ich eine Transaktion an einem Faden
- Anruf ein Web-Service starten und ein paar Sachen in der Datenbank
- speichern concat eine andere beobachtbare Funktion
- try Karte andere Sachen auf die Datenbank zu schreiben ---> eine Sackgasse erhalten
Hier ist mein Code:
//define a scheduler for managing transaction in the same thread
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
Observable.just(null)
/* Go to known thread to open db transaction */
.observeOn(mScheduler)
.doOnNext(o -> myStore.startTransaction())
/* Do some treatments that change thread */
.someWebServiceCallWithRetrofit()
/* Return to known thread to save items in db */
.observeOn(mScheduler)
.flatMap(items -> saveItems(items))
.subscribe();
public Observable<Node> saveItems(List<Item> items) {
Observable.from(items)
.doOnNext(item -> myStore.saveItem(item)) //write into the database OK
.concatMap(tab -> saveSubItems(item));
}
public Observable<Node> saveSubItems(Item item) {
return Observable.from(item.getSubItems())
.doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different
}
Warum ändert plötzlich RxJava Thread? Selbst wenn ich spezifiziert habe, dass ich möchte, dass er auf meinem eigenen Scheduler beobachtet. Ich habe eine dreckige Korrektur gemacht, indem ich noch ein weiteres observeOn vor saveSubItem hinzugefügt habe, aber das ist wahrscheinlich nicht die richtige Lösung.
Ich weiß, dass wenn Sie einen Webservice mit Retrofit aufrufen, die Antwort an einen neuen Thread weitergeleitet wird (deshalb habe ich meinen eigenen Scheduler erstellt, um wieder in den Thread zu kommen, den ich mit meiner SQL-Transaktion gestartet habe). Aber ich verstehe wirklich nicht, wie RxJava die Threads verwaltet.
Vielen Dank für Ihre Hilfe.