2017-01-13 6 views
1

Ich habe Schwierigkeiten, das richtige Verhalten zu verstehen, um eine stabile Lösung für PublishSubject in RxJava zu implementieren. Ich habe an verschiedenen Stellen gelesen, dass onError ... die richtige Lösung ist, aber ich habe nicht getippt, wo ich den Code dafür platzieren soll. Bitte sehen Sie ein vereinfachtes Beispiel als Teil eines JUnit-Tests. Die Tests, die die Probleme hervorzuheben sind shouldHandleExceptionsForMultipleSubsSuccessfulOnesContinuing und shouldHandleErrorSuccessfulCallForRetryVersionWithSuccessfulOnesContinuing die zeigen, dass die gesamte Verarbeitung für den Bus stoppt, nachdem eine Ausnahme aufgetreten ist.Robuste Fehlerbehandlung in RxJava

import static org.mockito.Mockito.doThrow; 
import static org.mockito.Mockito.times; 
import static org.mockito.Mockito.verify; 
import static org.mockito.Mockito.verifyNoMoreInteractions; 

import org.junit.Before; 
import org.junit.Test; 
import org.junit.runner.RunWith; 
import org.mockito.Mock; 
import org.mockito.runners.MockitoJUnitRunner; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.web.client.RestClientException; 

import rx.Observable; 
import rx.Subscriber; 
import rx.subjects.PublishSubject; 
import rx.subjects.SerializedSubject; 
import rx.subjects.Subject; 

@RunWith(MockitoJUnitRunner.class) 
public class TestSubPubRobustness { 

    private static final Integer INTEGER_VALUE_OF_14 = Integer.valueOf(14); 

    private static final Integer INTEGER_VALUE_OF_12 = Integer.valueOf(12); 

    @Mock 
    private ValidatoryInterface mockInterface; 

    private static final Logger logger = LoggerFactory.getLogger(TestSubPubRobustness.class); 

    PublishSubject<Integer> subject; 
    Subject<Integer, Integer> inboundMessageBus; 

    @Before 
    public void setUp() throws Exception { 
     subject = PublishSubject.create(); 
     inboundMessageBus = new SerializedSubject<>(subject); 
    } 

    @Test 
    public void shouldHandleSimplySuccessfulCall() { 
     final TestEventHandler eventHandler = new TestEventHandler(mockInterface); 
     inboundMessageBus.subscribe(eventHandler); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_12); 
     verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12); 
     verifyNoMoreInteractions(mockInterface); 
    } 

    @Test 
    public void shouldHandleMultipleSuccessfulCall() { 
     final TestEventHandler eventHandler = new TestEventHandler(mockInterface); 
     inboundMessageBus.subscribe(eventHandler); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_12); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_14); 
     verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12); 
     verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_14); 
     verifyNoMoreInteractions(mockInterface); 
    } 

    @Test 
    public void shouldHandleSimplySuccessfulCallForRetryVersion() { 
     final TestEventHandler eventHandler = new TestEventHandler(mockInterface); 
     inboundMessageBus.retry().subscribe(eventHandler); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_12); 
     verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12); 
     verifyNoMoreInteractions(mockInterface); 
    } 

    @Test 
    public void shouldHandleErrorSuccessfulCallForRetryVersion() { 
     final TestEventHandler eventHandler = new TestEventHandler(mockInterface); 
     inboundMessageBus.retry().subscribe(eventHandler); 
     doThrow(new RuntimeException()).when(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_12); 
     verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12); 
     verifyNoMoreInteractions(mockInterface); 
    } 

    @Test 
    public void shouldHandleErrorSuccessfulCallForRetryVersionWithSuccessfulOnesContinuing() { 
     final TestEventHandler eventHandler = new TestEventHandler(mockInterface); 
     inboundMessageBus.onExceptionResumeNext(Observable.empty()) 
         .subscribe(eventHandler); 
     doThrow(new RestClientException("error")).when(mockInterface) 
               .haveBeenCalled(INTEGER_VALUE_OF_12); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_12); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_14); 
     verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12); 
     verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_14); 
     verifyNoMoreInteractions(mockInterface); 
    } 

    @Test 
    public void shouldHandleMultipleSubsSuccessfulOnesContinuing() { 
     final TestEventHandler eventHandler = new TestEventHandler(mockInterface); 
     final TestEventHandler additionalEventHandler = new TestEventHandler(mockInterface); 
     inboundMessageBus.retry().subscribe(eventHandler); 
     inboundMessageBus.retry().subscribe(additionalEventHandler); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_12); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_14); 
     verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_12); 
     verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_14); 
     verifyNoMoreInteractions(mockInterface); 
    } 

    @Test 
    public void shouldHandleExceptionsForMultipleSubsSuccessfulOnesContinuing() { 
     final TestEventHandler eventHandler = new TestEventHandler(mockInterface); 
     final TestEventHandler additionalEventHandler = new TestEventHandler(mockInterface); 
     inboundMessageBus.asObservable() 
         .onErrorReturn(error -> Integer.MAX_VALUE) 
         .retry() 
         .subscribe(eventHandler); 
     inboundMessageBus.asObservable() 
         .onErrorReturn(error -> Integer.MAX_VALUE) 
         .retry() 
         .subscribe(additionalEventHandler); 
     doThrow(new RuntimeException()).when(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_12); 
     inboundMessageBus.onNext(INTEGER_VALUE_OF_14); 
     verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_12); 
     verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_14); 
     verifyNoMoreInteractions(mockInterface); 
    } 

    private final class TestEventHandler extends Subscriber<Integer> { 

     private final ValidatoryInterface validatoryInterface; 

     public TestEventHandler(final ValidatoryInterface validatoryInterface) { 
      this.validatoryInterface = validatoryInterface; 
     } 

     @Override 
     public void onCompleted() { 
      logger.debug("Completed"); 
     } 

     @Override 
     public void onError(final Throwable e) { 
      logger.error("Argggggggg", e); 
     } 

     @Override 
     public void onNext(final Integer t) { 
      logger.debug("Next", t); 
      validatoryInterface.haveBeenCalled(t); 
     } 

    } 

    private interface ValidatoryInterface { 
     void haveBeenCalled(Integer testNumber); 
    } 
} 

