2015-06-16 3 views
6

Ich habe das Last Value Caching (LVC) Beispiel von ZMQ (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching) implementiert, kann aber keinen zweiten Subskribenten zur Registrierung am Backend veranlassen.ZMQ: Keine Subskriptionsnachricht am XPUB-Socket für mehrere Abonnenten (Last Value Caching Pattern)

Wenn ein Teilnehmer zum ersten Mal an Bord kommt, ist die event[0] == b'\x01' Bedingung erfüllt und der zwischengespeicherte Wert wird gesendet, aber der zweite Teilnehmer (dasselbe Thema) registriert nicht einmal (if backend in events: ist nie wahr). Alles andere funktioniert gut. Daten werden vom Herausgeber an die Abonnenten (alle) übergeben.

Was könnte der Grund dafür sein? Ist die Art und Weise, wie das Backend verbunden ist, korrekt? Soll dieses Muster nur mit dem ersten Teilnehmer funktionieren?

aktualisieren

Als ich den zweiten Teilnehmer zu einem anderen Thema zu abonnieren, bekomme ich das richtige Verhalten (das heißt \x01 bei der Anmeldung). Dies scheint wirklich für den ersten Teilnehmer zu funktionieren. Ist das ein Fehler in ZeroMQ?

Update 2

Hier ist ein minimales Beispiel arbeiten, dass das Muster nicht arbeiten (zumindest nicht so, wie es ist hier implementiert) LVC zeigt ist.

# subscriber.py 
import zmq 

def main(): 
    ctx = zmq.Context.instance() 
    sub = ctx.socket(zmq.SUB) 
    sub.connect("tcp://127.0.0.1:5558") 

    # Subscribe to every single topic from publisher 
    print 'subscribing (sub side)' 
    sub.setsockopt(zmq.SUBSCRIBE, b"my-topic") 

    poller = zmq.Poller() 
    poller.register(sub, zmq.POLLIN) 
    while True: 
     try: 
      events = dict(poller.poll(1000)) 
     except KeyboardInterrupt: 
      print("interrupted") 
      break 

     # Any new topic data we cache and then forward 
     if sub in events: 
      msg = sub.recv_multipart() 
      topic, current = msg 
      print 'received %s on topic %s' % (current, topic) 

if __name__ == '__main__': 
    main() 

Und hier ist der Broker (wie im Beispiel, aber mit etwas mehr Ausführlichkeit und einem integrierten Verlag).

# broker.py 
# from http://zguide.zeromq.org/py:lvcache 
import zmq 
import threading 
import time 


class Publisher(threading.Thread): 
    def __init__(self): 
     super(Publisher, self).__init__() 

    def run(self): 
     time.sleep(10) 
     ctx = zmq.Context.instance() 
     pub = ctx.socket(zmq.PUB) 
     pub.connect("tcp://127.0.0.1:5557") 

     cnt = 0 
     while True: 
      msg = 'hello %d' % cnt 
      print 'publisher is publishing %s' % msg 
      pub.send_multipart(['my-topic', msg]) 
      cnt += 1 
      time.sleep(5) 


def main(): 
    ctx = zmq.Context.instance() 
    frontend = ctx.socket(zmq.SUB) 
    frontend.bind("tcp://*:5557") 
    backend = ctx.socket(zmq.XPUB) 
    backend.bind("tcp://*:5558") 

    # Subscribe to every single topic from publisher 
    frontend.setsockopt(zmq.SUBSCRIBE, b"") 

    # Store last instance of each topic in a cache 
    cache = {} 

    # We route topic updates from frontend to backend, and 
    # we handle subscriptions by sending whatever we cached, 
    # if anything: 
    poller = zmq.Poller() 
    poller.register(frontend, zmq.POLLIN) 
    poller.register(backend, zmq.POLLIN) 


    # launch a publisher 
    p = Publisher() 
    p.daemon = True 
    p.start() 

    while True: 

     try: 
      events = dict(poller.poll(1000)) 
     except KeyboardInterrupt: 
      print("interrupted") 
      break 

     # Any new topic data we cache and then forward 
     if frontend in events: 
      msg = frontend.recv_multipart() 
      topic, current = msg 
      cache[topic] = current 
      backend.send_multipart(msg) 

     ### this is where it fails for the 2nd subscriber. 
     ### There's never even an event from the backend 
     ### in events when the 2nd subscriber is subscribing. 

     # When we get a new subscription we pull data from the cache: 
     if backend in events: 
      print 'message from subscriber' 
      event = backend.recv() 
      # Event is one byte 0=unsub or 1=sub, followed by topic 
      if event[0] == b'\x01': 
       topic = event[1:] 
       print ' => subscribe to %s' % topic 
       if topic in cache: 
        print ("Sending cached topic %s" % topic) 
        backend.send_multipart([ topic, cache[topic] ]) 
      elif event[0] == b'\x00': 
       topic = event[1:] 
       print ' => unsubscribe from %s' % topic 

if __name__ == '__main__': 
    main() 

