Ich versuche, eine bestehende API zu konvertieren, um mit RxJS zu arbeiten ... ziemlich neu zu Knoten und sehr neu zu RxJs, also bitte bitte mit mir.Versuche, meine eigenen RxJs beobachtbar zu machen
Ich habe eine vorhandene API (getNextMessage), die entweder (asynchron) blockiert oder ein neues Element oder einen Fehler über einen node-style (err, val) -Rückruf zurückgibt, wenn das etwas verfügbar wird.
so sieht es so etwas wie:
getNextMessage (nodeStyleCompletionCallback);
Sie könnten an getNextMessage wie eine HTTP-Anfrage denken, die in der Zukunft abgeschlossen wird, wenn der Server antwortet, aber Sie müssen getNextMessage erneut aufrufen, sobald eine Nachricht empfangen wird, um neue Elemente vom Server zu erhalten.
Also, um es in eine beobachtbare Sammlung zu machen, muss ich RxJs dazu bringen, meine getNextMessage-Funktion aufzurufen, bis der Abonnent entsorgt wird();
Grundsätzlich versuche ich meine eigene RxJs beobachtbare Sammlung zu erstellen.
Die Probleme sind:
- Ich weiß nicht, wie subscriber.dispose() töten den async.forever
- ich wahrscheinlich sollte nicht async.forever in erster Linie mit machen
- Ich bin nicht sicher, ich sollte sogar für jede Nachricht "abgeschlossen" sein - sollte das nicht am Ende einer Sequenz sein
- Ich möchte schließlich die Notwendigkeit der Verwendung von NodeCallback entfernen, um eine erste zu haben Klasse RxJS beobachtbar
- Offensichtlich bin ich ein wenig verwirrt.
Würde ein bisschen Hilfe lieben, danke!
Hier ist mein vorhandener Code:
var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');
function observableReceive(portName)
{
var observerCallback;
var listenPort = new port(portName);
var disposed = false;
var asyncReceive = function(asyncCallback)
{
listenPort.getNextMessage(
function(error, json)
{
observerCallback(error, json);
if (!disposed)
setImmediate(asyncCallback);
}
);
}
return function(outerCallback)
{
observerCallback = outerCallback;
async.forever(asyncReceive);
}
}
var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();
var subscription = source.forEach(
function (json)
{
console.log('receive completed: ' + JSON.stringify(json));
},
function (error) {
console.log("receive failed: " + error.toString());
},
function() {
console.log('Completed');
subscription.dispose();
}
);
Vielen Dank! Ich fand die Hälfte davon auch alleine, und irgendwie hat es funktioniert, obwohl ich nicht so weit gekommen bin, async loszuwerden. Jetzt kann ich meine eigenen erstklassigen Observablen bauen, es ist eine wunderschöne Welt.Dies ist übrigens ein großartiges Beispiel - sollte irgendwo in die RxJs-Dokumentation gehen. danke nochmal. – user3291110