Antwort

1

AFAIK Sie können ein Observable nicht einmal in einem Fehlerzustand fortsetzen. Früher gab es dafür einen Operator: onErrorFlatMap. Aber es wurde in issue 1465 veraltet. Sie können es für eine gründlichere Erklärung dort lesen, aber grundsätzlich ist die Logik, dass, sobald ein Observable im Fehlerzustand ist, es nie wieder etwas ausstrahlen sollte. Es ist verbraucht. Dies hängt mit dem Vertrag zusammen, onError nur einmal und nur einmal zu liefern. Sie können "neu starten", indem Sie die Verbindung zur Quelle wiederherstellen, aber das "Wiederherstellen" ist verboten.

So ein Observable wird nicht wie ein Nachrichtenbus verhalten: Es wird aufhören, Nachrichten zu liefern, sobald es einen Fehler geliefert hat. Die Fehlerbehandlungsoperatoren stellen den Stream nie wieder her, sondern starten ihn neu oder wechseln zu anderen Streams, sobald Fehler auftreten.

In Ihrem Beispiel sagen Sie onErrorReturn(e -> MAX_VALUE), die den Stream durch einen Strom ersetzen wird, der nur MAX_VALUE enthält, sobald ein Fehler im Quelldatenstrom auftritt, der effektiv mit MAX_VALUE endet, wenn ein Fehler auftritt. Dann sagen Sie retry(), was bedeutet, dass der Stream neu gestartet werden sollte, wenn ein Fehler auftritt. AFAIK diese beiden sind widersprüchlich.

Soweit Ratschläge gehen, kann ich nur empfehlen, Ihre Streams als flüchtig zu behandeln. Es gibt einige Problemumgehungen wie materialize, die im Ticket erwähnt werden, aber die Art und Weise, wie ich es normalerweise behandle ist, neue Streams zu erstellen. In HTTP-Servern beispielsweise erstelle ich einen Stream pro Anfrage, anstatt einen Stream, der alle Anfragen bearbeitet. Auf diese Weise werden Ausnahmen für einzelne Anfragen isoliert.

Es wäre nett zu hören, wenn benjchristensen irgendwelche neuen Ideen hat seit dieser Ausgabe geschrieben wurde.

+0

Also eine gültige Behandlung hier in den Beobachter onError Empfangen wäre wieder die Observer-Instanz abonnieren? – KramKroc

+0

Ja, AFAICR die Observable, die den Fehler lieferte, setzt sich selbst in einen Fehlerzustand und weigert sich, weitere Nachrichten zu liefern, aber observables früher in der Kette funktionieren noch. Sie müssen also wissen, woher der Fehler stammt. Wenn Sie die Kette nach und nach dem Fehler neu initialisieren können, ist es gut, wieder zu gehen. –