2017-06-04 3 views
1

Ich bin ziemlich neu mit Rxjs. Ich rufe unten eine Funktion auf und der komplette Strom wird gelesen und die gelesenen Konsolenanweisungen werden gedruckt, aber ich sehe nie einen "betitelten Teilnehmer" und ich weiß nicht warum. Was braucht es, um diesen Stream zu beenden? Ist etwas offensichtlich falsch?RxJS abonnieren nie beendet

const readline$ = RxNode.fromReadLineStream(rl) 
    .filter((element, index, observable) => { 
     if (index >= range.start && index < range.stop) { 
      console.log(`kept line is ${JSON.stringify(element)}`); 
      return true; 
     } else { 
      console.log(`not keeping line ${JSON.stringify(element)}`); 
      return false; 
     } 
    }) 
    .concatMap(line => Rx.Observable.fromPromise(myFunction(line))) 
    .do(response => console.log(JSON.stringify(response))); 

readline$.subscribe(i => { console.log(`Subscribe object: ${util.inspect(i)}`); }, 
        err => { console.error(`Subscribe error: ${util.inspect(err)}`); }, 
       done => { console.log("Subscribe done."); // NEVER CALLED 
          anotherFunc();     // NEVER CALLED 
        } 
); 
+1

Ich bin mir nicht sicher über diese * spezifische beobachtbare *, aber nicht alle beobachtbaren werden abgeschlossen. –

+1

Welchen Knoten-Stream übergeben Sie an 'RxNode.fromReadLineStream'? Endet der Stream selbst? – cartant

+0

Ja, es ist ein RxNode.fromReadLineStream einer Datei mit einer endlichen Länge. –

Antwort

1

Sie können aus dem Quellcode sehen, dass es nur vollständige Meldung senden, wenn der Quellenstrom close Ereignis ausgibt. https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L100-L102

Wenn Sie also den richtigen vollständigen Handler benötigen, der aufgerufen werden soll, müssen Sie den Stream selbst schließen, siehe How to close a readable stream (before end)?.
Mit anderen Worten, das Observable wird nach dem Lesen der gesamten Datei nicht automatisch abgeschlossen.

Verwandte Themen