2016-12-13 3 views
5

Ich verwende Celery 4.0.1 mit Django 1.10 und ich habe Schwierigkeiten beim Planen von Aufgaben (Ausführen einer Aufgabe funktioniert gut). Hier ist die Sellerie-Konfiguration:Periodische Aufgaben in Sellerie (Selleriebeat) dynamisch einrichten mit add_periodic_task

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings') 
app = Celery('myapp') 

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST) 
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery' 
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default' 
app.conf.CELERY_TASK_SERIALIZER = 'json' 
app.conf.CELERY_ACCEPT_CONTENT = ['json'] 
app.conf.CELERY_IGNORE_RESULT = True 
app.conf.CELERY_DISABLE_RATE_LIMITS = True 
app.conf.BROKER_POOL_LIMIT = 2 

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'), 
    Queue('myapp.queue1'), 
    Queue('myapp.queue2'), 
    Queue('myapp.queue3'), 
) 

Dann in tasks.py ich habe:

@app.task(queue='myapp.queue1') 
def my_task(some_id): 
    print("Doing something with", some_id) 

In views.py Ich mag diese Aufgabe planen:

def my_view(request, id): 
    app.add_periodic_task(10, my_task.s(id)) 

Dann führe ich die Befehle:

sudo systemctl start rabbitmq.service 
celery -A myapp.celery_app beat -l debug 
celery worker -A myapp.celery_app 

Aber die Aufgabe ist nie geplant. Ich sehe nichts in den Protokollen. Die Aufgabe funktioniert, denn wenn ich es meiner Meinung nach tue:

def my_view(request, id): 
    my_task.delay(id) 

Die Aufgabe wird ausgeführt.

Wenn in meiner Konfigurationsdatei, wenn ich die Aufgabe manuell zu planen, wie dies funktioniert es: dynamisch

app.conf.CELERYBEAT_SCHEDULE = { 
    'add-every-30-seconds': { 
     'task': 'tasks.my_task', 
     'schedule': 10.0, 
     'args': (66,) 
    }, 
} 

Ich kann einfach nicht die Aufgabe planen. Irgendeine Idee?

Antwort

14

Eigentlich kann man nicht ohne periodische Aufgabe auf Ansichtsebene definieren, weil der Beat Zeitplan Einstellung zuerst geladen und kann zur Laufzeit nicht nachgeholt:

Die add_periodic_task() Funktion wird die Eingabe des hinzufügen beat_schedule Einstellung hinter den Kulissen, und die gleiche Einstellung kann auch manuell einrichten periodische Aufgaben verwendet werden:

app.conf.CELERYBEAT_SCHEDULE = { 
    'add-every-30-seconds': { 
     'task': 'tasks.my_task', 
     'schedule': 10.0, 
     'args': (66,) 
    }, 
} 

was bedeutet, wenn Sie add_periodic_task() verwenden möchten es sollte withi gewickelt werden n eine on_after_configure Handler an der Sellerie App-Ebene und jede Änderung zur Laufzeit nicht wirksam:

app = Celery() 

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): 
    sender.add_periodic_task(10, my_task.s(66)) 

Wie in der doc erwähnt die die reguläre celerybeat einfach den Überblick über die Aufgabenausführung halten:

Der Standard Scheduler ist der celery.beat.PersistentScheduler, der einfach die letzten Laufzeiten in einer lokalen Shelve-Datenbankdatei verfolgt.

Um dynamisch zu können periodische Aufgaben verwalten und celerybeat zur Laufzeit neu planen:

Es gibt auch die django-celery-beat-Erweiterung, die den Zeitplan in der Django-Datenbank speichert und stellt ein bequemes Admin-Interface zu periodische Aufgaben zur Laufzeit verwalten.

Die Tasks werden in der django-Datenbank beibehalten und der Scheduler kann im Task-Modell auf der db-Ebene aktualisiert werden.Immer wenn Sie eine periodische Aufgabe aktualisieren, wird ein Zähler in dieser Aufgaben-Tabelle inkrementiert und weist den Sellerie-Beat-Dienst an, den Zeitplan von der Datenbank neu zu laden.

Eine mögliche Lösung für Sie wie folgt sein könnte:

from django_celery_beat.models import PeriodicTask, IntervalSchedule 

schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS) 
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66])) 

views.py

def update_task_view(request, id) 
    task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique 
    task.args=json.dumps([id]) 
    task.save() 


EDIT: (13/01/2018)


Die neuesten release 4.1.0 in diesen ticket #3958 das Thema angesprochen hat und auf Ihrem Kommentar „Eigentlich kann man nicht nicht definieren periodische Task auf Ansichtsebene“

+0

Picking up zusammengeführt: Ist es möglich, Verwenden Sie 'add_periodic_task()' auf App-Ebene, dh in 'task.py'? Es scheint eine bessere Kapselung zu sein, damit diese periodischen Aufgaben in der App deklariert werden. –

+0

Eigentlich ist es überhaupt nicht notwendig, es zu verwenden, da es für Sie aufgerufen wird, wenn Sie nur 'app.conf.CELERYBEAT_SCHEDULE' Einstellungen Syntax verwenden, aber wenn Sie es explizit verwenden möchten, können Sie es in der' task.py' verwenden Datei. – DhiaTN

+2

Ich glaube die neueste Version (nach 4.1.0) sollte diese angesprochen haben. Hier ist der Entwickler, der gerade läuft [# 3958] (https://github.com/sellery/sellery/pull/3958) –

Verwandte Themen