2017-12-06 4 views
0

Ich muss zusätzliche Argumente an meine Callback-Funktion in Sellerie Akkorde übergeben. (Sellerie Version: 4.1.0 (latentcall) und Python 2,7)Übergeben Sie zusätzliche Argumente an die Callback-Funktion in Sellerie Akkord

Betrachten Sie das folgende Beispiel:

program.py

from tasks import get_stock_info, call_back 
from celery import group, chord 

def chord_queue(): 
    header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4]) 
    callback = call_back.subtask() 
    header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4]) 
    res = chord(header,queue='susanoo_dev')(callback) 
    res1 = chord(header1,queue='susanoo_core')(callback) 
    print(res.get()) 
    print(res1.get()) 
    print("We are done") 

if __name__ == '__main__': 
    chord_queue() 

tasks.py

from pandas_datareader import data 
from celery_app import app 
import time 

@app.task 
def get_stock_info(delay): 
    print('hello Celery--------') 
    time.sleep(delay) 
    print('Whats up') 
    return 10 

@app.task 
def call_back(num): 
    print("Everything is done------") 
    print("Everything is done------") 
    return sum(num) 

sellery_ap p.py

from celery import Celery 
from kombu import Queue 

app = Celery('tasks', broker='amqp://my_user:[email protected]/my_vhost', backend='redis://localhost:6379/0') 

CELERY_CONFIG = { 
    'CELERY_DEFAULT_QUEUE': 'default', 
    'CELERY_QUEUES': (Queue('dev'), Queue('core'),) 
} 

app.conf.update(**CELERY_CONFIG) 

nun in diesem Fall, wenn der Akkord aufgerufen wird und nach allen 3 get_stock_info Aufgaben fertig sind, die CALL_BACK aufgerufen wird, ist der Wert, zu dem , das ist Der Rückgabewert vom get_stock_info wird automatisch übergeben. Jetzt zusammen mit den Rückgabewerten würde ich auch ein zusätzliches Argument sagen sagen eine Zeichenfolge als "abcd" an die Callback-Funktion.

Wie mache ich das?

Ich habe versucht, das bereits auf einigen Blogs wie vorgeschlagen zu tun/SO Antworten usw.

program.py

def chord_queue(): 
    header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4]) 
    callback = call_back.subtask(kwargs={'my_str' : 'abcd'}) 
    header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4]) 
    res = chord(header,queue='susanoo_dev')(callback) 
    res1 = chord(header1,queue='susanoo_core')(callback) 
    print(res.get()) 
    print(res1.get()) 
    print("We are done") 

tasks.py

@app.task 
def call_back(num, my_str): 
    print("Everything is done------") 
    print("Everything is done------") 
    print my_str 
    return my_str, sum(num) 

Aber diese scheint nicht zu funktionieren und gibt den folgenden Fehler aus:

celery.backends.base.ChordError: Callback error: TypeError("call_back() got an unexpected keyword argument 'my_str'",)

Antwort

0

Haben Sie die Antwort. Danke an einen Freund, der mir dabei geholfen hat. Alles, was in der obigen Lösung falsch gemacht wurde, war, my_str nicht als ein Schlüsselwortargument in der Definition des call_back() zu definieren.

So würde die Arbeitslösung sein:

program.py

def chord_queue(): 
    header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4]) 
    callback = call_back.subtask(kwargs={'my_str' : 'abcd'}) 
    header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4]) 
    res = chord(header,queue='susanoo_dev')(callback) 
    res1 = chord(header1,queue='susanoo_core')(callback) 
    print(res.get()) 
    print(res1.get()) 
    print("We are done") 

task.py

@app.task 
def call_back(num, my_str=None): 
    print("Everything is done------") 
    print("Everything is done------") 
    print my_str 
    return my_str, sum(num) 

Und es funktioniert so ohne Probleme zu erwarten.

Verwandte Themen