Ich habe einen net
tcp
Buchse node.js
Codeabschnitt, den ich von der Verwendung eines callback
zu rx
konvertieren möchte.eine Netto Steckdose in eine beobachtbaren
ich wie folgt aussehen in feed.js
Modul:
var net = require('net');
var server = net.createServer(function(socket) {
...
// Handle incoming messages from clients.
socket.on('data', function (data) {
broadcast(data, socket);
});
...
}
function broadcast(message, sender)
{
...
onChangeHandler(stock.symbol, 'stock', stock);
}
}
function start(onChange) {
onChangeHandler = onChange;
}
exports.start = start;
server.listen(....);
Dann wird der Kunde von dem obigen Aufruf registriert einen Rückruf:
feed.start(function(room, type, message) {
//...Do something with the message
});
Ich mag würde dies konvertieren ein Rx
Observable/Observer
zu verwenden. Ich sehe, dass es einen Weg gibt einen beobachtbaren Strom aus dem web socket
zu machen (obwohl es eine bidirektionale Subject
verwendet, die ich nicht brauche):
fromWebSocket(address, protocol) {
var ws = new WebSocket(address, protocol);
// Handle the data
var osbervable = Rx.Observable.create (function (obs) {
// Handle messages
ws.onmessage = obs.onNext.bind(obs);
ws.onerror = obs.onError.bind(obs);
ws.onclose = obs.onCompleted.bind(obs);
// Return way to unsubscribe
return ws.close.bind(ws);
});
var observer = Rx.Observer.create(function (data) {
if (ws.readyState === WebSocket.OPEN) { ws.send(data); }
});
return Rx.Subject.create(observer, observable);
}
var socketSubject = fromWebSocket('ws://localhost:9999', 'sampleProtocol');
// Receive data
socketSubject.subscribe(
function (data) {
// Do something with the data
},
function (error) {
// Do something with the error
},
function() {
// Do something on completion
});
// Send data
socketSubject.onNext(42);
Was ist das Äquivalent für eine net
Buchse? Wenn es eine Standardbibliothek gibt, ist das in Ordnung.
Mein erster Versuch ist so, aber ich weiß nicht, wie Rx
und und die Socket-Funktionen zu binden zusammen in eine onnext
:
var net = require('net');
fromNetSocket(address, protocol) {
var ns = net.createServer(function(socket) {
socket.on('disconnect', function() { // This seems like it maps to onclose
console.log('User disconnected. %s. Socket id %s', socket.id);
});
// Handle incoming messages from clients.
socket.on('data', function (data) { //this should map to onnext
});
// Handle the data
var osbervable = Rx.Observable.create (function (obs) {
// Handle messages
ns.onmessage = obs.onNext.bind(obs);
ns.onerror = obs.onError.bind(obs);
ns.onclose = obs.onCompleted.bind(obs);
// Return way to unsubscribe
return ns.close.bind(ns);
});
});
};