2014-02-10 11 views
5

Ich versuche, eine bestehende API zu konvertieren, um mit RxJS zu arbeiten ... ziemlich neu zu Knoten und sehr neu zu RxJs, also bitte bitte mit mir.Versuche, meine eigenen RxJs beobachtbar zu machen

Ich habe eine vorhandene API (getNextMessage), die entweder (asynchron) blockiert oder ein neues Element oder einen Fehler über einen node-style (err, val) -Rückruf zurückgibt, wenn das etwas verfügbar wird.

so sieht es so etwas wie:

getNextMessage (nodeStyleCompletionCallback);

Sie könnten an getNextMessage wie eine HTTP-Anfrage denken, die in der Zukunft abgeschlossen wird, wenn der Server antwortet, aber Sie müssen getNextMessage erneut aufrufen, sobald eine Nachricht empfangen wird, um neue Elemente vom Server zu erhalten.

Also, um es in eine beobachtbare Sammlung zu machen, muss ich RxJs dazu bringen, meine getNextMessage-Funktion aufzurufen, bis der Abonnent entsorgt wird();

Grundsätzlich versuche ich meine eigene RxJs beobachtbare Sammlung zu erstellen.

Die Probleme sind:

  1. Ich weiß nicht, wie subscriber.dispose() töten den async.forever
  2. ich wahrscheinlich sollte nicht async.forever in erster Linie mit machen
  3. Ich bin nicht sicher, ich sollte sogar für jede Nachricht "abgeschlossen" sein - sollte das nicht am Ende einer Sequenz sein
  4. Ich möchte schließlich die Notwendigkeit der Verwendung von NodeCallback entfernen, um eine erste zu haben Klasse RxJS beobachtbar
  5. Offensichtlich bin ich ein wenig verwirrt.

Würde ein bisschen Hilfe lieben, danke!

Hier ist mein vorhandener Code:

var Rx = require('rx'); 
var port = require('../lib/port'); 
var async = require('async'); 

function observableReceive(portName) 
{ 
    var observerCallback; 
    var listenPort = new port(portName); 
    var disposed = false; 

    var asyncReceive = function(asyncCallback) 
    { 
     listenPort.getNextMessage(
      function(error, json) 
      { 
       observerCallback(error, json); 

       if (!disposed) 
        setImmediate(asyncCallback); 
      } 
     ); 
    } 

    return function(outerCallback) 
    { 
     observerCallback = outerCallback; 
     async.forever(asyncReceive); 
    } 
} 

var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest')); 
var source = receive(); 

var subscription = source.forEach(
    function (json) 
    { 
     console.log('receive completed: ' + JSON.stringify(json)); 
    }, 
    function (error) { 
     console.log("receive failed: " + error.toString()); 
    }, 
    function() { 
     console.log('Completed'); 
     subscription.dispose(); 
    } 
); 

Antwort

14

Also hier ist wahrscheinlich das, was ich tun würde.

var Rx = require('Rx'); 

// This is just for kicks. You have your own getNextMessage to use. ;) 
var getNextMessage = (function(){ 

    var i = 1; 

    return function (callback) { 
    setTimeout(function() { 
     if (i > 10) { 
     callback("lawdy lawd it's ova' ten, ya'll."); 
     } else { 
     callback(undefined, i++); 
     } 
    }, 5); 
    }; 

}()); 

// This just makes an observable version of getNextMessage. 
var nextMessageAsObservable = Rx.Observable.create(function (o) { 
    getNextMessage(function (err, val) { 
    if (err) { 
     o.onError(err); 
    } else { 
     o.onNext(val); 
     o.onCompleted(); 
    } 
    }); 
}); 

// This repeats the call to getNextMessage as many times (11) as you want. 
// "take" will cancel the subscription after receiving 11 items. 
nextMessageAsObservable 
    .repeat() 
    .take(11) 
    .subscribe(
    function (x) { console.log('next', x); }, 
    function (err) { console.log('error', err); }, 
    function() { console.log('done');  } 
); 
+2

Vielen Dank! Ich fand die Hälfte davon auch alleine, und irgendwie hat es funktioniert, obwohl ich nicht so weit gekommen bin, async loszuwerden. Jetzt kann ich meine eigenen erstklassigen Observablen bauen, es ist eine wunderschöne Welt.Dies ist übrigens ein großartiges Beispiel - sollte irgendwo in die RxJs-Dokumentation gehen. danke nochmal. – user3291110

5

Ich weiß, das über ein Jahr alt ist, aber ich denke, eine bessere Lösung für diese Verwendung von rekursiven Scheduling stattdessen zu machen wäre: hier

Rx.Observable.forever = function(next, scheduler) { 
    scheduler = scheduler || Rx.Scheduler.default, 
    //Internally wrap the the callback into an observable 
    next = Rx.Observable.fromNodeCallback(next);  

    return Rx.Observable.create(function(observer) { 
    var disposable = new Rx.SingleAssignmentDisposable(), 
     hasState = false; 
    disposable.setDisposable(scheduler.scheduleRecursiveWithState(null, 
     function(state, self) { 
     hasState && observer.onNext(state); 
     hasState = false; 
     next().subscribe(function(x){ 
      hasState = true; 
      self(x); 
     }, observer.onError.bind(observer)); 

     })); 

    return disposable; 

    }); 

}; 

Die Idee, dass Sie neu planen können Artikel, sobald der vorherige abgeschlossen wurde. Sie rufen next() auf, die die übergebene Methode aufruft und wenn sie einen Wert zurückgibt, planen Sie das nächste Element für den Aufruf ein.

Sie können es dann wie so verwenden:

Rx.Observable.forever(getNextMessage) 
.take(11) 
.subscribe(function(message) { 
console.log(message); 
}); 

Sehen Sie ein funktionierendes Beispiel here

Verwandte Themen