2012-10-23 10 views
9

Ich möchte überprüfen, ob ein Consumer/Worker vorhanden ist, um eine Nachricht zu konsumieren Ich bin im Begriff, zu senden.In Pika oder RabbitMQ, wie überprüfe ich, ob Verbraucher derzeit konsumieren?

Wenn es keine Arbeiter ist, würde ich einige Arbeiter beginnen (sowohl Verbraucher als auch Verlage auf einer einzigen Maschine sind) und dann gehen Sie über die Veröffentlichung Nachrichten.

Wenn es eine Funktion wie connection.check_if_has_consumers ist, würde ich es ein wenig wie diese umzusetzen -

import pika 
import workers 

# code for publishing to worker queue 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

# if there are no consumers running (would be nice to have such a function) 
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""): 
    # start the workers in other processes, using python's `multiprocessing` 
    workers.start_workers() 

# now, publish with no fear of your queues getting filled up 
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True) 
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin", 
          properties=pika.BasicProperties(delivery_mode=2)) 
connection.close() 

Aber ich bin nicht in der Lage jede Funktion mit check_if_has_consumers Funktionalität zu finden in pika.

Gibt es einen Weg, dies zu erreichen, mit pika? oder vielleicht, von reden bis Das Kaninchen direkt?

Ich bin nicht ganz sicher, aber ich glaube wirklich RabbitMQ von der Anzahl der Verbraucher zu unterschiedlichen Warteschlangen abonnierten bewusst sein würde, da es Nachrichten sie tut Versand und akzeptiert acks

ich nur vor mit RabbitMQ 3 Stunden angefangen hat ... ist jede Hilfe willkommen ...

hier die workers.py Code schrieb ich, wenn seine jede Hilfe ....

import multiprocessing 
import pika 


def start_workers(num=3): 
    """start workers as non-daemon processes""" 
    for i in xrange(num):  
     process = WorkerProcess() 
     process.start() 


class WorkerProcess(multiprocessing.Process): 
    """ 
    worker process that waits infinitly for task msgs and calls 
    the `callback` whenever it gets a msg 
    """ 
    def __init__(self): 
     multiprocessing.Process.__init__(self) 
     self.stop_working = multiprocessing.Event() 

    def run(self): 
     """ 
     worker method, open a channel through a pika connection and 
     start consuming 
     """ 
     connection = pika.BlockingConnection(
           pika.ConnectionParameters(host='localhost') 
        ) 
     channel = connection.channel() 
     channel.queue_declare(queue='worker_queue', auto_delete=False, 
                durable=True) 

     # don't give work to one worker guy until he's finished 
     channel.basic_qos(prefetch_count=1) 
     channel.basic_consume(callback, queue='worker_queue') 

     # do what `channel.start_consuming()` does but with stopping signal 
     while len(channel._consumers) and not self.stop_working.is_set(): 
      channel.transport.connection.process_data_events() 

     channel.stop_consuming() 
     connection.close() 
     return 0 

    def signal_exit(self): 
     """exit when finished with current loop""" 
     self.stop_working.set() 

    def exit(self): 
     """exit worker, blocks until worker is finished and dead""" 
     self.signal_exit() 
     while self.is_alive(): # checking `is_alive()` on zombies kills them 
      time.sleep(1) 

    def kill(self): 
     """kill now! should not use this, might create problems""" 
     self.terminate() 
     self.join() 


def callback(channel, method, properties, body): 
    """pika basic consume callback""" 
    print 'GOT:', body 
    # do some heavy lifting here 
    result = save_to_database(body) 
    print 'DONE:', result 
    channel.basic_ack(delivery_tag=method.delivery_tag) 

EDIT:

Ich habe nach vorne zu bewegen, so ist hier eine Abhilfe, die ich nehmen werde, es sei denn, ein besserer Ansatz entlang kommt,

So RabbitMQ hat diese HTTP management apis, arbeiten sie nach dem Einschalten des management plugin und Mitte HTTP apis Seite gedreht haben, gibt es

/api/Verbindungen - Eine Liste aller offenen Verbindungen.

/api/Verbindungen/Name - Eine individuelle Verbindung. LÖSCHEN wird die Verbindung schließen.

Also, wenn ich meine Arbeiter verbinden und meine Produziert sowohl durch unterschiedliche Verbindung Namen/Benutzer, werde ich in der Lage sein zu überprüfen, ob die Verbindung Worker offen ist ... (es möglicherweise Probleme, wenn Arbeiter stirbt ...)

wird auf eine bessere Lösung warten ...

EDIT:

fand gerade diese in den rabbitmq docs, aber dies wäre Hacky in Python zu tun:

[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers 
Listing queues ... 
worker_queue 0 
...done. 

so konnte ich etwas tun, wie,

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'") 

hacky ... hoffe immer noch Pika hat einige Python-Funktion, um dies zu tun ...

Danke,

Antwort

7

Ich habe gerade auch gerade in diese Sache hineingeschaut. Nach dem Durchlesen der Quelle und der Dokumente bin ich in channel.py auf folgendes gestoßen:

Meine eigenen Tests waren erfolgreich. Ich habe Folgendes verwendet, wobei mein Kanalobjekt self._channel ist:

if len(self._channel.consumer_tags) == 0: 
     LOGGER.info("Nobody is listening. I'll come back in a couple of minutes.") 
     ... 
0

Ich fand dies tatsächlich auf der Suche nach einem anderen Problem, aber eine Sache, die Ihnen helfen kann, ist in der Basic_Publish Funktion gibt es einen Parameter "Immediate", der auf False voreingestellt ist.

Eine Idee, die Sie tun können, ist, das Immediate Flag auf True zu setzen, was es erforderlich macht, dass es sofort von einem Verbraucher konsumiert wird, anstatt in einer Warteschlange zu sitzen. Wenn ein Worker nicht verfügbar ist, um die Nachricht zu konsumieren, tritt ein Fehler zurück, der Sie auffordert, einen anderen Worker zu starten.

Je nach Durchsatz Ihres Systems würde dies entweder eine Menge zusätzlicher Arbeitskräfte hervorbringen, oder Laicher erzeugen, um tote Arbeiter zu ersetzen. Für das frühere Problem können Sie ein Admin-ähnliches System schreiben, das die Arbeiter einfach über eine Kontrollwarteschlange verfolgt, wo Sie einen "Runner" -ähnlichen Prozess erzählen können, um Prozesse von Arbeitern zu töten, die jetzt nicht mehr notwendig sind.

Verwandte Themen