2014-02-07 5 views
13

Ich habe in der FAQ in der Monitoring section festgestellt, dass es nicht möglich ist, eine Liste der verbundenen Peers zu erhalten oder benachrichtigt werden, wenn Peers verbinden/trennen.Get Subscriber-Filter von einem ZMQ PUB-Socket

Bedeutet dies, dass es auch nicht möglich ist, zu wissen, welche Themen ein PUB/XPUB-Socket aus seiner Upstream-Rückmeldung veröffentlichen sollte? Oder gibt es eine Möglichkeit, auf diese Daten zuzugreifen?

Ich weiß, dass ZMQ> = 3.0 "supports PUB/SUB filtering at the publisher", aber was ich wirklich will, ist an meinem Anwendungscode zu filtern, mit dem Wissen, ZMQ hat über welche Themen abonniert sind.

Mein Anwendungsfall ist, dass ich Informationen über den Status eines Roboters veröffentlichen möchte. Einige Themen beinhalten wichtige Hardwareaktionen, wie das Umschalten der Auswahlleitungen auf einem ADC, um IR-Werte zu lesen.

Ich habe einen Publisher-Thread auf dem Bot ausgeführt, der nur das "Lesen" tun sollte, um IR-Daten zu erhalten, wenn es tatsächlich Abonnenten gibt. Da ich jedoch nur eine Zeichenfolge in mein pub_sock.send eingeben kann, muss ich immer die kostspielige Operation ausführen, selbst wenn ZMQ gerade diese Nachricht absetzen soll, wenn keine Abonnenten vorhanden sind.

Ich habe eine Implementierung, die einen Backchannel-REQ/REP-Socket verwendet, um Topic-Informationen zu senden, die meine App in ihrer Veröffentlichungsschleife überprüfen kann und dabei nur Daten sammelt, die gesammelt werden müssen. Dies scheint jedoch sehr unelegant zu sein, da ZMQ bereits die Daten haben muss, die ich benötige, was durch seine Filterung beim Herausgeber belegt wird.

Ich bemerkte, dass in diesem mailing list message das OP in der Lage zu sehen Subscribe-Nachrichten zu einem XPUB-Socket gesendet werden scheint.

Allerdings wird nicht erwähnt, wie sie das gemacht haben, und ich sehe keine solche Fähigkeit in den Dokumenten (immer noch). Vielleicht benutzten sie nur Wireshark (um Upstream-Nachrichten an einen XPUB-Socket zu abonnieren).

+0

Ich habe zweimal auf dem #zeromq IRC-Kanal gepostet, um dies zu fragen, mit 6 Stunden Offsets um Zeitzonen zu helfen, aber bis jetzt keine Antwort erhalten haben. – dfarrell07

+0

Dies ist immer noch eine offene Frage, für die ich aktiv eine Antwort suche. – dfarrell07

+0

Haben Sie eine Antwort gefunden? Ich brauche auch meinen PUB-Server, um zu wissen, welche Filter auch abonniert sind.Der Server muss keine Daten erstellen, an denen keine Clients interessiert sind. (Beispiel: Sagen Sie, wenn SUB-Clients nur Wetterdaten für New York abonniert haben, dann sollte der PUB-Server die Daten nicht für jede andere Stadt der Welt erstellen müssen , nur um es wegzuwerfen.) –

Antwort

1

Zumindest für die XPUB/XSUB Buchse Fall, dass Sie manuell ein Abonnement Zustand durch Spedition und Handhabung der Pakete speichern:

context = zmq.Context() 

xsub_socket = context.socket(zmq.XSUB) 
xsub_socket.bind('tcp://*:10000') 
xpub_socket = context.socket(zmq.XPUB) 
xpub_socket.bind('tcp://*:10001') 

poller = zmq.Poller() 
poller.register(xpub_socket, zmq.POLLIN) 
poller.register(xsub_socket, zmq.POLLIN) 

while True: 
    try: 
     events = dict(poller.poll(1000)) 
    except KeyboardInterrupt: 
     break 

    if xpub_socket in events: 
     message = xpub_socket.recv_multipart() 

     # HERE goes some subscription handle code which inspects 
     # message 

     xsub_socket.send_multipart(message) 
    if xsub_socket in events: 
     message = xsub_socket.recv_multipart() 
     xpub_socket.send_multipart(message) 

(dies ist Python-Code, aber ich denke, C/C++ sehr ähnlich sieht)

Ich arbeite derzeit an diesem Thema und ich werde so schnell wie möglich weitere Informationen hinzufügen.

5

Mit dem Socket-Typ zmq.XPUB gibt es eine Möglichkeit, neue und abgehende Teilnehmer zu erkennen. Das folgende Codebeispiel zeigt, wie:

# Publisher side 
import zmq 

ctx = zmq.Context.instance() 
xpub_socket = ctx.socket(zmq.XPUB) 
xpub_socket.bind("tcp://*:%d" % port_nr) 
poller = zmq.Poller() 
poller.register(xpub_socket) 

events = dict(poller.poll(1000)) 
if xpub_socket in events: 
    msg = xpub_socket.recv() 
    if msg[0] == b'\x01': 
     topic = msg[1:] 
     print "Topic '%s': new subscriber" % topic 
    elif msg[0] == b'\x00': 
     topic = msg[1:] 
     print "Topic '%s': subscriber left" % topic 

Beachten Sie, dass der zmq.XSUB Socket-Typ nicht in der gleichen Weise wie bei den „normalen“ zmq.SUB nicht abonnieren. Codebeispiel:

# Subscriber side 
import zmq 
ctx = zmq.Context.instance() 

# Subscribing of zmq.SUB socket 
sub_socket = ctx.socket(zmq.SUB) 
sub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # OK 
sub_socket.connect("tcp://localhost:%d" % port_nr) 

# Subscribing zmq.XSUB socket 
xsub_socket = ctx.socket(zmq.XSUB) 
xsub_socket.connect("tcp://localhost:%d" % port_nr) 
# xsub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument 
xsub_socket.send_multipart([b'\x01', b'sometopic']) # OK, triggers the subscribe event on the publisher 

Ich würde auch die zmq.XPUB_VERBOSE Socket-Option weisen darauf hin,. Wenn diese Option aktiviert ist, werden alle Abonnementereignisse für den Socket empfangen. Wenn nicht festgelegt, werden doppelte Abonnements gefiltert. Siehe auch den folgenden Beitrag: ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)