Ich möchte überprüfen, ob ein Consumer/Worker vorhanden ist, um eine Nachricht zu konsumieren Ich bin im Begriff, zu senden.In Pika oder RabbitMQ, wie überprüfe ich, ob Verbraucher derzeit konsumieren?
Wenn es keine Arbeiter ist, würde ich einige Arbeiter beginnen (sowohl Verbraucher als auch Verlage auf einer einzigen Maschine sind) und dann gehen Sie über die Veröffentlichung Nachrichten.
Wenn es eine Funktion wie connection.check_if_has_consumers
ist, würde ich es ein wenig wie diese umzusetzen -
import pika
import workers
# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
# start the workers in other processes, using python's `multiprocessing`
workers.start_workers()
# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
properties=pika.BasicProperties(delivery_mode=2))
connection.close()
Aber ich bin nicht in der Lage jede Funktion mit check_if_has_consumers
Funktionalität zu finden in pika.
Gibt es einen Weg, dies zu erreichen, mit pika? oder vielleicht, von reden bis Das Kaninchen direkt?
Ich bin nicht ganz sicher, aber ich glaube wirklich RabbitMQ von der Anzahl der Verbraucher zu unterschiedlichen Warteschlangen abonnierten bewusst sein würde, da es Nachrichten sie tut Versand und akzeptiert acks
ich nur vor mit RabbitMQ 3 Stunden angefangen hat ... ist jede Hilfe willkommen ...
hier die workers.py Code schrieb ich, wenn seine jede Hilfe ....
import multiprocessing
import pika
def start_workers(num=3):
"""start workers as non-daemon processes"""
for i in xrange(num):
process = WorkerProcess()
process.start()
class WorkerProcess(multiprocessing.Process):
"""
worker process that waits infinitly for task msgs and calls
the `callback` whenever it gets a msg
"""
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_working = multiprocessing.Event()
def run(self):
"""
worker method, open a channel through a pika connection and
start consuming
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.queue_declare(queue='worker_queue', auto_delete=False,
durable=True)
# don't give work to one worker guy until he's finished
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='worker_queue')
# do what `channel.start_consuming()` does but with stopping signal
while len(channel._consumers) and not self.stop_working.is_set():
channel.transport.connection.process_data_events()
channel.stop_consuming()
connection.close()
return 0
def signal_exit(self):
"""exit when finished with current loop"""
self.stop_working.set()
def exit(self):
"""exit worker, blocks until worker is finished and dead"""
self.signal_exit()
while self.is_alive(): # checking `is_alive()` on zombies kills them
time.sleep(1)
def kill(self):
"""kill now! should not use this, might create problems"""
self.terminate()
self.join()
def callback(channel, method, properties, body):
"""pika basic consume callback"""
print 'GOT:', body
# do some heavy lifting here
result = save_to_database(body)
print 'DONE:', result
channel.basic_ack(delivery_tag=method.delivery_tag)
EDIT:
Ich habe nach vorne zu bewegen, so ist hier eine Abhilfe, die ich nehmen werde, es sei denn, ein besserer Ansatz entlang kommt,
So RabbitMQ hat diese HTTP management apis, arbeiten sie nach dem Einschalten des management plugin und Mitte HTTP apis Seite gedreht haben, gibt es
/api/Verbindungen - Eine Liste aller offenen Verbindungen.
/api/Verbindungen/Name - Eine individuelle Verbindung. LÖSCHEN wird die Verbindung schließen.
Also, wenn ich meine Arbeiter verbinden und meine Produziert sowohl durch unterschiedliche Verbindung Namen/Benutzer, werde ich in der Lage sein zu überprüfen, ob die Verbindung Worker offen ist ... (es möglicherweise Probleme, wenn Arbeiter stirbt ...)
wird auf eine bessere Lösung warten ...
EDIT:
fand gerade diese in den rabbitmq docs, aber dies wäre Hacky in Python zu tun:
[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue 0
...done.
so konnte ich etwas tun, wie,
subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")
hacky ... hoffe immer noch Pika hat einige Python-Funktion, um dies zu tun ...
Danke,