2016-12-23 2 views
5

Die Stornierung von der Verbraucherseite kann mit takeUntil aufgerufen werden, aber das ist nicht unbedingt sehr dynamisch. In diesem Fall möchte ich jedoch ein Observable von der Produzentenseite der Gleichung streichen, genauso wie Sie ein Promise innerhalb einer Versprechenskette stornieren möchten (was mit dem nativen Dienstprogramm nicht möglich ist).Eine Observable von der Herstellerseite abbrechen, nicht von der Verbraucherseite

Angenommen, ich habe diese Observable von einer Methode zurückgegeben. (Diese Warteschlangenbibliothek ist eine einfache persistente Warteschlange, die in eine Textdatei schreibt und schreibt, wir müssen Lese-/Schreibvorgänge sperren, damit nichts beschädigt wird).

Queue.prototype.readUnique = function() { 

    var ret = null; 
    var lockAcquired = false; 

    return this.obsEnqueue 
     .flatMap(() => acquireLock(this)) 
     .flatMap(() => { 
      lockAcquired = true; 
      return removeOneLine(this) 
     }) 
     .flatMap(val => { 
      ret = val; // this is not very good 
      return releaseLock(this); 
     }) 
     .map(() => { 
      return JSON.parse(ret); 
     }) 
     .catch(e => { 
      if (lockAcquired) { 
       return releaseLock(this); 
      } 
      else { 
       return genericObservable(); 
      } 
     }); 

}; 

Ich habe 2 verschiedene Fragen -

  1. Wenn ich die Sperre nicht erwerben kann, wie kann ich „cancel“ die beobachtbaren, nur ohne Ergebnis eine leere beobachtbare zurückschicken (n)? Müsste ich bei jedem Rückruf wirklich if/else-Logik machen, um zu entscheiden, ob die aktuelle Kette abgebrochen wird, und wenn ja, eine leere Observable zurückgeben? Mit leer, ich meine ein Observable, dass einfach onNext/onComplete ohne Möglichkeit für Fehler und ohne irgendwelche Werte für onNext feuert. Technisch gesehen, glaube ich nicht, dass das ein leerer Observable ist, also suche ich nach dem, was wirklich heißt, wenn es existiert.

  2. Wenn man sich diese besondere Folge von Code:

    .flatMap(() => acquireLock(this)) 
    .flatMap(() => { 
        lockAcquired = true; 
        return removeOneLine(this) 
    }) 
    .flatMap(val => { 
        ret = val; 
        return releaseLock(this); 
    }) 
    .map(() => { 
        return JSON.parse(ret); 
    }) 
    

, was ich tue ist eine Referenz Speicherung an der Spitze des Verfahrens ret und Referenzierung es dann später wieder einen Schritt weiter. Nach was ich suche, ist eine Weise, den Wert zu übergeben, der von removeOneLine() zu JSON.parse() gefeuert wird, ohne irgendeinen Zustand außerhalb der Kette zu setzen (die einfach unelegant ist).

Antwort

3

1) Es hängt davon ab, wie Sie Ihre Methode acquireLock funktioniert - aber ich gehe davon aus, dass es einen Fehler aus, wenn es nicht die Sperre erwerben kann, in diesem Fall könnten Sie erstellen Stream mit einem catch und stellen Sie den Fehlerstrom ein leerer:

return Rx.Observable.catch(
     removeLine$, 
     Rx.Observable.empty() 
    ); 

2) die Stateful externe Variable ersparen können Sie einfach die Kette eine mapTo:

let removeLine$ = acquireLock(this) 
    .flatMap(() => this.obsEnqueue 
     .flatMap(() => removeOneLine(this)) 
     .flatMap(val => releaseLock(this).mapTo(val)) 
     .map(val => JSON.parse(val)) 
     .catch(() => releaseLock(this)) 
    ); 
+0

danke @olsn, ich folge Teil 2, (obwohl ich den Unterschied zwischen map und mapTo nicht kenne), aber in Bezug auf Teil 1, folge ich nicht, stört es Sie mit anderen Worten zu erklären? –

+0

part1 ist im Grunde die letzten 4 Zeilen des Code-Blocks (sorry, die Antwort könnte ein bisschen schlecht strukturiert sein) Der Unterschied zwischen 'map' und' mapTo' ist nur, dass 'mapTo' direkt ein Argument an _map to_ nimmt, während 'map' eine Funktion übernimmt, so kann' .mapTo (val) 'genauso geschrieben werden wie' .map (() => val) ' – olsn

+0

ok Ich werde es so bearbeiten, wie ich es verstehe, bitte fühlen Sie sich frei bearbeite meine Bearbeitungen :) –

2

nach Ihrer Definition von cancel ist es, zu verhindern, dass eine Observable einen Wert stromabwärts sendet. Um einen beobachtbaren von drängen Wert zu verhindern, können Sie Filter verwenden:

Es kann so einfach sein wie:

observable.filter(_ => lockAcquired) 

Dies wird nur eine Benachrichtigung senden Downstream wenn lockAcquired wahr ist.

Verwandte Themen