2016-08-21 2 views
0

Es ist, wie ich meine Daten mit RxJava sparen:SQLite Transaktion und RxJava

override fun put(note: Note): Observable<Note> { 
    validateNote(note) 
    return Observable.just(note) 
      .doOnNext { dbKeeper.startTransaction() } 
      .doOnNext { storeHashtags(note) } 
      .doOnNext { storeImages(note) } 
      .flatMap { notesDataStore.put(notesMapper.transform(note)) } 
      .map { notesMapper.transform(it) } 
      .doOnNext { dbKeeper.setTransactionSuccessful() } 
      .doOnUnsubscribe { dbKeeper.endTransaction() } 
} 

Und dann benutze ich diese Methode wie folgt:

 notesManager.put(note) 
      .switchMap { notesManager.getHashtags() } 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe {view.setHashtags(it) } 

Und doOnUnsubscribe nie als getHashtags() genannt versucht, von SELECT db, die von startTransaction() gesperrt wurde. Deadlock, heh. Okay. Lassen Sie uns doOnUnsubscribe(...) durch doOnTerminate(...) ersetzen.

override fun put(note: Note): Observable<Note> { 
    validateNote(note) 
    return Observable.just(note) 
      .doOnNext { dbKeeper.startTransaction() } 
      .doOnNext { storeHashtags(note) } 
      .doOnNext { storeImages(note) } 
      .map { notesMapper.transform(note) } 
      .flatMap { notesDataStore.put(it) } 
      .map { notesMapper.transform(it) } 
      .doOnNext { dbKeeper.setTransactionSuccessful() } 
      .doOnTerminate { dbKeeper.endTransaction() } 
} 

Aber jetzt Transaktion nicht schließen, wenn Observable wird durch subscriber.unsubscribe() unterbrochen werden.

Was können Sie empfehlen, um meine Situation zu lösen?

Zusätzliche Informationen: Ich benutze eine writableDb-Instanz zum Schreiben/Lesen von Daten.

Antwort

1

Ich würde sagen, das ist keine gute Lösung für Rx; Meine Empfehlung wäre, um die Transaktion zu tun mit dem üblichen Anprobe schließlich (verzeihen Sie das Java):

Observable<Note> put(Note note) { 
    return just(note) 
     .doOnNext(n -> { 
      validateNote(n); 
      dbKeeper.startTransaction(); 
      try { 
       storeHashtags(n); 
       storeImages(n); 
       notesDataStore.put(notesMapper.transform(n)); 
       dbKeeper.setTransactionSuccessful(); 
      } finally { 
       dbKeeper.endTransaction(); 
      } 
     }); 
} 

Edit: Vielleicht ist dies nützlich:

fun <T> withTransaction(sourceObservable: Observable<T>): Observable<T> { 
    val latch = AtomicInteger(1) 
    val maybeEndTransaction = Action0 { 
     if (latch.decrementAndGet() == 0) { 
      endTransaction() 
     } 
    } 
    return Observable.empty<T>() 
      .doOnCompleted { startTransaction() } 
      .mergeWith(sourceObservable) 
      .doOnNext { setTransactionSuccessful() } 
      .doOnTerminate(maybeEndTransaction) 
      .doOnUnsubscribe(maybeEndTransaction) 
} 

es wie folgt verwendet:

override fun put(note: Note): Observable<Note> { 
    return Observable.just(note) 
     .doOnNext { validateNote(note) } 
     .doOnNext { storeHashtags(note) } 
     .doOnNext { storeImages(note) } 
     .flatMap { notesDataStore.put(notesMapper.transform(note)) } 
     .map { notesMapper.transform(it) } 
     .compose { withTransaction } 
} 

Es wird sichergestellt, dass die Transaktion genau einmal endet; Seien Sie jedoch vorsichtig, dass Sie die Threads nicht innerhalb Ihrer ursprünglichen beobachtbaren Kette wechseln (es sei denn, Ihre Transaktion amanger kann damit umgehen, oder Sie haben Ihre Scheduler geändert, um neue Threads bestehenden Transaktionen zuzuordnen).

+0

Ja, ich habe schon verstanden, dass Rx für diesen Fall nicht geeignet ist. Aber es ist zu spät, um die Architektur des Projekts zu ändern. Danke für deine Lösung. Vielleicht werde ich 'BlockingObservable' für' notesDataStore.put() 'und ähnliche Methoden verwenden. – Alexandr

+1

Ich habe eine Bearbeitung hinzugefügt. –

+0

Sieht interessant aus, ich werde es versuchen. Vielen Dank! – Alexandr