2017-03-12 5 views
1

Ich habe einen nettcp Buchse node.js Codeabschnitt, den ich von der Verwendung eines callback zu rx konvertieren möchte.eine Netto Steckdose in eine beobachtbaren

ich wie folgt aussehen in feed.js Modul:

var net = require('net'); 
var server = net.createServer(function(socket) { 
... 

    // Handle incoming messages from clients. 
    socket.on('data', function (data) { 
     broadcast(data, socket); 
    }); 

... 
} 

function broadcast(message, sender) 
{ 
    ... 
    onChangeHandler(stock.symbol, 'stock', stock); 
} 
} 

function start(onChange) { 
    onChangeHandler = onChange; 
} 

exports.start = start; 
server.listen(....); 

Dann wird der Kunde von dem obigen Aufruf registriert einen Rückruf:

feed.start(function(room, type, message) { 
    //...Do something with the message 
}); 

Ich mag würde dies konvertieren ein RxObservable/Observer zu verwenden. Ich sehe, dass es einen Weg gibt einen beobachtbaren Strom aus dem web socket zu machen (obwohl es eine bidirektionale Subject verwendet, die ich nicht brauche):

fromWebSocket(address, protocol) { 
    var ws = new WebSocket(address, protocol); 

    // Handle the data 
    var osbervable = Rx.Observable.create (function (obs) { 
     // Handle messages 
     ws.onmessage = obs.onNext.bind(obs); 
     ws.onerror = obs.onError.bind(obs); 
     ws.onclose = obs.onCompleted.bind(obs); 

     // Return way to unsubscribe 
     return ws.close.bind(ws); 
    }); 

    var observer = Rx.Observer.create(function (data) { 
     if (ws.readyState === WebSocket.OPEN) { ws.send(data); } 
    }); 

    return Rx.Subject.create(observer, observable); 
} 

var socketSubject = fromWebSocket('ws://localhost:9999', 'sampleProtocol'); 

// Receive data 
socketSubject.subscribe(
    function (data) { 
     // Do something with the data 
    }, 
    function (error) { 
     // Do something with the error 
    }, 
    function() { 
     // Do something on completion 
    }); 

// Send data 
socketSubject.onNext(42); 

Was ist das Äquivalent für eine net Buchse? Wenn es eine Standardbibliothek gibt, ist das in Ordnung.

Mein erster Versuch ist so, aber ich weiß nicht, wie Rx und und die Socket-Funktionen zu binden zusammen in eine onnext:

var net = require('net'); 

fromNetSocket(address, protocol) { 
    var ns = net.createServer(function(socket) { 


     socket.on('disconnect', function() { // This seems like it maps to onclose 

      console.log('User disconnected. %s. Socket id %s', socket.id); 
     }); 

     // Handle incoming messages from clients. 
     socket.on('data', function (data) { //this should map to onnext 

     }); 

     // Handle the data 
     var osbervable = Rx.Observable.create (function (obs) { 
      // Handle messages 
      ns.onmessage = obs.onNext.bind(obs); 
      ns.onerror = obs.onError.bind(obs); 
      ns.onclose = obs.onCompleted.bind(obs); 

      // Return way to unsubscribe 
      return ns.close.bind(ns); 
     });   
    }); 
}; 

Antwort

0

Versuchen Sie, die folgenden

const createSubject =() => { 

    return Rx.Observable.create((observer) => { 

    const socket = net.connect({port: 1705},() => { 

     log.i('Connected to Server!'); 

     let socketObservable = Rx.Observable.create((observer) => { 
     socket.on('data', (data) => observer.next(JSON.parse(data))); 
     socket.on('error', (err) => observer.error(err)); 
     socket.on('close',() => observer.complete()); 
     }); 

     let socketObserver = { 
     next: (data) => { 
      if (!socket.destroyed) { 
      socket.write(`${JSON.stringify(data)}\r\n`); 
      } 
     } 
     }; 

     const subject = Rx.Subject.create(socketObserver, socketObservable); 
     observer.next(subject); 
     observer.complete(); 

    }); 

    }); 

}; 

Sie dann kann das Objekt wie folgt verwenden

createSubject().subscribe((con) => { 

    con.subscribe((data) => console.log(data)); 

    con.next({ 
    id: utils.UUID(), 
    jsonrpc: '2.0', 
    method: 'Server.GetRPCVersion' 
    }); 

}); 
Verwandte Themen