2017-08-23 3 views
3

Ich habe Probleme mit einem PUB/SUB in ZeroMQ.ZeroMQ SUB empfängt nie Nachrichten

Nachdem alles verbindet, veröffentlicht Publisher alle Nachrichten (Buchse des Nachrichtens kehrt true), aber die SUB nie empfängt sie und sperrt für immer auf .recv() Funktion.

Hier ist der Code ich verwende:

void startPublisher() 
{ 
    zmq::context_t zmq_context(1); 
    zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB); 
    zmq_socket.bind("tcp://127.0.0.1:58951"); 

    zmq::message_t msg(3); 
    memcpy(msg.data(), "abc", 3); 

    for(int i = 0; i < 10; i++) 
     zmq_socket.send(msg); // <-- always true 
} 

void startSubscriber() 
{ 
    zmq::context_t zmq_context(1); 
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB); 

    zmq_socket.connect("tcp://127.0.0.1:58951"); 
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); // allow all messages 

    zmq::message_t msg(3); 
    zmq_socket.recv(&msg); // <-- blocks forever (message never received?) 
} 

Bitte beachten Sie, dass ich diese zwei Funktionen in zwei verschiedenen Threads bin runing, beginnend SUB Faden zunächst für einige Zeit warten und dann Verleger Thread starten (auch versucht, andersherum mit dem Verleger Senden von Nachrichten in einer Endlosschleife, hat aber nicht funktioniert).

Was mache ich hier falsch?

+2

Der Abonnent muss zuerst als Teil der Dokumentation/Beispiel und dann der Herausgeber ausgeführt werden. Fügen Sie eine Verzögerung zwischen dem Start des Abonnententhreads und dem Start des Publisher-Threads hinzu, um festzustellen, ob dies einen Unterschied macht. Sie können nachher eine korrekte Synchronisation hinzufügen. Setze deine Binde auch so bitte: "tcp: // *: 58951" –

+0

Hallo, ich habe gerade dieses Problem gelöst ... Das nennt sich "slow joiner" und wird [hier] beschrieben (http://zguide.org/) .zeromq.org/page: all) im Detail. Kurz gesagt: Es dauert einige Zeit, bis der TCP-Handshake beendet ist und die Verbindung hergestellt ist. Aber du hattest absolut recht. – carobnodrvo

+1

Dies könnte gelöst werden, indem ein geeigneter Synchronisationsmechanismus zwischen dem Start des Subs und Pub bereitgestellt wird. –

Antwort

3

Basierend auf Ihrem Beispiel funktioniert der folgende Code für mich. Das Problem ist, dass das PUB/SUB-Pattern ein langsamer Joiner ist. Das bedeutet, dass Sie nach dem Binden des PUB-Sockels und vor dem Senden einer Nachricht etwas warten müssen.

#include <thread> 
#include <zmq.hpp> 
#include <iostream> 
#include <unistd.h> 
void startPublisher() 
{ 
    zmq::context_t zmq_context(1); 
    zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB); 
    zmq_socket.bind("tcp://127.0.0.1:58951"); 
    usleep(100000); // Sending message too fast after connexion will result in dropped message 
    zmq::message_t msg(3); 
    for(int i = 0; i < 10; i++) { 
     memcpy(msg.data(), "abc", 3); 
     zmq_socket.send(msg); // <-- always true 
     msg.rebuild(3); 
     usleep(1); // Temporisation between message; not necessary 
    } 
} 
volatile bool run = false; 
void startSubscriber() 
{ 
    zmq::context_t zmq_context(1); 
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB); 
    zmq_socket.connect("tcp://127.0.0.1:58951"); 
    std::string TOPIC = ""; 
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // allow all messages 
    zmq_socket.setsockopt(ZMQ_RCVTIMEO, 1000); // Timeout to get out of the while loop 
    while(run) { 
     zmq::message_t msg; 
     int rc = zmq_socket.recv(&msg); // Works fine 
     if(rc) // Do no print trace when recv return from timeout 
      std::cout << std::string(static_cast<char*>(msg.data()), msg.size()) << std::endl; 
    } 
} 
int main() { 
    run = true; 
    std::thread t_sub(startSubscriber); 
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern 
    std::thread t_pub(startPublisher); 
    t_pub.join(); 
    sleep(1); 
    run = false; 
    t_sub.join(); 
} 
+3

Mit dem 'ZMQ_RCVTIMEO' zu beginnen, ist eine Flucht aus der Arbeit des richtigen Designers. Es gibt sowohl eine explizit nicht-blockierende '.recv()' -Rufsyntax als auch eine '.poll()' -Instrumentation, da man keine (prinzipiell schlechte) Praxis des Blockierens benötigt. Professionelle Distributed-Computing-Systeme sollten niemals blockieren (aus vielen offensichtlichen Gründen - der Code ist während der gesamten Dauer völlig außer Kontrolle, möglicherweise unendlich, des blockierenden Zustands usw.), also replizieren Sie keine Schulbuchbeispiele , wo der Code-Designer einfach war und einige Platzlimits für einige SLOCs hatte. – user3666197

+0

Als RTOS-Experte sollte dies klar und deutlich klingen: o) – user3666197

+0

Ja, es gibt viel bessere Muster für ein verteiltes PUB/SUB-System. Ich habe gerade versucht, die Eigenschaft der langsamen Tischler zu betonen, während ich so nah wie OPs Beispiel blieb. Du hast absolut recht :). Aber für ein triviales Beispiel eines PUB/SUB-Threads scheint es gut genug zu sein. – Clonk

Verwandte Themen