2010-05-06 8 views
12

Für eine Python/Django/Sellerie basierte Verteilungs-Tool haben wir die folgende Konfiguration:Wie wird eine Warteschlangen-/Worker-Struktur zur Unterstützung großer Aufgaben für mehrere Umgebungen erstellt?

  1. Wir verwenden derzeit die Standard-Sellerie-Setup. (Eine Warteschlange + Austausch wird "Sellerie" genannt.)
  2. Jeder Task in der Warteschlange repräsentiert einen Bereitstellungsvorgang.
  3. Jede Aufgabe für eine Umgebung endet mit einer Synchronisierungsphase, die möglicherweise (sehr) lang dauert.

Die folgenden Spezifikationen müssen erfüllt sein:

  1. Concurrency: Aufgaben für mehrere Umgebungen gleichzeitig durchgeführt werden sollte.
  2. Sperren: Es kann höchstens eine Task für jede Umgebung zur gleichen Zeit (d. H. Umgebungen Sperre) ausgeführt werden.
  3. Durchsatzoptimierung: Wenn mehrere Aufgaben für eine einzelne Umgebung vorhanden sind, können ihre Synchronisierungsphasen zur Optimierung kombiniert werden. Wenn also eine Aufgabe ihrem Ende nahe kommt, sollte sie prüfen, ob neue Aufgaben in der Warteschlange für diese Umgebung warten, und falls ja, ihre Synchronisierungsphase überspringen.

Was ist der bevorzugte Weg, dies zu implementieren?

Einige Gedanken:

  • würde ich sagen wir mehrere Warteschlangen einrichten: eine für jede Umgebung und haben N Sellerie Arbeiter ausschließlich eine einzelne Warteschlange Verarbeitung jeder. (Dies würde die Spezifikation 1 + 2 lösen.)
    Aber wie bekommen wir mehrere Selleriearbeiter, die ausschließlich verschiedene Warteschlangen hören?
  • Gibt es eine saubere Möglichkeit zu wissen, dass in der Warteschlange mehr Aufgaben für eine Umgebung warten?

Antwort

1

Ich würde einen Blick auf zeromq es kann Messaging und Multithreading in einer super schnelle Bibliothek tun. Es unterstützt auch eine große Anzahl von Sprachen und verfügt über einen integrierten Lastenausgleich.

2

für 1,2 Verwenden Sie mehrere Warteschlangen und starten Sie Worker mit -Q, um anzugeben, welche Warteschlange zu hören ist. Konfigurieren Sie auch CELERYD_PREFETCH_MULTIPLIER = 1 für jeweils nur eine Aufgabe.

Um die Warteschlange Länge (getestet mit rabbitmq) zu erhalten, können Sie so etwas wie folgt verwenden:

from kombu.connection import BrokerConnection 
connection = BrokerConnection(BROKER_HOST, BROKER_USER...) 
channel = connection.channel() 
q, j, c = channel.queue_declare('celery', passive=True) 
print 'celery %d jobs in queue' % j 

‚queue_delcare‘ als Nebeneffekt, geben Sie die Länge der Warteschlange. Hoffe das kann dir helfen.

Verwandte Themen