2017-01-19 1 views
1

Ich habe einigen Strom Artikel Unter der Annahme:Datenbank (SQLite) Transaktion für Downstream-Betreiber mit rxJava

Observable<Item> stream = ... 

, was ich erreichen möchte?

  • Stream hat eine beliebige Anzahl von Operationen. Alle Operationen, bevor eine Transaktion beginnen sollten außerhalb der Transaktion
  • Irgendwie Transaktion des Stroms in der Mitte db.beginTransaction()
  • Alle Betreiber nach dem Start Transaktion innerhalb der Transaktion ausgeführt werden sollten beginnen laufen haben
  • Transaktion im Fall abgeschlossen werden von erfolgreichen Operationen db.setTransactionSuccessful
  • Transaktion hat immer db.endTransaction
  • Es beiden Schnipsel haben wird groß sein, beendet sein: offen eine Transaktion in Downstream-Aktivitäten für alle Positionen; Transaktion öffnen und schließen für jedes Artikel in dem Strom

//some upstream operators 
stream.doOnNext(i -> ...) 
    .map(i -> ...) 
    //somehow start transaction here 
    //operator inside transaction. All database changes will be reverted in case error 
    .doOnNext(i -> /*database ops*/) 
    .subscribe() 

PS: db ist Anwendungsinstanz beschreibbarer scoped SQLiteDatabase

ich nun eine Lösung. Aber vielleicht hast du irgendwelche Vorschläge für einen saubereren Weg?

+0

Möchten Sie separate Transaktionen für jedes der 'Items von der Quelle kommen? Was ist 'Datenbank'? Ist es eine Verbindung für dieses bestimmte Element oder eine globale Variable? Wenn es global ist, würde die Transaktion auch global sein.Möchten Sie, dass die Transaktion so lange dauert, bis alle Artikel verarbeitet wurden? –

+0

'Datenbank' ist eine globale Instanz (Anwendungsbereich) der schreibbaren' SQLiteDatabase'. Ich möchte eine Transaktion für die Verarbeitung aller Stream-Elemente öffnen. Aber auch neue Transaktion für jeden Artikel ist ein interessanter Ansatz. Sie haben erwähnt, dass eine solche Transaktion global ist, ist es etwas einschränkt? – Beloo

+1

Globale Transaktion sieht für mich komisch aus. Das bedeutet, dass eine einzelne Transaktion alle Datenbankaktivitäten in Ihrem Programm gleichzeitig umfasst. Daher sollte es nicht viel ausmachen, wann genau Sie Ihre Transaktion starten. Jede Operation in Ihrer Pipeline wird für jeden Artikel wiederholt. Der erste Artikel öffnet die Transaktion, und * alle * Operationen für nachfolgende Artikel werden innerhalb dieser Transaktion sein. –

Antwort

2

1) Für den Fall, wenn alle Elemente in einzelner Transaktion verarbeitet:

stream 
    .doOnSubscribe(d -> database.beginTransaction()) 
    . ... 
    .subscribe(v -> {...}, 
     e -> database.endTransaction(), 
     () -> { database.setTransactionSuccessful(); database.endTransaction(); }) 

2) Für den Fall, dass es separate Transaktion für jeden Artikel ist:

class ItemWithTransaction { 
    Item item; 
    Connection conn; // connection associated with this item 
    boolean rollback; 
} 

stream 
    .map(i -> new ItemWithTransaction(i, openTransaction())) 
    .map(i -> i.conn.executeSql(..., i.item.prop1)) 
    . ... 
    .map(i -> { 
     if (...) i.rollback = true; // error related to this item 
     return i; 
    }) 
    . ... 
    .subscribe(i -> { 
      ... 
      if (i.rollback) i.conn.rollback(); 
      else i.conn.commit(); 
      i.conn.close(); 
     }, 
     e -> rollbackAndCloseAllOpenConnections(), 
     () -> {...}) 

Im Allgemeinen würde ich nicht gehen Sie mit dem zweiten Ansatz, da es möglicherweise zu viele (unkontrollierte) gleichzeitige Datenbankverbindungen erfordert.

3) Sie sollten Ihren Code besser umstrukturieren, so dass Sie zuerst alle erforderlichen Informationen sammeln und anschließend die Datenbank in einem Vorgang in kurzer Transaktion aktualisieren. Dies ist, wie ich es tun würde:

stream 
    . ... // processing 
    .buffer(...) // collect updates all or in batches 
    .subscribe(Collection<ItemUpdate> batch -> { 
      database.beginTransaction(); 
      try { 
       ... // update multiple items 
       database.setTransactionSuccessful(); 
      } finally { 
       database.endTransaction(); 
      } 
     }, 
     e -> {...}, 
     () -> {...}); 
+0

stimme ich zu. Der erste Ansatz ist besser, aber in Ihrer Implementierung wird die Datenbankverbindung in 'doOnSubscribe' geöffnet, so dass alle Stream-Operationen betroffen sind, einschließlich Upstream. Wenn zum Beispiel der Stream von 'Retrofit' aus beobachtbar ist, wird die Transaktion viel länger geöffnet, als sie eigentlich sein sollte, deshalb habe ich' doOnNext' dafür verwendet. – Beloo

+0

@Beloo Wenn es mehr als ein Element im Stream gibt, wird es sich sowieso auf den Upstream auswirken. Ich habe eine echte Lösung als Nummer 3 zu meiner Antwort hinzugefügt. –

1

ich geschaffen habe einen Transformator eine Transaktion für alle Positionen des Stroms zu erreichen:

/** @return transformer which starts SQLite database transaction for each downstream operators, 
* closes transaction in {@link Observable#doOnTerminate}. So transaction will be closed either on successful completion or error of stream. 
* set previously opened SQLite database transaction to completed in {@link Observable#doOnCompleted} call */ 
public <T> Observable.Transformer<T, T> inTransaction() { 
    return observable -> observable 
      .doOnNext(o -> { 
       if (!database.inTransaction()) database.beginTransaction(); 
      }) 
      .doOnCompleted(() -> { 
       if (database.inTransaction()) database.setTransactionSuccessful(); 
      }) 
      .doOnTerminate(() -> { 
       if (database.inTransaction()) database.endTransaction(); 
      }); 

Und nannte es:

stream 
    //start transaction here 
    .compose(inTransaction()) 
    .doOnNext(i -> /*database ops*/) 
    .subscribe() 

Beachten Sie, dass i Transaktion in .doOnNext starten und jedes Mal überprüfen wenn die Transaktion bereits gestartet wurde, weil es scheint, dass es unmöglich ist, sie nur beim ersten Mal anzurufen.

+0

Funktioniert das für Sie? Ich benutze 'ActiveAndroid'. @Beloo –

+0

ja, scheint, dass es wie erwartet funktioniert. mit grüner dao-datenbank, gedanke – Beloo

Verwandte Themen