2017-12-05 4 views
1

Ich habe über Observables gelesen. Eine Sache, über die ich gelesen habe, ist ihre Fähigkeit, wie ein Strom zu hören. Ich habe das mit dem folgenden Code versucht.Zu versprechen, wie ein Strom

const Rx = require('rxjs/Rx'); 
Promise = require('bluebird'); 

let i = 0; 

function calculate() { 
    return new Promise((resolve, reject) => { 
     setInterval(() => { 
      console.log(++i); 
      resolve(i); 
     }, 5000); 
    }) 
} 


let source = Rx.Observable.fromPromise(calculate()); 

source.subscribe(
    next => { 
     console.log(next, ' next '); 
    }, 
    err => { 
     console.error(err) 
    }, 
    () => { 
     console.log('done!'); 
    } 
) 

Aber es hat nicht wie ein Stream funktioniert. So sieht die Ausgabe etwa so aus:

1 
1 ' next ' 
done! 
2 
3 

Wie höre ich weiter auf das Versprechen? Ist das Problem Observable wirklich zu lösen?

+6

Versprechen kann nur einmal aufgelöst/abgelehnt werden, also nicht sicher, was Sie erwarten würden, wenn Sie ein Versprechen mehrmals lösen. Wenn Sie in bestimmten Intervallen weiterhin Ereignisse erzeugen möchten, können Sie 'Rx.Observable.interval' verwenden. –

+0

@arturgrzesiak Könnten Sie bitte ein Beispiel in einer Antwort posten –

+0

@SuhailGupta was ist der Anwendungsfall, den Sie lösen möchten? – mickaelw

Antwort

2

Observable.fromPromise gedacht ist ein Versprechen an eine beobachtbare zu konvertieren. Da Versprechungen nur einmal aufgelöst werden können, gibt dieser Stream immer nur 1 Element aus.

Sie können eine Versprechungsfunktionalität nicht erweitern, indem Sie einen Stream darüber legen. Wenn das Versprechen nur 1 Wert ergeben kann, wird nur 1 Wert in der resultierenden Observablen erhalten.

In Ihrem spezifischen Beispiel gibt es Observable.interval(n), die 1 Element alle n Millisekunden aussendet. Else, für eine individuelle beobachtbar:

Rx.Observable.create(obs => { 
    let i=0; 
    setInterval(() => { 
     obs.next(++i); 
    }, 1000); 
}); 

Wenn Sie eine beobachtbare erstellen, erhalten Sie eine Observer-Instanz, die den Betrachter darstellt, die nur auf Ihren Stream abonniert. Beachten Sie, dass jeder neue Beobachter einen neuen Kontext erstellt (in meinem Beispiel hat jeder Beobachter seinen i Zähler).

+0

Ich glaube nicht, dass das seine Frage beantwortet. Er sagte, er wolle die Datenbank aufgrund seines Versprechens regelmäßig überprüfen. – mickaelw

+0

@mickaelw Und ich antwortete, indem ich sagte, dass es mit diesem Ansatz wegen der Beschränkungen von "Versprechen" nicht möglich ist, und dass es leicht zu "Rx.Observable.create" umgestaltet werden sollte – olivarra1

0

können Sie verwenden switchMap und Intervall:

let i = 0; 

function calculate() { 
    return Promise.resolve(++i) 
} 
//I reduce to the strictly necessary for the answer, the promise's resolve. 
//It's possible to have more things 

let source = Rx.Observable 
    .interval(1000) 
    .switchMap(() => calculate()) 

source.subscribe(
    next => { 
     console.log(next, ' next '); 
    }, 
    err => { 
     console.error(err) 
    }, 
    () => { 
     console.log('done!'); 
    } 
) 

Sie können die Dokumentation von switchMap und Intervall siehe hier:

Und die Demo: https://jsbin.com/nutukiyagi/1/edit?js,console

In diesem Fall Sie nie den Strom vervollständigen, die console.log (‚gemacht!‘) Nie in der Konsole erscheint

+0

@JLRishe du bist richtig für die switchMap und ist ein Fehler für das neue vor dem Versprechen, danke! Ich aktualisiere meine Antwort – mickaelw

0

Aus der Dokumentation für Rx.Observable.fromPromise:

Wenn das Versprechen mit einem Wert verrechnet wird, der Ausgang beobachtbare aussendet, die den Wert als nächstes aufgelöst, und dann vervollständigt.

Dies beschreibt genau das Verhalten, vor dem Sie stehen. Sie sollten ein generator function oder Rx.Observable.create verwenden, um dieses spezifische Problem zu lösen.

Verwandte Themen