Ich habe das Last Value Caching (LVC) Beispiel von ZMQ (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching) implementiert, kann aber keinen zweiten Subskribenten zur Registrierung am Backend veranlassen.ZMQ: Keine Subskriptionsnachricht am XPUB-Socket für mehrere Abonnenten (Last Value Caching Pattern)
Wenn ein Teilnehmer zum ersten Mal an Bord kommt, ist die event[0] == b'\x01'
Bedingung erfüllt und der zwischengespeicherte Wert wird gesendet, aber der zweite Teilnehmer (dasselbe Thema) registriert nicht einmal (if backend in events:
ist nie wahr). Alles andere funktioniert gut. Daten werden vom Herausgeber an die Abonnenten (alle) übergeben.
Was könnte der Grund dafür sein? Ist die Art und Weise, wie das Backend verbunden ist, korrekt? Soll dieses Muster nur mit dem ersten Teilnehmer funktionieren?
aktualisieren
Als ich den zweiten Teilnehmer zu einem anderen Thema zu abonnieren, bekomme ich das richtige Verhalten (das heißt \x01
bei der Anmeldung). Dies scheint wirklich für den ersten Teilnehmer zu funktionieren. Ist das ein Fehler in ZeroMQ?
Update 2
Hier ist ein minimales Beispiel arbeiten, dass das Muster nicht arbeiten (zumindest nicht so, wie es ist hier implementiert) LVC zeigt ist.
# subscriber.py
import zmq
def main():
ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5558")
# Subscribe to every single topic from publisher
print 'subscribing (sub side)'
sub.setsockopt(zmq.SUBSCRIBE, b"my-topic")
poller = zmq.Poller()
poller.register(sub, zmq.POLLIN)
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
print("interrupted")
break
# Any new topic data we cache and then forward
if sub in events:
msg = sub.recv_multipart()
topic, current = msg
print 'received %s on topic %s' % (current, topic)
if __name__ == '__main__':
main()
Und hier ist der Broker (wie im Beispiel, aber mit etwas mehr Ausführlichkeit und einem integrierten Verlag).
# broker.py
# from http://zguide.zeromq.org/py:lvcache
import zmq
import threading
import time
class Publisher(threading.Thread):
def __init__(self):
super(Publisher, self).__init__()
def run(self):
time.sleep(10)
ctx = zmq.Context.instance()
pub = ctx.socket(zmq.PUB)
pub.connect("tcp://127.0.0.1:5557")
cnt = 0
while True:
msg = 'hello %d' % cnt
print 'publisher is publishing %s' % msg
pub.send_multipart(['my-topic', msg])
cnt += 1
time.sleep(5)
def main():
ctx = zmq.Context.instance()
frontend = ctx.socket(zmq.SUB)
frontend.bind("tcp://*:5557")
backend = ctx.socket(zmq.XPUB)
backend.bind("tcp://*:5558")
# Subscribe to every single topic from publisher
frontend.setsockopt(zmq.SUBSCRIBE, b"")
# Store last instance of each topic in a cache
cache = {}
# We route topic updates from frontend to backend, and
# we handle subscriptions by sending whatever we cached,
# if anything:
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
# launch a publisher
p = Publisher()
p.daemon = True
p.start()
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
print("interrupted")
break
# Any new topic data we cache and then forward
if frontend in events:
msg = frontend.recv_multipart()
topic, current = msg
cache[topic] = current
backend.send_multipart(msg)
### this is where it fails for the 2nd subscriber.
### There's never even an event from the backend
### in events when the 2nd subscriber is subscribing.
# When we get a new subscription we pull data from the cache:
if backend in events:
print 'message from subscriber'
event = backend.recv()
# Event is one byte 0=unsub or 1=sub, followed by topic
if event[0] == b'\x01':
topic = event[1:]
print ' => subscribe to %s' % topic
if topic in cache:
print ("Sending cached topic %s" % topic)
backend.send_multipart([ topic, cache[topic] ])
elif event[0] == b'\x00':
topic = event[1:]
print ' => unsubscribe from %s' % topic
if __name__ == '__main__':
main()
diesen Code Laufen (1 x broker.py
, 2 x subscriber.py
) zeigt, dass der erste Teilnehmer an dem Broker registriert, wie erwartet (\x01
und Cache-Lookup), aber der zweite Teilnehmer erhält nicht die gleiche Art und Weise registriert. Interessanterweise ist der zweite Teilnehmer an den Pub/Sub-Kanal angeschlossen, da nach einer Weile (10 Sekunden) beide Teilnehmer Daten vom Herausgeber erhalten.
Das ist sehr seltsam. Vielleicht sind einige meiner Bibliotheken veraltet. Hier ist, was ich habe:
Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> zmq.__version__
'14.1.1'
$ brew info zeromq
zeromq: stable 4.0.5 (bottled), HEAD
High-performance, asynchronous messaging library
http://www.zeromq.org/
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) *
Poured from bottle
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb
==> Dependencies
Build: pkg-config ✔
Optional: libpgm ✘, libsodium ✘
Update 3
Dieses Verhalten kann auch in zeromq 4.1.2
und pyzmq-14.7.0
beobachtet werden (mit oder ohne libpgm und libsodium installiert ist).
Update 4
Eine weitere Beobachtung legt nahe, dass der erste Teilnehmer irgendwie anders gehandhabt wird: Der erste Teilnehmer in der erwarteten Art und Weise ist die einzige, von dem XPUB
Buchse abzumelden (backend
) durch ihren Bezug Thema vorhergehende mit \x00
. Die anderen Teilnehmer (ich habe mehr als 2 versucht) blieben auf dem Backend-Kanal stumm (obwohl sie Nachrichten empfangen).
Update 5
Ich hoffe, ich bin nicht ein Kaninchen Loch nach unten geht, aber ich habe blickte in die czmq
Bindungen und lief mein Python Beispiel in C.Die Ergebnisse sind die gleichen, so dass ich denke, es ist nicht ein Problem mit den Bindungen, aber mit libzmq
.
Ich überprüfte auch, dass der zweite Teilnehmer eine Nachricht abonnieren sendet und in der Tat kann ich das auf dem Draht sehen:
Erste abonnieren:
0000 02 00 00 00 45 00 00 3f 98 be 40 00 40 06 00 00 ....E..? [email protected]@...
0010 7f 00 00 01 7f 00 00 01 fa e5 15 b6 34 f0 51 c3 ........ ....4.Q.
0020 05 e4 8b 77 80 18 31 d4 fe 33 00 00 01 01 08 0a ...w..1. .3......
0030 2a aa d1 d2 2a aa cd e9 00 09 01 6d 79 2d 74 6f *...*... ...my-to
0040 70 69 63 pic
zweite Nachricht mit Differenz abonnieren (oben) markiert und erklärt. Die gleichen Daten werden im Subskriptionsrahmen gesendet.
identification
v
0000 02 00 00 00 45 00 00 3f ed be 40 00 40 06 00 00 ....E..? [email protected]@...
src port sequence number
v v v v v
0010 7f 00 00 01 7f 00 00 01 fa e6 15 b6 17 da 02 e7 ........ ........
Acknowledgement number window scaling factor
v v v v v
0020 71 4b 33 e6 80 18 31 d5 fe 33 00 00 01 01 08 0a qK3...1. .3......
timestamp value timestamp echo reply
v v v |<-------- data -------
0030 2a aa f8 2c 2a aa f4 45 00 09 01 6d 79 2d 74 6f *..,*..E ...my-to
------>|
0040 70 69 63 pic