diesen Code Laufen (1 x broker.py, 2 x subscriber.py) zeigt, dass der erste Teilnehmer an dem Broker registriert, wie erwartet (\x01 und Cache-Lookup), aber der zweite Teilnehmer erhält nicht die gleiche Art und Weise registriert. Interessanterweise ist der zweite Teilnehmer an den Pub/Sub-Kanal angeschlossen, da nach einer Weile (10 Sekunden) beide Teilnehmer Daten vom Herausgeber erhalten.

Das ist sehr seltsam. Vielleicht sind einige meiner Bibliotheken veraltet. Hier ist, was ich habe:

Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import zmq 
>>> zmq.__version__ 
'14.1.1' 

$ brew info zeromq 
zeromq: stable 4.0.5 (bottled), HEAD 
High-performance, asynchronous messaging library 
http://www.zeromq.org/ 
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) * 
    Poured from bottle 
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb 
==> Dependencies 
Build: pkg-config ✔ 
Optional: libpgm ✘, libsodium ✘ 

Update 3

Dieses Verhalten kann auch in zeromq 4.1.2 und pyzmq-14.7.0 beobachtet werden (mit oder ohne libpgm und libsodium installiert ist).

Update 4

Eine weitere Beobachtung legt nahe, dass der erste Teilnehmer irgendwie anders gehandhabt wird: Der erste Teilnehmer in der erwarteten Art und Weise ist die einzige, von dem XPUB Buchse abzumelden (backend) durch ihren Bezug Thema vorhergehende mit \x00 . Die anderen Teilnehmer (ich habe mehr als 2 versucht) blieben auf dem Backend-Kanal stumm (obwohl sie Nachrichten empfangen).

Update 5

Ich hoffe, ich bin nicht ein Kaninchen Loch nach unten geht, aber ich habe blickte in die czmq Bindungen und lief mein Python Beispiel in C.Die Ergebnisse sind die gleichen, so dass ich denke, es ist nicht ein Problem mit den Bindungen, aber mit libzmq.

Ich überprüfte auch, dass der zweite Teilnehmer eine Nachricht abonnieren sendet und in der Tat kann ich das auf dem Draht sehen:

Erste abonnieren:

0000 02 00 00 00 45 00 00 3f 98 be 40 00 40 06 00 00 ....E..? [email protected]@... 
0010 7f 00 00 01 7f 00 00 01 fa e5 15 b6 34 f0 51 c3 ........ ....4.Q. 
0020 05 e4 8b 77 80 18 31 d4 fe 33 00 00 01 01 08 0a ...w..1. .3...... 
0030 2a aa d1 d2 2a aa cd e9 00 09 01 6d 79 2d 74 6f *...*... ...my-to 
0040 70 69 63           pic    

zweite Nachricht mit Differenz abonnieren (oben) markiert und erklärt. Die gleichen Daten werden im Subskriptionsrahmen gesendet.

       identification 
           v 
0000 02 00 00 00 45 00 00 3f ed be 40 00 40 06 00 00 ....E..? [email protected]@... 
          src port  sequence number 
            v  v v v v 
0010 7f 00 00 01 7f 00 00 01 fa e6 15 b6 17 da 02 e7 ........ ........ 

Acknowledgement number window scaling factor 
     v v v v   v 
0020 71 4b 33 e6 80 18 31 d5 fe 33 00 00 01 01 08 0a qK3...1. .3...... 

timestamp value timestamp echo reply 
      v   v v |<-------- data ------- 
0030 2a aa f8 2c 2a aa f4 45 00 09 01 6d 79 2d 74 6f *..,*..E ...my-to 

     ------>| 
0040 70 69 63           pic    

Antwort

7

fand ich die Lösung für dieses Problem, und obwohl ich die docs vorne lesen und nach vorne Rücken an Rücken, ich hatte es nicht gesehen. Der Schlüssel ist XPUB_VERBOSE. Fügen Sie diese Zeile nach dem Back-End-Initialisierung und alles funktioniert

backend.setsockopt(zmq.XPUB_VERBOSE, True) 

Hier ist ein Auszug from the official documentation:

ZMQ_XPUB_VERBOSE: bieten alle Abonnement-Nachrichten auf XPUB Steckdosen Legt die XPUB Buchse Verhalten auf neue Abonnements und Abmeldungen Ein Wert von 0 ist der Standard und übergibt nur neue Abonnement Nachrichten Upstream. Ein Wert von 1 übergibt alle Subskriptionsnachrichten .

Optionswert Typ int Optionswert Einheit 0, 1 Standardwert 0 Anwendbar Socket-Typen ZMQ_XPUB

Pieter Hintjens hat einige weitere Informationen zu diesem in his blog. Dies ist der entsprechende Abschnitt:

Vor ein paar Monaten haben wir eine nette kleine Option (ZMQ_XPUB_VERBOSE) zu XPUB Steckdosen, die ihre Filterung von doppelten Abonnements deaktiviert. Dies funktioniert jetzt für eine beliebige Anzahl von Abonnenten. Wir verwenden diese wie folgt:

void *publisher = zsocket_new (ctx, ZMQ_XPUB); 
zsocket_set_xpub_verbose (publisher, 1); 
zsocket_bind (publisher, "tcp://*:6001"); 

Die LVC Musterbeschreibung aktualisiert werden soll, diese Einstellung zu reflektieren, wie dieses Muster sonst nicht funktionieren.