2017-12-29 4 views
1

Ich bin "erfolgreich" Senden einer Nachricht von der Ansicht an den Client mit dem Status einer Gruppe von Aufgaben (keine tatsächliche Sellerie-Gruppe). Das Problem ist: Das ignoriert wirklich, ob alle Aufgaben tatsächlich ausgeführt werden. Ich habe versucht, einen Rückruf hinzuzufügen (task.apply_async(link=)), aber das half auch nicht.django-channels/sellery: Wie kann ich den Fortschritt einer Liste von Aufgaben verfolgen?

Die Aufgaben selbst wirklich nicht viel Zeit in Anspruch nehmen, aber ich würde wirklich in der Lage sein mag, den Zähler zu erhöhen, wenn die Aufgabe ausgeführt tatsächlich wurde:

if 'selected' in request.GET: 
     selected_as_list = request.GET.getlist('selected') 
     print(selected_as_list) 
     searches = list(set([s.strip() for s in selected_as_list if s.strip()])) 
     task_group = [refresh_func.s(str(user_profile.id), search, dont_auto_add=True) for search in searches] 

     for i,task in enumerate(task_group): 
      task.apply_async() 
      Group(str(request.user.id)).send({"text": json.dumps({"tasks_completed": i+1, 
                    "task_id": "fb_import", 
                    "completed": True if i == len(task_group) -1 else False, 
"total": len(task_group)})}) 

Also zog ich den Code aus der Sicht und in den gleichen Block, der die Operation tatsächlich aufruft. Obwohl es bedeutete, dass ich jetzt viele Parameter passierte, löste dies das anfängliche Problem. Aber es stellt einen anderen dar: Eine Aufgabe mit einem Index von "1" kann nach einer Aufgabe mit einem Index "3" enden, und dies aktualisiert offensichtlich den Zähler falsch.

Was kann getan werden, um dies zu lösen?

Antwort

1

Wie wäre es mit einem Hintergrund-Thread, der regelmäßig den Status der erzeugten Aufgaben überprüft (Sie können diese Status erhalten, wenn Sie die IDs der Aufgaben kennen)?

Dieser Thread sollte in der Django-Server (nicht in den Sellerie Aufgaben) ausgeführt werden, weil das wahrscheinlich ist, wo Ihr django-channel aktiv ist: Wenn Sie Group(...).send in der Task aufrufen, wird es wahrscheinlich nicht in der Lage sein, darauf zuzugreifen (speziell seit Normalerweise Sellerie Arbeiter laufen in separaten Prozessen/Maschinen)

Nehmen wir an, Sie spawn Aufgaben in einer .GET Implementierung einer Ansicht. Vielleicht könntest du dort die Aufgaben-IDs sammeln (wo sie erzeugt werden) und regelmäßig ihren Status in einem Thread überprüfen (damit du die .GET Antwort nicht blockierst).

Lassen Sie uns sagen, dass die Ansicht, in der Sie Ihre Aufgaben laichen wie folgt aussieht:

class Test(generic.TemplateView): 
    template_name = 'stack_092.html' 

    def get(self, request, *args, **kwargs): 
     logger.info("Yep") 
     task_group = [foo_task.s(i) for i in range(5)] 
     logger.info("Task signatures created: %s", task_group) 

     task_ids = [task.apply_async().task_id for task in task_group] 
     logger.info("Tasks launched") 
     th = threading.Thread(target=verify_task_ids, args=('request.user.id', task_ids)) 
     th.start() 
     logger.info("Thread started") 
     return super(Test, self).get(request, *args, **kwargs) 

Und so etwas wie dies die verify_task_ids Zielfunktion für das Thema sein könnte:

def verify_task_ids(channel_group_id, task_ids): 
    previous_finished_task_ids = set() 
    finished_task_ids = set() 
    logger.info("Verifying %s task_ids", len(task_ids)) 
    while len(finished_task_ids) < len(task_ids): 
     finished_task_ids = set() 
     for task_id in task_ids: 
      if AsyncResult(task_id).ready(): 
       finished_task_ids.add(task_id) 
     if finished_task_ids != previous_finished_task_ids: 
      logger.info("%s new finished tasks", 
         len(finished_task_ids) - len(previous_finished_task_ids)) 
     previous_finished_task_ids = finished_task_ids 

Im Beispiel der channel_group_id Argument ist nur eine reine fest codierte Zeichenfolge "request.user.id". In Ihrem Fall sollten Sie es durch den tatsächlichen request.user.id des Benutzers ersetzen, der am Server angemeldet ist, da dies Ihre Gruppen-ID ist.

Und Sie werden sehen, dass, wenn eine neue Aufgabe beendet ist, habe ich nur eine Log-Meldung:

if finished_task_ids != previous_finished_task_ids: 
     logger.info("%s new finished tasks", 
        len(finished_task_ids) - len(previous_finished_task_ids)) 

Hier, wo anstelle der logger.info Funktion, die Sie wahrscheinlich

if finished_task_ids != previous_finished_task_ids: 
    Group(
     str(channel_group_id) 
    ).send(
     { 
      "text": json.dumps({ 
       "tasks_completed": len(finished_task_ids), 
       "task_id": "fb_import", 
       "completed": len(finished_task_ids) == len(task_ids), 
      }) 
     } 
    ) 

I don nennen sollte Ich weiß nicht viel (ähm ... irgendetwas, eher ... über Django-Kanäle), also bin ich mir nicht sicher, ob diese Lösung funktionieren wird, aber vielleicht ist es einen Versuch wert?

+0

Vielen Dank. Ich versuche, Umfragen zu vermeiden. Vielleicht kann ich Teile Ihrer Antwort übernehmen und irgendwo innerhalb einer for-Schleife nach etwas suchen, während die Aufgaben noch laufen. – zerohedge

+1

Ich dachte mir **: (** Das Hauptproblem (zumindest für mich) ist, dass die Sellery - Mitarbeiter dazu neigen, auf anderen Servern als dem Webserver zu laufen, selbst wenn nicht, laufen sie in getrennten Prozessen, was die Kommunikation mit der Prozess, der es ziemlich schwierig schuf.Viel Glück damit, und wenn Sie es selbst lösen, können Sie hinzufügen, was Sie als Antwort auf Ihre eigene Frage getan haben? Das interessiert mich auch. – BorrajaX

+0

Ich schaffte es tatsächlich früher heute. Es ist im Moment sehr klobig und kaum lesbar und scheint auch etwas spammig zu sein, aber es ist "asynchron". Ich werde die Antwort posten, sobald ich es Refactoring bekommen, aber hier ist die allgemeine Idee: Funktion Unterschriften sammeln, als Liste in eine Helferfunktion senden, dort, in einer for-Schleife, tun 'task.freeze()' und jede Aufgabe ID anhängen Starten Sie dann eine asynchrone Task, die diese Liste durchläuft, bis alle IDs "SUCCESS" (das sendet Nachrichten über Kanäle) zurückgibt. Nur dann task.apply_async() in einer ähnlichen for-Schleife. – zerohedge

Verwandte Themen