2017-06-04 2 views
0

Ich entwickle eine node.js Social-Networking-Anwendung mit senecajs und muss ein Szenario implementieren, wo ein Hersteller die gleiche Nachricht an mehrere Verbraucher senden kann. Ich habe einen Artikel gefunden, der Beispielcode zu illustrieren scheint, um dies mit Senecajs zu entfernen. Das Problem ist, dass ich versuche, dies zu meinem Szenario zu übersetzen, hier ist das Beispiel aus diesem Artikel (https://github.com/senecajs/seneca-amqp-transport/issues/27):Implementierung von Fanout-Strategie mit senecajs

Ich habe 1 Client veröffentlichen Ereignis zu 2 Listener.

Auftraggeber:

.client({ 
type: 'amqp', 
pin: 'incomingMessage:*', 
url: process.env.AMQP_URL, 
exchange: { 
    name: process.env.NODE_ENV + ':events', 
    type: 'fanout' 
} 
}); 

Hörer:

.listen({ 
type: 'amqp', 
pin: 'incomingMessage:*', //maybe useless 
url: process.env.AMQP_URL, 
name: process.env.NODE_ENV + ':service1', 
exchange: { 
    name: process.env.NODE_ENV + ':events', 
    type: 'fanout' 
} 
}); 

.listen({ 
type: 'amqp', 
pin: 'incomingMessage:*', //maybe useless? 
url: process.env.AMQP_URL, 
name: process.env.NODE_ENV + ':service2', 
exchange: { 
    name: process.env.NODE_ENV + ':events', 
    type: 'fanout' 
} 
}); 

Es gibt ein paar Dinge sind verwirrend:

  1. Für die Client-Einstellungen, so scheint es, als ob der Name endet "Entwicklung: Ereignisse" oder "Produktion: Ereignisse" sein. Bin ich in diesem Denken richtig?

  2. Für das Namensfeld für den Listener außerhalb des Exchange-Objekts, was ist der Zweck dieses Feldes?

  3. Wenn ich das Add-Methode aufrufen, muss ich in einem Namen zu übergeben, die zu den Funktionsaufruf zuordnet, die gemacht wird, wenn der Hörer eine Nachricht empfängt, würde ich die „incoming: *“ pass auf das Add Anruf?

  4. Wird dieser Code effektiv Fanout-Funktionalität mit Senecajs bereitstellen?

Antwort

1

Ich habe die Probe getestet. (Kredit geht für den Autor auf GitHub)

1, Regel NODE_ENV Umgebungsvariable Entwicklung bezieht, Produktion, Inszenierung usw. Es kann alles sein (wie „Apfel“), aber der Trick ist, dass diese " Der Name "Eigenschaft" bezieht sich auf den Namen der "Warteschlange", der für jeden Listener/Teilnehmer eindeutig sein muss.

2, Wie ich oben erwähnt habe, ist diese Eigenschaft "name" (außerhalb des Exchange-Objekts) für den Namen der Warteschlange. Es muss für jeden Listener eindeutig sein. Jeder Listener hat seine eigene Warteschlange, die an den Austausch gebunden ist.

3, Es kann alles sein. Aber das ist auch schwierig.

Für den Client wer "veröffentliche" auf den Fanout-Austausch muss es den gleichen Pin in den Optionen als das aufgerufene Aktion Muster haben. Zum Beispiel, wenn der Client aufrufen:

seneca.act('event:orderReceived', optionalPayload) 

dann hat der Stift sein:

{ 
    pin: 'event:orderReceived', 
    ... 
} 

Siehe Seneca API docs, wie Muster arbeiten.

Für die Zuhörer, in diesem Fall (für RPC ist dies nicht wahr) müssen sie nicht mit dem Pin übereinstimmen. Nur das Muster muss übereinstimmen, wenn Sie die Funktion seneca.add (etwas seltsam) aufrufen. Zum Beispiel:

seneca.add('event:orderReceived', function (msg, respond) { 
    // some logic 
}) 

der Stift Eigenschaft kann sein:

{ 
    pin: 'something:different' 
} 

4, Für die letzte Frage die Antwort richtig ist, kann es eine Publish/Subscribe-Funktionen bieten. Ich habe es getestet.

Beachten Sie, dass, wenn Sie keinen Rückruf an seneca.act weitergeben, wenn "publishing" zu tauschen, dann wird nicht auf eine Antwort gewartet. Zum Beispiel:

seneca.act('event:orderReceived', { some: "payload" }) 

Beachten Sie auch, dass, wenn Sie keine Antwort benötigen, werden Sie die Antwort Rückruf mit einem null für erste (Fehler) Parameter wie auch immer, in den Hörer Dienst rufen müssen, sonst wird es Auszeit. Siehe in den folgenden Beispielen.

Hier ist meine Version des Codes:

Verlag

const seneca = require('seneca') 
const si = seneca() 

si 
    .use('seneca-amqp-transport') 

    .client({ 
     type: 'amqp', 
     pin: 'event:orderReceived', 
     url: 'amqp://localhost:5672', 
     exchange: { 
      name: 'order-service', 
      type: 'fanout' 
     } 
    }) 
    .ready(function() { 
     si.log.info('order-service application is running...') 
     si.act('event:orderReceived', {}) 
    }) 

process.on('SIGTERM',() => { 
    si.log.info('Got SIGTERM. Graceful shutdown start') 
    si.act('role:seneca,cmd:close') 
}) 

Subscriber 1

const seneca = require('seneca') 
const si = seneca() 

si 
    .use('seneca-amqp-transport') 

    .add('event:orderReceived', function(msg, respond) { 
     si.log.info(msg) 
     respond(null) 
    }) 

    .listen({ 
     type: 'amqp', 
     pin: 'some:pin1', 
     url: 'amqp://localhost:5672', 
     name: 'delivery-service-queue', 
     exchange: { 
      name: 'order-service', 
      type: 'fanout' 
     } 
    }) 

    .ready(function() { 
     si.log.info('delivery-service application is running...') 
    }) 

process.on('SIGTERM',() => { 
    si.log.info('Got SIGTERM. Graceful shutdown start') 
    si.act('role:seneca,cmd:close') 
}) 

Teilnehmer 2

const seneca = require('seneca') 
const si = seneca() 

si 
    .use('seneca-amqp-transport') 

    .add('event:orderReceived', function(msg, respond) { 
     si.log.info(msg) 
     respond(null) 
    }) 

    .listen({ 
     type: 'amqp', 
     pin: 'some:pin2', 
     url: 'amqp://localhost:5672', 
     name: 'financial-service-queue', 
     exchange: { 
      name: 'order-service', 
      type: 'fanout' 
     } 
    }) 

    .ready(function() { 
     si.log.info('financial-service application is running...') 
    }) 

process.on('SIGTERM',() => { 
    si.log.info('Got SIGTERM. Graceful shutdown start') 
    si.act('role:seneca,cmd:close') 
}) 

Ich bin mir nicht sicher, dass dies ein richtiger Weg ist, aber scheint zu funktionieren.

Verwandte Themen