2014-11-21 11 views
8

Ich habe Probleme, etwas zu finden, das scheint, als wäre es relativ einfach für mich.Initialisieren eines Arbeiters mit Argumenten mit Sellerie

Ich benutze Sellerie 3.1 mit Python 3 und möchte meine Arbeiter mit Argumenten initialisieren, damit sie diese Details für die Einrichtung verwenden können.

Im Einzelnen: Diese Mitarbeiter werden Aufgaben ausführen, die die Interaktion mit einer API eines Drittanbieters erfordern, die Authentifizierungsdaten verwendet. Es ist erforderlich, dass der Mitarbeiter die Authentifizierungsdetails an den API-Server weitergibt, bevor Aufgaben ausgeführt werden (Authentifizierungsdetails werden nach der ersten Authentifizierungsanforderung in Cookies gespeichert).

Ich möchte diese Anmeldedaten an den Mitarbeiter weitergeben, wenn er über die CLI gestartet wird. Ich möchte dann, dass sich der Benutzer authentifiziert, indem er sie verwendet und die Sitzung speichert, um zukünftige Aufgaben zu konsumieren (idealerweise würde dies in einem Attribut gespeichert werden, auf das von Aufgaben zugegriffen werden kann).

Ist das mit Sellerie möglich? Als Randnotiz habe ich überlegt, ein requests.session Objekt (aus der Python requests Bibliothek) als Task-Argument zu übergeben, aber das würde eine Serialisierung erfordern, die aussieht, als wäre sie verpönt.

Antwort

16

Ich würde vorschlagen, eine abstrakte Aufgabe Basisklasse und Caching requests.session.

Von der Sellerie docs:

Eine Aufgabe instanziiert wird nicht für jede Anforderung, sondern wird in dem Task-Registry als globale Instanz registriert.

Dies bedeutet, dass der Konstruktor __init__ nur einmal pro Prozess aufgerufen wird und dass die Taskklasse semantisch näher an einem Actor liegt.

Dies kann auch nützlich sein, Ressourcen zu cachen ...

import requests 
from celery import Task 

class APITask(Task): 
    """API requests task class.""" 

    abstract = True 

    # the cached requests.session object 
    _session = None 

    def __init__(self): 
     # since this class is instantiated once, use this method 
     # to initialize and cache resources like a requests.session 
     # or use a property like the example below which will create 
     # a requests.session only the first time it's accessed 

    @property 
    def session(self): 
     if self._session is None: 
      # store the session object for the first time 
      session = requests.Session() 
      session.auth = ('user', 'pass') 

      self._session = session 

     return self._session 

Wenn Sie jetzt die Aufgaben erstellen, die API-Anfragen machen:

@app.task(base=APITask, bind=True) 
def call_api(self, url): 
    # self will refer to the task instance (because we're using bind=True) 
    self.session.get(url) 

Sie können auch die API-Authentifizierungsoptionen unter Verwendung des app.task Dekorateur als zusätzliches Argument übergeben, die auf die gesetzt werden __dict__ der Aufgabe, zum Beispiel:

# pass a custom auth argument 
@app.task(base=APITask, bind=True, auth=('user', 'pass')) 
def call_api(self, url): 
    pass 

Und die Basisklasse macht die a verwendet uthentifizierung Optionen:

class APITask(Task): 
    """API requests task class.""" 

    abstract = True 

    # the cached requests.session object 
    _session = None 

    # the API authentication 
    auth =() 

    @property 
    def session(self): 
     if self._session is None: 
      # store the session object for the first time 
      session = requests.Session() 
      # use the authentication that was passed to the task 
      session.auth = self.auth 

      self._session = session 

     return self._session 

Sie können mehr über die Sellerie-docs Website lesen:

Nun zurück zu Ihrer ursprünglichen Frage, die die zusätzlichen Argumente ist vorbei Arbeiter von der Befehlszeile:

Es gibt einen Abschnitt über diese in der Sellerie docs Adding new command-line options, hier ist ein Beispiel für einen Benutzername und ein Passwort für den Arbeiter von der Kommandozeile übergeben:

$ celery worker -A appname --username user --password pass 

Der Code:

from celery import bootsteps 
from celery.bin import Option 


app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.') 
) 

app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.') 
) 


class CustomArgs(bootsteps.Step): 

    def __init__(self, worker, api_username, api_password, **options): 
     # store the api authentication 
     APITask.auth = (api_username, api_password) 


app.steps['worker'].add(CustomArgs) 
+0

Ausgezeichnet, es fiel mir schwer, all das aus der Dokumentation zu entziffern. Danke, dass du es so gut verteilt hast. –

+0

Entschuldigung, um das noch einmal auszugraben, könnten Sie klären, wie die Befehlszeilenargumente von Boostep an die Taskinitialisierung übergeben werden (damit ich das Task-Sitzungsobjekt mit dem Benutzernamen und dem Passwort aus dem Befehl initialisieren kann) -Linie). Ziel ist es, meine API-Anmeldeinformationen nicht im Klartext zu speichern. –

+0

@JoshuaGilman Entschuldigung für die Verzögerung, habe ich die Antwort mit einem Beispiel aktualisiert. – Pierre

0

Ich würde denken, Sie könnten das Skript, das Sie geschrieben haben, mit Befehlszeilenargumenten aufrufen. Etwas wie folgt aus:

my_script.py username password 

In Ihrem Skript können Sie Ihre Hauptfunktion in einem @celery.task oder @app.task Dekorateur gewickelt haben.

import sys 

from celery import Celery 

cel = Celery() # put whatever config info you need in here 

@celery.task 
def main(): 
    username, password = sys.argv[1], sys.argv[2] 

So etwas sollte Sie beginnen. Stellen Sie sicher, dass Sie auch Pythons argparse für komplexere Argumente analysieren.

+0

Danke, aber Sie können keinen Arbeitsprozess starten, indem Sie das Python-Skript aufrufen. Sie müssen Sellerie wie folgt aufrufen: "Sellerie A-Projekt Arbeiter -l info" –

+0

Wir müssen eine wirklich seltsame Einrichtung haben dann ... weil es so aussah, als ob es funktionierte. Ich muss mehr darüber lernen, was wir tun. –