2010-09-21 11 views
12

Ich habe so etwas wie eine Jobwarteschlange über RabbitMQ und, auf eine Anfrage, einen Job abzubrechen, möchte ich die Aufgaben zurückziehen, die noch nicht verarbeitet wurden (ihre Nachrichten wurden nicht bestätigt), was entspricht um diese Nachrichten aus den Warteschlangen zurückzuziehen, an die sie weitergeleitet wurden.Wie kann ich eine Nachricht in RabbitMQ zurückziehen?

Ich habe diese Funktionalität nicht in AMQP oder in der RabbitMQ API gefunden; vielleicht habe ich nicht gut genug gesucht? Oder muss ich einen Workaround verwenden (es ist nicht schwer, aber immer noch)?

Antwort

2

Mindestens zwei Möglichkeiten, um Ihr Ziel zu erreichen:

+2

Nö, löst dies ein anderes Problem. Ich brauche nicht eine Nachricht auf der Verbraucherseite zu verwerfen, ich will seine Lieferung von der Herstellerseite abzubrechen, so dass die Nachricht nicht erreicht einen Verbraucher wie wenn es nie existiert. In meinem Problem kann ein Verbraucher nicht entscheiden, ob eine Nachricht abgelehnt werden soll. – jkff

+4

Wenn Nachrichten successfuly veröffentlicht und Sie müssen sie aus bekannten Warteschlange entfernen, dann abonnieren Sie es ot und verbrauchen sie in Produzenten. –

4

Mit RabbitMQ können Sie Nachrichten nicht ändern oder löschen, nachdem sie in die Warteschlange eingereiht wurden. Dafür möchten Sie eine Art Datenbank, die den Zustand jedes Jobs enthält, und RabbitMQ verwenden, um interessierte Parteien von Änderungen in diesem Zustand zu benachrichtigen.

Bei geringen Volumina können Sie sie zusammen mit einer Warteschlange pro Auftrag abklammern. Erstellen Sie die Warteschlange, buchen Sie die Jobbeschreibung in die Warteschlange, und geben Sie den Workern den Namen der Warteschlange bekannt. Wenn der Job vor der Verarbeitung abgebrochen werden muss, wurde die Warteschlange des Jobs gelöscht. Wenn die Arbeiter die Jobbeschreibung holen, werden sie feststellen, dass die Warteschlange verschwunden ist.

Leichter und in der Regel besser wäre die Verwendung von redis oder einem anderen Schlüssel-/Wertespeicher, um den Jobstatus zu halten (mit einem gelöschten oder fehlenden Datensatz, der einen abgebrochenen oder nicht existierenden Job bedeutet) und rabbitmq zu benachrichtigen über neu/entfernt/geändert Datensätze im Schlüssel/Wert-Speicher.

1

Sie müssen alle Warteschlangen abonnieren, an die Nachrichten weitergeleitet wurden, und sie mit ack konsumieren.

Zum Beispiel, wenn Sie zu einem Thema Austausch mit "Test" als Routing-Schlüssel veröffentlichen, und es gibt 3 persistente Warteschlangen, die "Test" abonnieren, müssten Sie diese drei Warteschlangen konsumieren. Es könnte besser sein, eine weitere Warteschlange hinzuzufügen, die auch Ihre Verbraucherprozesse hören würden, und ihnen zu sagen, dass sie diese Nachrichten ignorieren sollen.

Eine Alternative, da Sie RabbitMQ verwenden, besteht darin, ein benutzerdefiniertes Austausch-Plug-in zu schreiben, das einige Out-of-Band-Anweisungen zum Löschen aller Warteschlangen akzeptiert. Zum Beispiel könnte dieser Austausch einen speziellen Nachrichtenkopf lesen, der ihm sagt, alle Warteschlangen zu löschen, für die diese Nachricht bestimmt ist. Dies erfordert das Schreiben von Erlang-Code, aber es sind 4 verschiedene Austausch-Typen implementiert, so dass Sie nur den ähnlichsten kopieren und den Code für die neuen Bahaviours schreiben müssen. Wenn Sie hierfür nur benutzerdefinierte Header verwenden, kann der Nachrichtentext eine normale Nachricht für die Konsumenten sein.

Fazit:

1) der Verlag die Nachrichten selbst 2) der Verlag verbrauchen muss, kann eine spezielle Nachricht in einer speziellen Warteschlange sendet Verbraucher zu sagen, die Nachricht 3) der Verleger zu ignorieren schicken eine spezielle Nachricht an eine benutzerdefinierte Vermittlungsstelle, die alle vorhandenen Nachrichten aus den Warteschlangen löscht, bevor diese spezielle Nachricht an die Verbraucher gesendet wird.

7

würde ich dieses Szenario lösen, indem die Arbeiter Prüfstrahler eine Art autoritativen Daten, die zu bestimmen, ob das der Job gehen sollte oder nicht. Der Mitarbeiter würde beispielsweise den Status des Jobs in einer Datenbank prüfen, um festzustellen, ob der Job bereits storniert wurde.

Für Szenarien, in denen die Verarbeitungsgeschwindigkeit schneller als die Geschwindigkeit sein kann, mit der der autorisierende Speicher aktualisiert und gelesen werden kann, kann ein weniger garantierter Datenspeicher nützlich sein, der die Geschwindigkeit für andere Eigenschaften abwickelt.

Ein Beispiel hierfür wäre Redis als Speicher zu verwenden, für die Verarbeitung einer Nachricht Cancelling anstelle einer relationalen DB wie MySQL. Redis ist sehr schnell, gibt aber weniger Garantien für die Daten, die es enthält, während MySQL viel langsamer ist, aber mehr Garantien für die Daten bietet.

Am Ende wird das Konzept mit einer anderen Quelle der Überprüfung auf, ob eine Nachricht zu verarbeiten, ist die gleiche, aber die Art und Weise Sie, dass auf Ihrem speziellen Szenario hängt implementieren.

Verwandte Themen