2016-05-12 4 views
1

Ich bin auf der Suche nach einem RxJS-Beispiel, wie eine Reihe von XHR-Aufrufe (oder andere asynchrone Operationen) zwischengespeichert werden, so dass derselbe Aufruf nicht wiederholt werden muss Unveränderlichkeit und ohne Nebenwirkungen.RxJS: Cache Mehrere XHR-Aufrufe ohne Mutabilität und Nebenwirkungen

Hier ist ein nackten Knochen, wandelbar Beispiel:

var dictionary = {}; // mutable 

var click$ = Rx.Observable.fromEvent(document.querySelector('button'), 'click', function (evt) { 
    return Math.floor(Math.random() * 6) + 1; // click -> random number 1-6 (key) 
}) 
    .flatMap(getDefinition); 

var clicksub = click$.subscribe(function (key) { 
    console.log(key); 
}); 

function getDefinition (key) { 
    if (dictionary[key]) { // check dict. for key 
    console.log('from dictionary'); 
    return Rx.Observable.return(dictionary[key]); 
    } 
    // key not found, mock up async operation, to be replaced with XHR 
    return Rx.Observable.fromCallback(function (key, cb) { 
    dictionary[key] = key; // side effect 
    cb(dictionary[key); // return definition 
    })(key); 
} 

JSBin Demo

Frage: Gibt es eine Möglichkeit zur Verwendung des dictionary Variable ohne Rückgriff Cachen mehr ähnlichen Asynchron-Operationen zu erreichen, aufgrund Wandelbarkeit und Nebenwirkung?


Ich habe bei scan als Mittel sieht die XHR Anruf Ergebnisse zu „sammeln“, aber ich sehe nicht, wie ein asynchronen Betrieb innerhalb Scan zu handhaben.

Ich denke, ich habe hier zwei Probleme: ein Zustand wird vom Ereignisstrom verwaltet und nicht in einer Variablen gespeichert, und der zweite enthält eine bedingte Operation, die möglicherweise von einer asynchronen Operation abhängt Stromfluss.

Antwort

2

Mit derselben Technik wie in einer vorherigen Frage (RxJS wait until promise resolved) können Sie scan verwenden und den http-Aufruf als Teil des Status hinzufügen, den Sie verfolgen. Dies ist nicht getestet, aber Sie sollten leicht in der Lage sein, um es für Tests anpassen:

restCalls$ = click$ 
    .scan(function (state, request){ 
    var cache = state.cache; 
    if (cache.has(request)) { 
     return {cache : cache, restCallOrCachedValue$ : Rx.Observable.return(cache.get(request))} 
    } 
    else { 
     return { 
     cache : cache, 
     restCallOrCachedValue$ : Rx.Observable 
      .defer(function(){ 
      return Rx.Observable 
       .fromPromise(executeRestCall(request)) 
       .do(function(value){cache.add(request,value)}) 
      }) 
     } 
    } 
    }, {cache : new Cache(), restCallOrCachedValue$ : undefined}) 
    .pluck('restCallOrCachedValue$') 
    .concatAll() 

Also im Grunde Sie eine beobachtbare passieren, die den Anruf hinunter dem Strom tun oder Sie den Wert direkt in dem Cache in einem beobachtbaren gewickelten zurückzukehren. In beiden Fällen werden die relevanten Observablen in der Reihenfolge concatAll ausgepackt. Beachten Sie, dass eine Kaltbeobachtungsfunktion nur zum Zeitpunkt des Abonnements zum Starten des http-Aufrufs verwendet wird (vorausgesetzt, dass executeRestCall den Aufruf ausführt und sofort eine Zusage zurückgibt).

+0

Ich bekomme einen Fehler mit "Defer", aber "vonPromise" (in Verbindung mit einem XHR-Aufruf, der eine Zusage zurückgibt) übergibt den abgerufenen Wert im Stream. Allerdings bleibt das Problem bestehen, wie der abgerufene Wert aus der asynchronen Operation (Versprechen, XHR ...) in den Cache abgerufen werden kann. Das Hinzufügen des Schlüssels zum Cache, wie in Ihrer Antwort, speichert den abgerufenen Wert nicht für die zukünftige Verwendung. – bloodyKnuckles

+0

hat den Code aktualisiert. Was ist es, das nicht mit Defer funktioniert? Meine Vermutung ist, dass die als Parameter übergebene Funktion ein Observable zurückgeben muss und ein Versprechen abgegeben hat. Es sollte jetzt funktionieren. Die wichtigsten Einsichten sind in jedem Fall drei: 1. Die Ausführung in eine Funktion umbrechen (d. H. Die Ausführung des Aufrufs verzögern). 2. Den Stream als Teil des Zustands übergeben. 3. Mit "concatAll" Ergebnisse in der richtigen Reihenfolge erhalten. Über Insight 1 könnten Sie den Aufruf direkt ausführen, aber dann führen Sie einen Nebeneffekt innerhalb des "Scan" aus. Nichts ist falsch daran, aber ich bevorzuge eine reine Funktion in der "Scan". – user3743222

+0

Das hilft sehr. Frage, was ist der Zweck von "Aufschub"? Ich habe es herausgezogen (und brachte "fromPromise" nach vorne) und das Caching funktioniert wie gewünscht (siehe meine aktualisierte Antwort). – bloodyKnuckles

0

Arbeiten von @ user3743222 Antwort, dieser Code die XHR zwischenspeichert ruft:

// set up Dictionary object 
function Dictionary() { this.dictionary = {} } 
Dictionary.prototype.has = function (key) { return this.dictionary[key] } 
Dictionary.prototype.add = function (key, value) { this.dictionary[key] = value } 
Dictionary.prototype.get = function (key) { return this.dictionary[key] } 

var definitions$ = click$ 

    .scan(function (state, key) { 
    var dictionary = state.dictionary 

    // check for key in dict. 
    if (dictionary.has(key)) { 
     console.log('from dictionary') 
     return { 
     dictionary: dictionary, 
     def$: Rx.Observable.return(dictionary.get(key)) 
     } 
    } 

    // key not found 
    else { 
     return { 
     dictionary: dictionary, 
     def$: Rx.Observable.fromPromise(XHRPromise(key)) 
      .do(function (definition) { dictionary.add(key, definition) }) 
     } 
    } 

    }, {dictionary : new Dictionary(), def$ : undefined}) 

    .pluck('def$') // pull out definition stream 
    .concatAll() // flatten 

Aktualisiert: Im // key not found Block, Ausführen der Funktion XHRPromise im key vorbei (die eine Zusage zurückgibt) und das Ergebnis übergeben nach der RxJS-Methode fromPromise. Als nächstes ketten Sie eine do Methode ein, in der die Definition erfasst und zum Cache dictionary hinzugefügt wird.

Follow-up-Frage: Es scheint, wir die Nebenwirkung Problem beseitigt haben, aber mutiert dies noch gilt, wenn die Definition und Schlüssel zu dem Wörterbuch hinzugefügt werden?


dieser Putting hier für Archivierungszwecke, hatte die erste Iteration diesen Code, das gleiche Cache-Verfahren erreichen:

// ALTERNATIVE key not found 
    else { 
     var retdef = Rx.Observable.fromPromise(function() { 
     var prom = XHRPromise(key).then(function (dd) { 
      dictionary.add(key, dd) // add key/def to dict. 
      return dd 
     }) 
     return prom 
     }()) // immediately invoked anon. func. 
     return { dictionary: dictionary, def$: retdef } 
    } 

Hier mit der Funktion XHRPromise, die ein Versprechen zurück. Bevor Sie jedoch das Versprechen an den Ereignisstrom zurücksenden, ziehen Sie eine then-Methode heraus, bei der die versprochene Definition erfasst und zum Wörterbuch-Cache hinzugefügt wird. Danach wird das Promise-Objekt über die RxJS-Methode fromPromise an den Ereignisstream zurückgegeben.

Um Zugriff auf die vom XHR-Aufruf zurückgegebene Definition zu erhalten, damit sie zum Dictionary-Cache hinzugefügt werden kann, wird eine anonyme, sofort aufgerufene Funktion verwendet.

+0

cf. http://stackoverflow.com/questions/36141280/how-to-manage-state-without-using-subject-or-imperative-manipulation-in-a-simple/36142372#36142372 – user3743222

Verwandte Themen