2017-12-21 8 views
0

Seit kurzem spiele ich mit Ubuntu mit Sellerie und Blume (für Dashboard und Taskvisualisierung auf einem Rechner) mit Python 3.x. Zuerst habe ich rabbitmq-server, radis, sellerie und flower installiert. Dann habe ich ein Skript erstellt tasks.py enthält das gerufene folgende:Inputs und Bildverarbeitungsjobs mit Sellerie und Python auf mehrere Rechner senden

from celery import Celery 

# py-advanced-message-queuing-protocol 
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost//') 

@app.task 
def intensive_sum1(num): 
    val = sum(x**4 for x in range(num)) 
    return val 


@app.task 
def intensive_sum2(num): 
    val = sum(x**4 for x in range(num)) 
    return val 

@app.task 
def intensive_sum3(num): 
    val = sum(x**4 for x in range(num)) 
    return val 

Dann habe ich erstellt ein Skript run.py enthält

from tasks import intensive_sum1, intensive_sum2, intensive_sum3 
import time 

start = time.time() 
result1 = intensive_sum1.delay(100000000) 
result2 = intensive_sum2.delay(100000000) 
result3 = intensive_sum3.delay(100000000) 
print(result1.get(), result2.get(), result3.get()) 
end = time.time() 
print('time: ', end - start) 

start = time.time() 
result1 = sum(x**4 for x in range(100000000)) 
result2 = sum(x**4 for x in range(100000000)) 
result3 = sum(x**4 for x in range(100000000)) 
print(result1, result2, result3) 
end = time.time() 
print('time: ', end - start) 

bevor dieser ausgeführt begann ich zwei verschiedene Terminals und änderte das Verzeichnis in den Speicherort der beiden Skripte. Ich lief dann sudo celery -A tasks flower in einem Terminal und celery -A tasks worker --loglevel=info in dem anderen Terminal. Es stellt sich heraus, dass Sellerie (Überraschung überrascht) jede Aufgabe auf einen einzelnen Kern verteilen kann, was zu einer enormen Zeitersparnis führt. Natürlich ist diese Zeitersparnis nur für große Funktionen zu erwarten, da kleinere Threads Generierungsaufwand verursachen, der keinen Nutzen bringt.

Das hat mich über ein anderes Problem nachdenken lassen. Nehmen wir an, dass ich statt einer Maschine drei Maschinen an denselben WLAN-Router angeschlossen habe. Ich kann die IP-Adresse für jeden dieser Ubuntu-Maschinen mit ifconfig Befehl erarbeiten. Lassen Sie uns sagen, dass eine dieser Maschinen eine Master-Maschine ist, die ein main.py Skript enthält, das Echtzeitbilder mit Opencv-Python Capture-Objekt erfasst. Es nimmt dann jedes Bild, serialisiert es und sendet es als eine Nachricht an zwei Arbeitsmaschinen. Beide Arbeitsmaschinen arbeiten unabhängig und beide serialisieren das gleiche Bild. Eine Arbeitermaschine führt eine Katzenklassifizierung durch und gibt eine Wahrscheinlichkeit einer Katze zurück, die andere Maschine führt eine Hundeklassifizierung durch und gibt eine Wahrscheinlichkeit eines Hundes zurück. Eine Arbeitsmaschine kann länger brauchen, um zu einer Schlussfolgerung zu kommen als die andere. Für diesen bestimmten Frame muss der Master-Computer jedoch auf beide Klassifizierungsergebnisse warten, bevor er einige Ergebnisse oben auf diesem bestimmten Frame überlagert. Instinktiv bin ich zu der Überzeugung gelangt, dass die Master-Maschine überprüfen muss, ob beide Jobs bereit sind, bevor sie weitergeht (e.g. result_worker_one.ready() == result_worker_two.ready() == True). Wie kann ich dieses Verhalten erreichen? Wie kann ich ein RGB-Bild im Master-Gerät serialisieren und es in Arbeitsmaschinen deserialisieren? Was backend und broker benötigen jede Maschine? Wie kann dies als Client-Server-Architektur eingerichtet werden?

Antwort

1

Sie haben Recht, wenn Sie Jobs auf mehrere Maschinen verteilen. In der Tat ist es einer der Hauptzwecke von Sellerie.

  1. Um zu überprüfen, ob zwei asynchrone Jobs beendet sind, können Sie Gruppen- und Akkordoptionen in Sellerie verwenden. nehmen Ihre zwei Sellerie Aufgaben sind wie folgt:

    @app.task 
    def check_dog(): 
        #dog_classification code 
    
    @app.task 
    def check_cat(): 
        #cat classification code 
    

    Sie können Gruppe diese Aufgaben zusammen und dann einen Akkord verwenden (Ein Akkord ist eine Aufgabe, die nur nach dem alle Aufgaben in einer Gruppe mit der Ausführung fertig ausführt) zu Gehen Sie zum nächsten Schritt, nachdem beide Funktionen ausgeführt wurden. Fügen Sie nach den beiden Aufgaben in der unten gezeigten Rückruffunktion alles ein, was Sie benötigen. zugehörige Dokumentation finden Sie hier: http://docs.celeryproject.org/en/master/userguide/canvas.html#groups

    chord([check_dog(),check_cat()])(callback) 
    
  2. für das Bild bei dieser Werfen Sie einen Blick Teil Serialisierung: Passing an image to a celery task

  3. Um den dritten Teil der Frage zu beantworten, von Natur aus Sellerie eine Client-Server-Architektur folgt um paralleles Rechnen zu unterstützen. Wenn Sie eine Sellerieaufgabe aufrufen, wird eine Nachricht auf den von Ihnen eingestellten Nachrichtenbroker gestellt (in Ihrem Fall haben Sie rabbitMQ verwendet).Diese Nachricht enthält Informationen darüber, welche Aufgabe mit allen erforderlichen Argumenten ausgeführt werden soll. Die Nachrichtenwarteschlange wird Nachrichten an Sellery-Mitarbeiter auf verschiedenen Computern übermitteln. Sobald ein Worker eine Nachricht erhält, führt der Worker die in der Nachricht beschriebene Aufgabe aus. Wenn Sie Ihre Aufgaben auf mehrere Computer verteilen möchten, müssen Sie lediglich einen Sellerie-Arbeiter in jedem Computer starten, der Ihre Nachrichtenwarteschlange auf Ihrem Hauptcomputer überwacht. Sie können die Arbeiter wie folgt konfigurieren

    app = Celery('tasks', backend='redis://localhost', broker='pyamqp://<username>:<password>@<ip of task queue host>') 
    

    sicherstellen, dass Sie eine Taskdatei zu jedem Sellerie Arbeiter zur Verfügung stellen, weil Botschaft an die Arbeiter weitergegeben nicht den Quellcode enthält, sondern nur den Namen der Aufgabe selbst nur.