2012-04-08 6 views
1

Ich habe eine Erinnerung Typ App, die Aufgaben in Sellerie mit dem "Eta" -Argument geplant. Wenn sich die Parameter im Erinnerungsobjekt ändern (z. B. Zeitpunkt der Erinnerung), widerrufe ich die zuvor gesendete Aufgabe und stelle eine neue Aufgabe in eine Warteschlange., wie widerrufen Aufgaben in über mehrere selleryd Prozesse

Ich habe mich gefragt, ob es eine gute Möglichkeit gibt, die widerrufenen Aufgaben über Neustarts von selleryd nachzuverfolgen. Ich würde gerne die Möglichkeit haben, sellerydische Prozesse im laufenden Betrieb hoch/runter zu skalieren, und es scheint, dass alle selleryd-Prozesse, die nach dem Senden des revoke-Befehls gestartet wurden, diese Aufgabe immer noch ausführen.

Eine Möglichkeit besteht darin, eine Liste von widerrufenen Aufgaben-IDs zu führen, aber diese Methode führt dazu, dass die Liste willkürlich wächst. Das Löschen dieser Liste erfordert, dass die Task nicht mehr in der RabbitMQ-Warteschlange vorhanden ist, was nicht möglich zu sein scheint.

Ich habe auch versucht, eine gemeinsame --statedb-Datei für jeden der selleryd Arbeiter zu verwenden, aber es scheint, dass die angegebene b-Datei nur bei Beendigung der Arbeiter aktualisiert wird und somit nicht geeignet für das, was ich erreichen möchte.

Vielen Dank im Voraus!

Antwort

0

Ich musste etwas ähnliches in meinem Projekt tun und celerycam mit django-admin-monitor verwenden. Der Monitor erstellt einen Snapshot der Aufgaben und speichert sie regelmäßig in der Datenbank. Und es gibt eine nette Benutzeroberfläche zum Durchsuchen und Überprüfen des Status aller Aufgaben. Und Sie können es sogar even if your project is not Django based verwenden.

0

Ich habe vor einiger Zeit etwas Ähnliches implementiert, und die Lösung, die ich entwickelt habe, war sehr ähnlich wie deine.

Die Art, wie ich dieses Problem löste, war, dass der Arbeiter das Objekt Task aus der Datenbank holte, als der Job ausgeführt wurde (indem man den Primärschlüssel übergab, wie es die Dokumentation empfiehlt). In Ihrem Fall sollte der Mitarbeiter vor dem Senden der Erinnerung eine Überprüfung durchführen, um sicherzustellen, dass die Aufgabe "bereit" ist, um ausgeführt zu werden. Wenn nicht, sollte es einfach zurückkehren, ohne irgendeine Arbeit zu verrichten (vorausgesetzt, dass sich die ETA geändert hat und ein anderer Arbeiter den neuen Job annimmt).

+0

würde dieses noch theoretisch erfordern, dass ich alle vorherigen Task-Ergebnisse in der Datenbank behalten, da jede Beschneidung zu einem Verlust der Garantie führen würde, die neu Arbeitsprozesse neu gestartet Führen Sie keine zuvor widerrufenen Aufgaben aus. –

+0

Ich gehe davon aus, dass Sie bereits eine Art Datenbankmodell eingerichtet haben, mit dem Sie auch die Task-ID speichern, damit Sie die Aufgabe bei Bedarf widerrufen können. Wenn dies der Fall ist, können Sie diesem Modell einfach eine "completed" -Flag hinzufügen. –

+0

Eine Alternative, die ich gerade entwickelt habe: Behalte eine Liste von revoke-Aufgaben-IDs, und jedes Mal, nachdem ein selleryd-Prozess hochgespielt oder neu gestartet wurde, durchläuft ein Skript die gesamte Liste und sendet die recode-Befehle erneut. Auf diese Weise müssen wir nur die Aufgaben-IDs behalten, die seit dem letzten Skriptlauf gesperrt wurden. Kannst du irgendwelche Fehler in dieser Implementierung sehen? –

1

Interessantes Problem, ich denke, es sollte einfach sein, mit Broadcast-Befehle zu lösen. Wenn ein neuer Worker gestartet wird, werden alle anderen Worker aufgefordert, die widerrufenen Aufgaben an den neuen Worker zu senden. Hinzufügen von zwei neuen Fernsteuerbefehle, Sie einfach neue Befehle mithilfe von @Panel.register hinzufügen können,

Modul control.py:

from celery.worker import state 
from celery.worker.control import Panel 

@Panel.register 
def bulk_revoke(panel, ids): 
    state.revoked.update(ids) 

@Panel.register 
def broadcast_revokes(panel, destination): 
    panel.app.control.broadcast("bulk_revoke", arguments={ 
     "ids": list(state.revoked)}, 
     destination=destination) 

Fügen Sie es zu CELERY_IMPORTS:

CELERY_IMPORTS = ("control",) 

Das einzige fehlende Problem Jetzt ist es so zu verbinden, dass der neue Arbeiter beim Start broadcast_revokes auslöst. Ich denke, man das worker_ready Signal dafür verwenden könnte:

from celery import current_app as celery 
from celery.signals import worker_ready 

def request_revokes_at_startup(sender=None, **kwargs): 
    celery.control.broadcast("broadcast_revokes", 
          destination=sender.hostname)