2016-02-03 7 views
5

Ich bin neu bei RxJS und fragte mich, ob jemand mir helfen könnte.Synchrone Stream von Antworten aus einem Strom von Anfragen mit RxJS

Ich möchte einen synchronen Stream von Antworten (vorzugsweise mit den entsprechenden Anforderungen) aus einem Strom von Anfragen (Nutzdaten) erstellen.

Ich möchte im Grunde die Anforderungen eins nach dem anderen gesendet werden, jeder wartet auf die Antwort von der letzten.

Ich versuchte dies, aber es sendet alles auf einmal (jsbin):

var requestStream, responseStream; 
 
requestStream = Rx.Observable.from(['a','b','c','d','e']); 
 

 
responseStream = requestStream.flatMap(
 
    sendRequest, 
 
    (val, response)=>{ return {val, response}; } 
 
); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('result for '+val);},1000); 
 
    }); 
 
};

Die folgenden Werke, in einem Ausmaß, aber Strom für die Anforderungsdaten nicht (jsbin verwenden).

var data, responseStream; 
 
data = ['a','b','c','d','e']; 
 
responseStream = Rx.Observable.create(observer=>{ 
 
    var sendNext = function(){ 
 
    var val = data.shift(); 
 
    if (!val) { 
 
     observer.onCompleted(); 
 
     return; 
 
    } 
 
    sendRequest(val).then(response=>{ 
 
     observer.onNext({val, response}); 
 
     sendNext(); 
 
    }); 
 
    }; 
 
    sendNext(); 
 
}); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500); 
 
    }); 
 
};

Vielen Dank!

EDIT:

Nur um zu klären, das ist, was ich erreichen wollte:

„Send A, wenn Sie Antwort für A, senden B erhalten, wenn Sie Antwort für B empfangen, senden C, etc ...“

concatMap Verwendung und aufschieben, wie user3743222 vorgeschlagen, scheint es zu tun (jsbin):

responseStream = requestStream.concatMap(
    (val)=>{ 
    return Rx.Observable.defer(()=>{ 
     return sendRequest(val); 
    }); 
    }, 
    (val, response)=>{ return {val, response}; } 
); 

Antwort

3

Versuchen Sie in Ihrem ersten Codebeispiel flatMap durch concatMap zu ersetzen und lassen Sie mich wissen, ob das resultierende Verhalten dem entspricht, wonach Sie suchen.

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    sendRequest, 
    (val, response)=>{ return {val, response}; } 
); 

concatMap Grundsätzlich hat eine ähnliche Signatur als flatMap, der Unterschied im Verhalten, dass es für die aktuelle beobachtbaren abgeflacht wird warten sein wird, bevor abzuschließen mit der nächsten fortgefahren wird. Also hier:

  • ein requestStream Wert wird den concatMap Bediener gedrückt werden.
  • der concatMap Operator ein sendRequest beobachtbar, und was auch immer Werte aus diesen beobachtbaren (scheint ein Tupel (val, response) zu sein) erzeugen wird durch die Auswahlfunktion und das Objekt Ergebnis, dass stromabwärts
  • übergeben werden, wenn die sendRequest übergeben wird wird abgeschlossen, ein weiterer requestStream Wert wird verarbeitet.
  • Kurz gesagt, Ihre Anfragen eins nach dem anderen

Alternativ Sie verwenden möchten, um die Ausführung des sendRequest zu verschieben, vielleicht defer verarbeitet werden.

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    function(x){return Rx.Observable.defer(function(){return sendRequest(x);})}, 
    (val, response)=>{ return {val, response}; } 
); 
+0

Vielen Dank für die Antwort. Ich habe Ihre Lösung versucht, aber die Anfragen werden immer noch sofort gesendet. Die Dokumentation legt nahe, dass FlatMap Interleaving verursachen kann, während concatMap dies nicht tut. Es scheint, dass der Unterschied in der Reihenfolge liegt. Es macht Sinn, concatMap zu verwenden, aber es produziert immer noch nicht das gewünschte Verhalten: Senden A, wenn Sie Antwort für A erhalten, senden Sie B, wenn Sie Antwort für B erhalten, senden Sie C, etc. – jamesref

+0

Vielleicht habe ich missverstanden, was Sie wollten. Kannst du in diesem Fall versuchen "aufzuheben"? Ich werde den Code aktualisieren – user3743222

+0

Vielen Dank! Es scheint zu funktionieren. – jamesref

Verwandte Themen