Ich bin neu bei RxJS und fragte mich, ob jemand mir helfen könnte.Synchrone Stream von Antworten aus einem Strom von Anfragen mit RxJS
Ich möchte einen synchronen Stream von Antworten (vorzugsweise mit den entsprechenden Anforderungen) aus einem Strom von Anfragen (Nutzdaten) erstellen.
Ich möchte im Grunde die Anforderungen eins nach dem anderen gesendet werden, jeder wartet auf die Antwort von der letzten.
Ich versuchte dies, aber es sendet alles auf einmal (jsbin):
var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
sendRequest,
(val, response)=>{ return {val, response}; }
);
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('result for '+val);},1000);
});
};
Die folgenden Werke, in einem Ausmaß, aber Strom für die Anforderungsdaten nicht (jsbin verwenden).
var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
var sendNext = function(){
var val = data.shift();
if (!val) {
observer.onCompleted();
return;
}
sendRequest(val).then(response=>{
observer.onNext({val, response});
sendNext();
});
};
sendNext();
});
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
});
};
Vielen Dank!
EDIT:
Nur um zu klären, das ist, was ich erreichen wollte:
„Send A, wenn Sie Antwort für A, senden B erhalten, wenn Sie Antwort für B empfangen, senden C, etc ...“
concatMap Verwendung und aufschieben, wie user3743222 vorgeschlagen, scheint es zu tun (jsbin):
responseStream = requestStream.concatMap(
(val)=>{
return Rx.Observable.defer(()=>{
return sendRequest(val);
});
},
(val, response)=>{ return {val, response}; }
);
Vielen Dank für die Antwort. Ich habe Ihre Lösung versucht, aber die Anfragen werden immer noch sofort gesendet. Die Dokumentation legt nahe, dass FlatMap Interleaving verursachen kann, während concatMap dies nicht tut. Es scheint, dass der Unterschied in der Reihenfolge liegt. Es macht Sinn, concatMap zu verwenden, aber es produziert immer noch nicht das gewünschte Verhalten: Senden A, wenn Sie Antwort für A erhalten, senden Sie B, wenn Sie Antwort für B erhalten, senden Sie C, etc. – jamesref
Vielleicht habe ich missverstanden, was Sie wollten. Kannst du in diesem Fall versuchen "aufzuheben"? Ich werde den Code aktualisieren – user3743222
Vielen Dank! Es scheint zu funktionieren. – jamesref