2017-10-01 4 views
1

Ich bin neu in der Arbeit mit Nachrichtenaustausch und traf das Problem, das richtige Handbuch für die Aufgabe zu finden.RabbitMQ + Kombu: Schreiben/Lesen zu einmaligen Gebrauch Warteschlangen mit zufälligen Namen

ich brauche Pool von Warteschlangen zu organisieren, so dass:

  1. Producer einige zufällige leere Warteschlange erstellen und schreiben dort die ganze Packung von Nachrichten (100 Meldungen in der Regel).

  2. Consumer finden nicht leere und nicht gesperrte Warteschlange und lesen von ihm bis es ist leer und dann löschen Sie es und suchen Sie nach dem nächsten.

So ist meine Aufgabe, mit Nachrichten zu arbeiten, wie die Pakete, die ich verstehe, wie in einer Warteschlange mit demselben Schlüssel zu produzieren und konsumieren, aber nicht finden kann, wie mit dem Pool von Warteschlangen zu arbeiten.

Wir können mehrere Produzenten und Konsumenten parallel laufen lassen, aber es gibt keine Frage, welche von denen an wen senden. Wir brauchen und können keine bestimmten Produzenten mit bestimmten Konsumenten verbinden.

Allgemeine Aufgabe: wir haben viele Kunden Push-Benachrichtigungen zu erhalten, schiebt wir Gruppe von einigen Parametern später als Gruppe zu bearbeiten, so dass solche Gruppe sollte in RabbitMQ in einer Warteschlange als produziert und verbraucht werden eine Gruppe, aber jede Gruppe ist unabhängig von anderen Gruppen.

Vielen Dank an Hannu für die Hilfe: Grundidee seiner einfachen und robusten Lösung, dass wir eine persistente Warteschlange mit bekannten Namen haben können, wo der Produzent Namen der erstellten Warteschlangen schreibt und der Verbraucher diese Namen von dort liest.

Zu seiner Lösung besser lesbar und einfache Arbeit mit in Zukunft in meiner persönlichen Aufgabe, ich habe geteilt publish_data() in Produzenten in zwei Funktion zu machen - eine zufällige Warteschlange machen und schreiben Sie es ein anderes erhält diese random_queue control_queue und füllt es mit Nachrichten. Ähnliche Idee ist gut für den Verbraucher - eine Funktion zur Verarbeitung der Warteschlange, ein anderer wird für die Prozessnachricht selbst aufgerufen.

+1

Haben Sie mehrere Erzeuger und Verbraucher haben Anwendungen? Und ist die "andere mögliche Variante" auch akzeptabel? So würde ich es wahrscheinlich machen, oder eine Variante davon. – Hannu

+0

Hannu, ich aktualisierte die Frage mit Antworten auf Ihre Fragen. – DaneSoul

+0

Danke. Ich habe einen Antwortkandidaten gepostet. Es scheint nicht mit seiner "Kombu-ness", aber es funktioniert und wird zumindest eine Idee geben, wie man das löst. – Hannu

Antwort

1

Ich habe sowas aber mit Pika gemacht. Ich musste ein altes Code-Snippet für die Beispiele putzen und kombufy. Es ist wahrscheinlich nicht sehr kombuish (das ist mein absolut erstes Code-Snippet, das damit geschrieben wurde), aber so würde ich es lösen. Grundsätzlich würde ich eine Kontrollwarteschlange mit einem bekannten Namen einrichten.

Publisher erstellen einen zufälligen Warteschlangennamen für ein Nachrichtenpaket, geben N Nachrichten (in meinem Fall Nummern 1-42) aus und geben den Warteschlangennamen in die Steuerungswarteschlange ein. Ein Benutzer erhält dann diesen Warteschlangennamen, bindet sich an ihn, liest Nachrichten, bis die Warteschlange leer ist, und löscht dann die Warteschlange.

