2016-03-08 10 views
7

ich verwende RxJava 1.1 eine beobachtbare Sequenz aus dem Innern einer Feder Anwendung zu komponieren, die wie folgt aussieht:Transaktion Rollback in einer reaktiven Anwendung

@Transaction 
public Observable<Event> create(Event event) { 
    return Observable.just(event) 
      .flatMap(event -> { 
       //save event to db (blocking JPA operation) 
       Event event = eventRepository.save(event); 
       return Observable.just(event); 
      }) 
      //async REST call to service A 
      .flatMap(this::sendEventToServiceA) <---- may execute on different thread 
      //async REST call to service B 
      .flatMap(this::sendEventToServiceB) <---- may execute on different thread 
      .doOnError(throwable -> { 
       // ? rollback initally created transaction? 
      }) 
} 
von einer Controller-Klasse

Ein Ereignis erreicht die Dienstschicht meiner Bewerbung und dies breitet sich durch eine Kette von Operationen aus, die mit der Funktion flatMap() von RxJava erstellt wurden. Das Ereignis wird zuerst in der Datenbank (Spring Data) gespeichert, und die nächsten beiden asynchronen HTTP-Anforderungen werden nacheinander ausgeführt, wobei die AsyncRestTemplate-Bibliothek von Spring im Hintergrund verwendet wird.

Wenn ein Fehler/Ausnahme irgendwo in der Pipeline geworfen wird, würde ich gerne in der Lage sein, die Datenbanktransaktion Rollback, so dass das Ereignis nicht in der Datenbank gespeichert wird. Ich habe festgestellt, dass dies nicht einfach ist, da der Transaktionskontext im Frühjahr dem bestimmten Ausführungsstrang zugeordnet ist. Wenn der Code den doOnError-Callback für einen anderen Thread erreicht (AsyncRestTemplate verwendet seinen eigenen AsyncTaskExecutor), ist es nicht möglich, die ursprünglich erstellte Transaktion zurückzusetzen.

Können Sie bitte irgendeinen Mechanismus beraten, um Transaktionen über eine Multithread-Anwendung zu erreichen, die aus mehreren asynchronen Operationen besteht, die auf diese Weise geschrieben wurden?

ich auch eine Transaktion programmatisch mit zu erstellen versucht haben:

TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); 

und dann zusammen mit dem Ereignisse in der Pipeline, aber auch hier die Transaction Objekt senden, wenn ein Fehler auftritt, und ich rufe „platformTransactionManager.rollback (Status); "Ich bekomme" Transaktionssynchronisation ist nicht aktiv ", da dies auf einem anderen Thread läuft, denke ich.

p.s. Die sendEventToServiceA/sendEventToServiceB Methoden ähnlich aussehen wie diese:

public Observable<Event> sendEventToServiceA(event) { 
    .......... 
    ListenableFuture<ResponseEntity<String>> listenableFuture = asyncRestTemplate.exchange(
       "/serviceA/create?event_id=" + event.id, 
       HttpMethod.POST, requestEntity, String.class); 

    return ObservableUtil.toRxObservable(listenableFuture); 
} 

Antwort

3

Eine Möglichkeit, dies zu tun ist, um sicherzustellen, dass der Fehler auf dem gleichen Thread wie der db beobachtet sparen:

@Transaction 
public Observable<Event> create(Event event) { 

    Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 
    return Observable.just(event) 
      .flatMap(event -> { 
       //save event to db (blocking JPA operation) 
       Event event = eventRepository.save(event); 
       return Observable.just(event); 
      }) 
      .subscribeOn(scheduler) 
      //async REST call to service A 
      .flatMap(this::sendEventToServiceA) <---- may execute on different thread 
      //async REST call to service B 
      .flatMap(this::sendEventToServiceB) <---- may execute on different thread 
      .observeOn(scheduler) 
      .doOnError(throwable -> { 
       // ? rollback initally created transaction? 
      }) 
} 
+0

Dank Dave! Ihre Lösung mit dem Scheduler scheint gut zu funktionieren. – odybour

+0

Ein kleines Problem, das ich dabei fand, war, dass die flatMap-Methode, die das Speichern in der Datenbank ausführt, aufgrund der Annotation in einem anderen Thread als demjenigen ausgeführt wird, der die Transaktion an erster Stelle erstellt hat. Um dies zu umgehen, habe ich die Transaktion programmatisch innerhalb der flatMap-Methode kurz vor dem Speichervorgang erstellt, dann die Transaktion in ein Context-Objekt gespeichert, das ich die beobachtbare Pipeline durchgebe, und im doOnError mache ich so etwas: 'transactionManager.rollback (context. getTransaction()); '. – odybour