2017-06-28 3 views
0

Ich versuche, eine einfache, aber skalierbare Chat-App mit Node.js und WebSockets zu bauen. Ich verwende RabbitMQ als Nachrichten-Broker zwischen meinen Node-Instanzen. Ich benutze pm2 für Knoten-Clustering hinter Nginx.Node.js Cluster (PM2) - Geschlossene WebSocket empfangen Nachrichten

Das Problem, das ich habe ist, dass manchmal WebSockets, die bereits geschlossen haben, neue Nachrichten anstelle von offenen Nachrichten erhalten. Hier

ist der Code für meinen WebSocket-Server:

var url = require('url'); 
var WebSocket = require('ws'); 
var rabbit = require('amqplib').connect('amqp://localhost'); 
var express = require('express'); 
var http = require('http'); 

const app = express(); 
const server = http.createServer(app); 

var port = process.env.PORT || 3000; 

server.listen(port, function() { 
    console.log('Express server listening on port ' + server.address().port); 
}); 

const q = "messages"; 

const wss = new WebSocket.Server({ server }); 

wss.on('connection', function onConnection(ws) { 

    console.log('got websocket connection'); 

    rabbit.then(function(conn) { 
     ws.conn = conn; 
     console.log('consumer connected to rabbitMQ'); 
     return conn.createChannel(); 
    }).then(function(ch) { 
     console.log('consumer channel created'); 
     return ch.assertQueue(q).then(function(ok) { 
      console.log('consumer asserted queue'); 
      return ch.consume(q, function(msg) { 
       console.log('received message from rabbitMQ: ', msg.content.toString()); 
       if (ws.readyState === 1) { 
        if (msg && msg.content) 
         ws.send(msg.content.toString()); 
       } else { 
        console.warn('Websocket is not open, state is ', ws.readyState); 
       } 
      }, { noAck: true }); 
     }); 
    }).catch(function handleConsumerRabbitErr(err) { 
     console.warn('Handled rabbit consumer error:', err.stack); 
    }); 

    ws.on('message', function onMessage(msg) { 

     console.log('got websocket message'); 

     if (ws.conn) { 
      ws.conn.createChannel().then(function(ch) { 
       console.log('producer channel created'); 
       return ch.assertQueue(q).then(function(ok) { 
        console.log('sending ' + msg + ' to rabbitMQ'); 
        return ch.sendToQueue(q, Buffer(msg)); 
       }); 
      }).catch(function handleProducerRabbitErr(err) { 
       console.warn('Handled rabbit producer error:', err.stack); 
      }); 
     } 

    }); 

    ws.on('close', function onClose() { 
     console.log('websocket closing'); 
    }); 
}); 

Meiner Nginx config:

http { 

    upstream nodejs { 
     server localhost:3000; 
    } 

    map $http_upgrade $connection_upgrade { 
     default upgrade; 
     '' close; 
    } 

    server { 
     listen 80; 
     listen 443 ssl; 
     server_name 192.168.45.45; 
     root /vagrant/static; 

     # Redirect HTTP to HTTPS 
     if ($scheme = http) { 
      return 301 https://$server_name$request_uri; 
     } 

     proxy_http_version 1.1; 
     proxy_set_header Host $host; 
     proxy_set_header Connection ""; 
     proxy_set_header X-Real-IP $remote_addr; 
     proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; 

     ssl_certificate /etc/ssl/certs/nginx-selfsigned.crt; 
     ssl_certificate_key /etc/ssl/private/nginx-selfsigned.key; 
     ssl_session_cache shared:SSL:1m; 
     ssl_prefer_server_ciphers on; 

     location /ws/ { 
      proxy_set_header Upgrade $http_upgrade; 
      proxy_set_header Connection $connection_upgrade; 
      proxy_pass http://nodejs; 
     } 

     location /app/ { 
      proxy_pass http://nodejs; 
     } 
    } 
} 

ich die App mit dem folgenden Befehl ausgeführt wird (erzeugt zwei Knoten auf meiner VM): Hier

$ pm2 app.js -i 0 --name chat

ist der Ausgang des pm2 log aft er sendet eine WebSocket-Nachricht, die Seite aktualisieren und das Senden von zwei Nachrichten:

[STREAMING] Now streaming realtime logs for [all] processes 
0|chat  | got websocket connection    # Initial websocket connection on node 0 
0|chat  | consumer connected to rabbitMQ 
0|chat  | consumer channel created 
0|chat  | consumer asserted queue 
0|chat  | got websocket message 
0|chat  | producer channel created 
0|chat  | sending test to rabbitMQ 
0|chat  | received message from rabbitMQ: test # Websocket received message from RabbitMQ, as expected 
0|chat  | websocket closing 
1|chat  | got websocket connection    # Page refresh, new websocket connection on node 1 
1|chat  | consumer connected to rabbitMQ 
1|chat  | consumer channel created 
1|chat  | consumer asserted queue 
1|chat  | got websocket message 
1|chat  | producer channel created 
1|chat  | sending test to rabbitMQ 
0|chat  | received message from rabbitMQ: test # ERROR - old websocket received message from RabbitMQ! 
0|chat  | Websocket is not open, state is 3  # Note these two lines are from node 0 
1|chat  | got websocket message     # Sent another message without refreshing the page 
1|chat  | producer channel created 
1|chat  | sending test to rabbitMQ    # This one succeeds, and it alternates from here 
1|chat  | received message from rabbitMQ: test 

ich gefunden habe, dass es auf jeder n-ten Nachricht erfolgreich gesendet, wobei n die Anzahl der WebSockets I (aka geöffnet habe. die Anzahl der Male, die ich die Seite aktualisiert habe), unabhängig davon, ob die Websockets geschlossen wurden oder nicht.

Meine Erwartung ist, dass nach dem Schließen eines WebSockets das Ereignis message nicht mehr ausgelöst werden sollte. Das ist natürlich nicht der Fall.

Was kann ich tun, um sicherzustellen, dass WebSocket nach dem Schließen keine Nachrichten mehr empfängt?

Antwort

Verwandte Themen