Ich habe Schwierigkeiten, das richtige Verhalten zu verstehen, um eine stabile Lösung für PublishSubject in RxJava zu implementieren. Ich habe an verschiedenen Stellen gelesen, dass onError ... die richtige Lösung ist, aber ich habe nicht getippt, wo ich den Code dafür platzieren soll. Bitte sehen Sie ein vereinfachtes Beispiel als Teil eines JUnit-Tests. Die Tests, die die Probleme hervorzuheben sind shouldHandleExceptionsForMultipleSubsSuccessfulOnesContinuing und shouldHandleErrorSuccessfulCallForRetryVersionWithSuccessfulOnesContinuing die zeigen, dass die gesamte Verarbeitung für den Bus stoppt, nachdem eine Ausnahme aufgetreten ist.Robuste Fehlerbehandlung in RxJava
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.client.RestClientException;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
@RunWith(MockitoJUnitRunner.class)
public class TestSubPubRobustness {
private static final Integer INTEGER_VALUE_OF_14 = Integer.valueOf(14);
private static final Integer INTEGER_VALUE_OF_12 = Integer.valueOf(12);
@Mock
private ValidatoryInterface mockInterface;
private static final Logger logger = LoggerFactory.getLogger(TestSubPubRobustness.class);
PublishSubject<Integer> subject;
Subject<Integer, Integer> inboundMessageBus;
@Before
public void setUp() throws Exception {
subject = PublishSubject.create();
inboundMessageBus = new SerializedSubject<>(subject);
}
@Test
public void shouldHandleSimplySuccessfulCall() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.subscribe(eventHandler);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleMultipleSuccessfulCall() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.subscribe(eventHandler);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_14);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_14);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleSimplySuccessfulCallForRetryVersion() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.retry().subscribe(eventHandler);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleErrorSuccessfulCallForRetryVersion() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.retry().subscribe(eventHandler);
doThrow(new RuntimeException()).when(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleErrorSuccessfulCallForRetryVersionWithSuccessfulOnesContinuing() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.onExceptionResumeNext(Observable.empty())
.subscribe(eventHandler);
doThrow(new RestClientException("error")).when(mockInterface)
.haveBeenCalled(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_14);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_14);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleMultipleSubsSuccessfulOnesContinuing() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
final TestEventHandler additionalEventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.retry().subscribe(eventHandler);
inboundMessageBus.retry().subscribe(additionalEventHandler);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_14);
verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_12);
verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_14);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleExceptionsForMultipleSubsSuccessfulOnesContinuing() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
final TestEventHandler additionalEventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.asObservable()
.onErrorReturn(error -> Integer.MAX_VALUE)
.retry()
.subscribe(eventHandler);
inboundMessageBus.asObservable()
.onErrorReturn(error -> Integer.MAX_VALUE)
.retry()
.subscribe(additionalEventHandler);
doThrow(new RuntimeException()).when(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_14);
verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_12);
verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_14);
verifyNoMoreInteractions(mockInterface);
}
private final class TestEventHandler extends Subscriber<Integer> {
private final ValidatoryInterface validatoryInterface;
public TestEventHandler(final ValidatoryInterface validatoryInterface) {
this.validatoryInterface = validatoryInterface;
}
@Override
public void onCompleted() {
logger.debug("Completed");
}
@Override
public void onError(final Throwable e) {
logger.error("Argggggggg", e);
}
@Override
public void onNext(final Integer t) {
logger.debug("Next", t);
validatoryInterface.haveBeenCalled(t);
}
}
private interface ValidatoryInterface {
void haveBeenCalled(Integer testNumber);
}
}
Also eine gültige Behandlung hier in den Beobachter onError Empfangen wäre wieder die Observer-Instanz abonnieren? – KramKroc
Ja, AFAICR die Observable, die den Fehler lieferte, setzt sich selbst in einen Fehlerzustand und weigert sich, weitere Nachrichten zu liefern, aber observables früher in der Kette funktionieren noch. Sie müssen also wissen, woher der Fehler stammt. Wenn Sie die Kette nach und nach dem Fehler neu initialisieren können, ist es gut, wieder zu gehen. –