2017-01-05 3 views
2

Ich bin neu in Rx, und ich würde wirklich ein wenig Hilfe bei der Fehlerbehandlung zu schätzen wissen. Ich habe den folgenden Code, der im Grunde ein typeahead ist:RxJS - Senden Sie weiterhin Benachrichtigungen, nachdem ein Fehler in onNext

var observable = Rx.Observable.fromEvent(searchField, 'keyup') 
    .map(ev => ev.target.value) 
    .filter(text => text.length > 2) 
    .debounce(500 /* ms */) 
    .distinctUntilChanged() 
    .flatMapLatest(getAsyncSearchResults); 

observable.subscribe(function(value) { 
     console.log("onNext"); 
     throw "error"; // THIS TERMINATES THE OBSERVABLE 
    }, 
    function(error) { 
     console.log("Error"); 
    }, 
    function() { 
     console.log("Completed"); 
    }); 

beobachtbaren endet nachdem eine Ausnahme in OnNext Funktion auftritt. Dies scheint mir ein unerwünschtes Verhalten zu sein, da ich nicht meinen ganzen onNext-Code in einen try-catch einpacken möchte. Gibt es irgendeine Möglichkeit, dass ich dem Observablen mitteilen könnte, die Benachrichtigung immer weiter zu machen, egal welche Ausnahme auftritt?

+1

Ich weiß nicht, Ihren Zweck einen Fehler zu werfen in der Subscribe-Methode, aber das ist eine sehr un-RxJS-y-Art, Dinge zu tun. Ich würde Ihr Muster überdenken und reaktive Fehlerbehandlung in die Pipeline integrieren. –

Antwort

0

Das ist das Verhalten von Rx auf jeder Plattform, und die einzige Möglichkeit, Ausnahmen bei Beobachtern zu erfassen, ist ... nun ... sie zu fangen.

Sie könnten Ihre eigene subscribeSafe Methode erstellen, die einen Rückruf in try/catch umschließt, wenn Sie diese Notwendigkeit oft haben.

Alternativ können Sie, wenn Sie die Teile, die vom Beobachter geworfen werden, in den Stream selbst verschieben, den Fehlerfluss steuern und (zum Beispiel) erneut versuchen.

Beispiel unten:

// simulates (hot) keyup event 
 
const obs1 = Rx.Observable.interval(1000).publish() 
 
obs1.connect() 
 

 
function getAsyncSearchResults(i) { 
 
    return Rx.Observable.timer(500).mapTo(i) 
 
} 
 

 
obs1.switchMap(getAsyncSearchResults).do(x => { 
 
    console.log(x) // all events gets logged 
 
    throw new Error() 
 
}).retry().subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>

3

Sie sollten wirklich Ihre next, completed und error Handler sicher für Ausnahmen machen. Es gibt keinen vernünftigen Weg für den Quellenstrom, ein Abonnement "wiederaufzunehmen", wenn die Teilnehmerfehler auftreten - die einzige vernünftige Vorgehensweise besteht darin, das Gespräch mit dem Teilnehmer zu beenden.

Für eine vollständige Diskussion, warum das ist, siehe meine Antwort How to handle exceptions thrown by observer's onNext in RxJava? - das gilt auch für jede Plattform-Implementierung von Rx.

Futhermore finden Why is the OnError callback never called when throwing from the given subscriber? für eine gute Erklärungs Metapher (die Zeitungshändler) der Überlegungen - insbesondere die Notwendigkeit zu prüfen, da andere Teilnehmer sein kann, die keinen Fehler in ihrer next Handler geworfen haben.

0

Sie müssen zu einem neuen Einweg-Stream wechseln, und wenn ein Fehler auftritt, wird er sicher entsorgt und der ursprüngliche Stream bleibt erhalten.

Ich habe Ihren Code in einen einfacheren Stream von 0 - 5 um einfach zu demonstrieren. Es wirft einen Fehler/Ausnahme, wenn die Zahl 3.

Rx.Observable.from([0,1,2,3,4,5]) 
 
    .switchMap(value => { 
 

 
     // This is the disposable stream! 
 
     // Errors can safely occur in here without killing the original stream 
 

 
     return Rx.Observable.of(value) 
 
      .map(value => { 
 
       if (value === 3) { 
 
        throw new Error('Value cannot be 3'); 
 
       } 
 
       return value; 
 
      }) 
 
      .catch(error => { 
 
       // You can do some fancy stuff here with errors if you like 
 
       // Below we are just returning the error object to the outer stream 
 
       return Rx.Observable.of(error); 
 
      }); 
 

 
    }) 
 
    .map(value => { 
 
     if (value instanceof Error) { 
 
      // Maybe do some error handling here 
 
      return `Error: ${value.message}`; 
 
     } 
 
     return value; 
 
    }) 
 
    .subscribe(
 
     (x => console.log('Success', x)), 
 
     (x => console.log('Error', x)), 
 
     (() => console.log('Complete')) 
 
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>

Mehr Informationen zu dieser Technik in dieser Blog-Post ist: The Quest for Meatballs: Continue RxJS Streams When Errors Occur

Verwandte Themen