Dies hält die Dinge relativ einfach, da Publisher nicht herausfinden müssen, wo sie ihre Datengruppen veröffentlichen dürfen (jede Warteschlange ist neu mit einem zufälligen Namen). Empfänger müssen sich keine Gedanken über Zeitüberschreitungen oder "alles erledigt" -Nachrichten machen, da ein Empfänger nur dann einen Warteschlangennamen erhalten würde, wenn eine Gruppe von Daten in die Warteschlange geschrieben wurde und jede Nachricht dort wartet.

Es gibt auch keine Notwendigkeit, mit Sperren oder Signalisierung oder etwas anderes, das die Dinge komplizieren würde basteln. Sie können so viele Konsumenten und Produzenten haben, wie Sie möchten.Und natürlich den Austausch und die Routing-Schlüssel dort verwendet, könnte verschiedene Gruppen von Verbrauchern für verschiedene Aufgaben sein usw.

Verlag

from kombu import Connection 
import uuid 
from time import sleep 
def publish_data(conn): 
    random_name= "q" + str(uuid.uuid4()).replace("-", "") 
    random_queue = conn.SimpleQueue(random_name) 
    for i in xrange(0, 42): 
     random_queue.put(i) 
    random_queue.close() 
    return random_name 


with Connection('amqp://guest:[email protected]:5672//') as conn: 
    control_queue = conn.SimpleQueue('control_queue') 
    _a = 0 
    while True: 
     y_name = publish_data(conn) 
     message = y_name 
     control_queue.put(message) 
     print('Sent: {0}'.format(message)) 
     _a += 1 
     sleep(0.3) 
     if _a > 20: 
      break 

    control_queue.close() 

Consumer

from Queue import Empty 

from kombu import Connection, Queue 


def process_msg(foo): 
    print str(foo) 
    with Connection("amqp://guest:[email protected]:5672//") as _conn: 
     sub_queue = _conn.SimpleQueue(str(foo)) 
     while True: 
      try: 
       _msg = sub_queue.get(block=False) 
       print _msg.payload 
       _msg.ack() 
      except Empty: 
       break 
     sub_queue.close() 
     chan = _conn.channel() 
     dq = Queue(name=str(foo), exchange="") 
     bdq = dq(chan) 
     bdq.delete() 


with Connection('amqp://guest:[email protected]:5672//') as conn: 
    rec = conn.SimpleQueue('control_queue') 
    while True: 
     msg = rec.get(block=True) 
     entry = msg.payload 
     msg.ack() 
     process_msg(entry) 
+0

Vielen Dank! Ich habe deine Idee verstanden und kurz getestet, bisher sieht das gut aus. Ich werde es mehr testen und in ein paar Tagen annehmen. – DaneSoul

+0

Es gibt natürlich viele andere Möglichkeiten, dies zu tun, aber ich mag das, weil es keine Signalisierung, Synchronisation oder Kontrollstrukturen außer der Verwendung von Kontrollkanal benötigt. Dadurch wird Rabbit all Ihre Terminplanung, Verteilung und Nachrichtenübermittlung für Sie erledigen, und es tut es gut, wie es für genau das entwickelt wurde. Die einzige gemeinsame Schnittstelle ist der Kontrollkanal. Es ist kein bidirektionaler Datenverkehr oder eine Kommunikation zwischen Peer-Prozessen erforderlich. Alles kann und soll getrennt bleiben, und mit acks ackern bietet Kaninchen auch Ihnen einen ausfallsicheren Mechanismus. – Hannu

+0

Ja, dieser Code funktioniert, und der Grundgedanke der Verwendung von control_queue zum Übertragen von Warteschlangennamen ist brillant, dies ist der Schlüssel des Problems, das mir fehlte. Um Lösung in Zukunft lesbarer und einfacher zu arbeiten, habe ich publish_data() in Producer in zwei Funktionen aufgeteilt - eine zufällige Warteschlange erstellen und sie in control_queue schreiben, eine andere diese random_queue empfangen und sie mit Nachrichten füllen. Ähnliche Idee ist gut für den Verbraucher - eine Funktion zur Verarbeitung der Warteschlange, eine andere wird für die Prozessnachricht selbst aufgerufen werden. – DaneSoul

Verwandte Themen