2012-08-23 5 views
6

Wenn dies eine idiotische Frage ist, entschuldige ich mich und meinen Kopf in Scham gehen verstecken, aber:Python/rq - Überwachung Arbeiter Status

ich rq bin mit Jobs in Python Schlange stehen. Ich möchte, dass es so funktioniert:

  1. Job A beginnt. Job A greift Daten über die Web-API zu und speichert sie.
  2. Job A läuft.
  3. Job A wird abgeschlossen.
  4. Nach Abschluss von A beginnt Job B. Job B prüft jeden von Job A gespeicherten Datensatz und fügt einige zusätzliche Antwortdaten hinzu.
  5. Nach Abschluss von Job B erhält der Benutzer eine E-Mail mit der Nachricht, dass der Bericht fertig ist.

Mein Code so weit:

redis_conn = Redis() 
use_connection(redis_conn) 
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later 
w = Worker(q) 
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid) 
w.work() 

Ich nahm meine beste Lösung 2 Arbeiter, ein Job-A und eine für B. Der Job B Arbeiter konnte Job A überwachen und haben war, als Job A wurde gemacht, fangen Sie an Job B.

Was ich nicht herausfinden kann, um mein Leben zu retten, ist, wie ich einen Arbeiter bekomme, um den Status eines anderen zu überwachen. Ich kann die Job-ID von Job A mit job.id abrufen. Ich kann den Worternamen mit w.name greifen. Aber haben Sie nicht die geringste Ahnung, wie ich diese Informationen an den anderen Arbeiter weitergebe.

Oder gibt es eine viel einfachere Möglichkeit, dies zu tun, die ich total vermisse?

+1

Wenn Auftrag B nicht ausgeführt werden kann, bis Auftrag A abgeschlossen ist (was bedeutet, dass sie nicht parallel ausgeführt werden können), warum überhaupt rq verwenden? Tun Sie sie einfach sequentiell (in einem separaten Thread oder Prozess, wenn Sie Ihre Anwendung nicht blockieren wollen) –

+0

Die Jobs für A und B dauern jeweils sehr lange und können getrennt passieren, also würde ich es gerne tun können Ich lasse viele Jobs A unabhängig von Job B laufen. Wenn es zu schwierig ist, kann ich mich ergeben. – user1066609

+0

Haben Sie Paare von A und B, die zusammen passen, oder kann irgendein B von irgendeinem A abhängen? Weil in letzterem Fall ein Synchonisierungsproblem besteht. :-) –

Antwort

0

Sie sind wahrscheinlich zu tief in Ihr Projekt zu wechseln, aber wenn nicht, werfen Sie einen Blick auf Twisted. http://twistedmatrix.com/trac/ Ich benutze es gerade für ein Projekt, das APIs erreicht, Webinhalte scratcht usw. Es führt mehrere Jobs parallel aus und organisiert bestimmte Jobs in der richtigen Reihenfolge, so dass Job B erst ausgeführt wird, wenn Job A erledigt ist.

Dies ist das beste Tutorial zum Lernen Twisted, wenn Sie es versuchen möchten. http://krondo.com/?page_id=1327

0

Kombinieren Sie die Dinge, die Job A und Job B tun, in einer Funktion und verwenden Sie dann z. multiprocessing.Pool (es ist map_async Methode), das über verschiedene Prozesse zu farmen.

Ich bin nicht vertraut mit rq, aber multiprocessing ist ein Teil der Standardbibliothek. Standardmäßig verwendet es so viele Prozesse wie Ihre CPU Kerne hat, was nach meiner Erfahrung in der Regel ausreicht, um die Maschine zu sättigen.

2

Von this page auf den rq docs, sieht es wie jedes job Objekt ein result Attribut, aufrufbar durch job.result hat, die Sie überprüfen können. Wenn der Job nicht beendet wurde, lautet er None. Wenn Sie jedoch sicherstellen, dass Ihr Job einen Wert zurückgibt (auch nur "Done"), können Sie Ihren anderen Mitarbeiter das Ergebnis des ersten Jobs überprüfen lassen und erst dann beginnen, zu arbeiten job.result hat einen Wert, was bedeutet, dass der erste Arbeiter fertiggestellt wurde.

6

aktualisieren januari 2015 diese Pull-Anforderung wird nun verschmolzen, und der Parameter auf depends_on umbenannt wird, das heißt:

second_job = q.enqueue(email_customer, depends_on=first_job) 

Der Original-Beitrag für Menschen, ältere Versionen laufen intakt gelassen und so:

Ich habe eine Pull-Anfrage (https://github.com/nvie/rq/pull/207) eingereicht, um Jobabhängigkeiten in RQ zu behandeln. Wenn diese Pull-Anforderung in wird verschmolzen, werden Sie in der Lage zu tun:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

first_job = q.enqueue(generate_report) 
second_job = q.enqueue(email_customer, after=first_job) 
# In the second enqueue call, job is created, 
# but only moved into queue after first_job finishes 

Vorerst Ich schlage vor, eine Wrapper-Funktion das Schreiben nacheinander auf Ihre Aufträge ausführen. Beispiel:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

def generate_report_and_email(): 
    generate_report() 
    email_customer() # You can also enqueue this function, if you really want to 

# Somewhere else 
q.enqueue(generate_report_and_email)