Für die Serverautomatisierung versuchen wir ein Tool zu entwickeln, das viele Aufgaben auf verschiedenen Servern bewältigen und ausführen kann. Wir senden die Aufgabe und den Hostnamen des Servers in eine Warteschlange. Die Warteschlange wird dann von einem Anforderer verbraucht, der die Information an die ansible API liefert. Um das zu erreichen, können wir mehr als eine Aufgabe gleichzeitig ausführen, wir verwenden Threading.Pika: Verbraucht die nächste Nachricht, selbst wenn die letzte Nachricht nicht bestätigt wurde
Jetzt sind wir mit der Bestätigung der Nachricht hängen geblieben ...
Was wir bisher getan haben:
Die requester.py
verbraucht die Warteschlange und startet dann einen Thread, in dem die ansible Aufgabe ausgeführt wird. Das Ergebnis wird dann in eine andere Warteschlange gesendet. So erstellt jede neue Nachricht einen neuen Thread. Ist die Aufgabe erledigt, stirbt der Thread.
Aber jetzt kommt schwieriger Teil. Wir müssen die Nachrichten persistent machen, falls unser Server stirbt. So sollte jede Nachricht bestätigt werden nach das Ergebnis von Ansible wurde zurückgesendet.
Unser Problem ist jetzt, wenn wir versuchen, die Nachricht im Thread selbst zu bestätigen, gibt es nicht mehr "gleichzeitig" Arbeit getan, weil die consume
von Pika auf die Bestätigung wartet. Wie können wir erreichen, dass die consume
Nachrichten konsumiert und nicht auf die Bestätigung wartet? Oder wie können wir unser kleines Programm verbessern oder verbessern?
requester.py
#!/bin/python
from worker import *
import ansible.inventory
import ansible.runner
import threading
class Requester(Worker):
def __init__(self):
Worker.__init__(self)
self.connection(self.selfhost, self.from_db)
self.receive(self.from_db)
def send(self, result, ch, method):
self.channel.basic_publish(exchange='',
routing_key=self.to_db,
body=result,
properties=pika.BasicProperties(
delivery_mode=2,
))
print "[x] Sent \n" + result
ch.basic_ack(delivery_tag = method.delivery_tag)
def callAnsible(self, cmd, ch, method):
#call ansible api pre 2.0
result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
self.send(result, ch, method)
def callback(self, ch, method, properties, body):
print(" [x] Received by requester %r" % body)
t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
t.start()
worker.py
import pika
import ConfigParser
import json
import os
class Worker(object):
def __init__(self):
#read some config files
def callback(self, ch, method, properties, body):
raise Exception("Call method in subclass")
def receive(self, queue):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.callback,queue=queue)
self.channel.start_consuming()
def connection(self,server,queue):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=server,
credentials=self.credentials))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue, durable=True)
Wir mit Python arbeiten sind 2.7 und pika 0.10.0.
Und ja, wir bemerkten in der Pika FAQ: http://pika.readthedocs.io/en/0.10.0/faq.html
, dass Pika nicht Thread sicher ist.
Super! Vielen Dank! Wie kann ich diese Prefetch-Zählung übersteuern? Das macht all die Magie. – Rumpli
@Rumpli Ich habe dies zur Antwort hinzugefügt. Jetzt werde ich hier meinen eigenen Schaden anrichten, aber da du neu hier bist, werde ich kurz erklären, wie du die Antworten aufhebst und akzeptierst: Wenn dir die Antwort hilft, gib ihr eine Stimme. Wenn es Ihr Problem löst, geben Sie ihm eine positive Bewertung und akzeptieren Sie es. Hier hast du nur ohne upvote akzeptiert, aber du hast es nicht versucht, wenn es funktioniert :) Vielleicht nur upvote für jetzt, und sobald du bestätigst akzeptiere auch. Bitte korrigieren Sie mich, wenn ich diese Abstimmung nicht korrekt erklärt habe. – cantSleepNow
Danke für Ihre Erklärung. Ich habe das ausprobiert und mit dem Setzen von 'channel.basic_qos (prefetch_count = 1)' auf mehr als '1', tut es zu der Zeit mehr als eine Aufgabe. Und ich habe versucht, deine Antwort zu verbessern, aber solange ich nicht 15 Ruf habe, wird es es nicht anzeigen ... :( – Rumpli