2017-08-11 6 views
0

aufgerufen wird Wie Sellerie Aufgabe, die in einer anderen Sellerie Aufgabe aufgerufen wird, richtig zu verspotten? (Dummy-Code unten)richtig verspotten Sellerie Aufgabe, die in einem anderen Sellerie Aufgabe

@app.task 
def task1(smthg): 
    do_so_basic_stuff_1 
    do_so_basic_stuff_2 
    other_thing(smthg) 

@app.task 
def task2(smthg): 
    if condition: 
     task1.delay(smthg[1]) 
    else: 
     task1.delay(smthg) 

Ich habe genau die gleiche Struktur des Codes in my_module. proj/cel/my_module.py Ich versuche zu schreiben Test in proj/Tests/cel_test/test.py

Testfunktion: wirklich schwierig

def test_this_thing(self): 
    # firs I want to mock task1 
    # i've tried to import it from my_module.py to test.py and then mock it from test.py namespace 
    # i've tried to import it from my_module.py and mock it 
    # nothing worked for me 

    # what I basically want to do 
    # mock task1 here 
    # and then run task 2 (synchronous) 
    task2.apply() 
    # and then I want to check if task one was called 
    self.assertTrue(mocked_task1.called) 

Antwort

2

Sie rufen nicht task1() oder task2(), aber ihre Methoden: delay() und apply() - so Sie benötigen, wenn diese Methoden testen aufgerufen. Hier

ist ein funktionierendes Beispiel schrieb ich gerade auf Ihren Code basieren:

tasks.py

from celery import Celery 

app = Celery('tasks', broker='amqp://[email protected]//') 

@app.task 
def task1(): 
    return 'task1' 

@app.task 
def task2(): 
    task1.delay() 

test.py

from tasks import task2 

def test_task2(mocker): 
    mocked_task1 = mocker.patch('tasks.task1') 
    task2.apply() 
    assert mocked_task1.delay.called 

Testergebnisse:

$ pytest -vvv test.py 
============================= test session starts ============================== 
platform linux -- Python 3.5.2, pytest-3.2.1, py-1.4.34, pluggy-0.4.0 -- /home/kris/.virtualenvs/3/bin/python3 
cachedir: .cache 
rootdir: /home/kris/projects/tmp, inifile: 
plugins: mock-1.6.2, celery-4.1.0 
collected 1 item                 

test.py::test_task2 PASSED 

=========================== 1 passed in 0.02 seconds =========================== 
1

zu starten, können Aufgaben Sellerie Prüfung sein. Im Allgemeinen lege ich meine ganze Logik in eine Funktion, die KEINE Aufgabe ist, und mache dann eine Aufgabe, die diese Funktion gerade aufruft, damit man die Logik richtig testen kann.

Zweitens glaube ich nicht, dass Sie Aufgaben innerhalb von Aufgaben aufrufen möchten (nicht sicher, aber ich glaube, das wird im Allgemeinen nicht empfohlen). Stattdessen je nach Bedarf, sollten Sie wahrscheinlich werden Verkettungs oder Gruppierung:

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives

Schließlich Ihre eigentliche Frage zu beantworten, wollen Sie die delay Methode genau flicken, wo es in Ihrem Code auftritt, wie beschrieben in this post.