Ich bin ziemlich neu mit Rxjs. Ich rufe unten eine Funktion auf und der komplette Strom wird gelesen und die gelesenen Konsolenanweisungen werden gedruckt, aber ich sehe nie einen "betitelten Teilnehmer" und ich weiß nicht warum. Was braucht es, um diesen Stream zu beenden? Ist etwas offensichtlich falsch?RxJS abonnieren nie beendet
const readline$ = RxNode.fromReadLineStream(rl)
.filter((element, index, observable) => {
if (index >= range.start && index < range.stop) {
console.log(`kept line is ${JSON.stringify(element)}`);
return true;
} else {
console.log(`not keeping line ${JSON.stringify(element)}`);
return false;
}
})
.concatMap(line => Rx.Observable.fromPromise(myFunction(line)))
.do(response => console.log(JSON.stringify(response)));
readline$.subscribe(i => { console.log(`Subscribe object: ${util.inspect(i)}`); },
err => { console.error(`Subscribe error: ${util.inspect(err)}`); },
done => { console.log("Subscribe done."); // NEVER CALLED
anotherFunc(); // NEVER CALLED
}
);
Ich bin mir nicht sicher über diese * spezifische beobachtbare *, aber nicht alle beobachtbaren werden abgeschlossen. –
Welchen Knoten-Stream übergeben Sie an 'RxNode.fromReadLineStream'? Endet der Stream selbst? – cartant
Ja, es ist ein RxNode.fromReadLineStream einer Datei mit einer endlichen Länge. –