0

Ich benutze Python Python Multiprocessing für einen RabbitMQ Consumers. Beim Start der Anwendung erstelle ich 4 WorkerProcesses.Python MultiProcessing

def start_workers(num=4): 
    for i in xrange(num): 
     process = WorkerProcess() 
     process.start() 

Unten finden Sie meine WorkerClass. Die Logik funktioniert soweit, ich erstelle 4 parallele Consumer-Prozesse. Aber das Problem ist, nachdem ein Prozess getötet wurde. Ich möchte einen neuen Prozess erstellen. Das Problem in der Logik unten ist, dass der neue Prozess als untergeordneter Prozess von dem alten erstellt wird und nach einiger Zeit der Speicher nicht mehr voll ist. Gibt es eine Möglichkeit mit Python Multiprocessing einen neuen Prozess zu starten und den alten korrekt zu beenden?

class WorkerProcess(multiprocessing.Process): 

def ___init__(self): 
    app.logger.info('%s: Starting new Thread!', self.name) 
    super(multiprocessing.Process, self).__init__() 

def shutdown(self): 
    process = WorkerProcess() 
    process.start() 
    return True 

def kill(self): 
    start_workers(1) 
    self.terminate() 

def run(self): 
    try: 
     # Connect to RabbitMQ 
     credentials = pika.PlainCredentials(app.config.get('RABBIT_USER'), app.config.get('RABBIT_PASS')) 
     connection = pika.BlockingConnection(
      pika.ConnectionParameters(host=app.config.get('RABBITMQ_SERVER'), port=5672, credentials=credentials)) 
     channel = connection.channel() 

     # Declare the Queue 
     channel.queue_declare(queue='screenshotlayer', 
           auto_delete=False, 
           durable=True) 

     app.logger.info('%s: Start to consume from RabbitMQ.', self.name) 
     channel.basic_qos(prefetch_count=1) 
     channel.basic_consume(callback, queue='screenshotlayer') 
     channel.start_consuming() 
     app.logger.info('%s: Thread is going to sleep!', self.name) 

     # do what channel.start_consuming() does but with stoppping signal 
     #while self.stop_working.is_set(): 
     # channel.transport.connection.process_data_events() 

     channel.stop_consuming() 
     connection.close() 
    except Exception as e: 
       self.shutdown() 
    return 0 

Danke

Antwort

1

Im Hauptprozess, den Überblick über Ihre Teilprozesse (in einem list) und Schleife über sie mit .join(timeout=50) (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.join).

Dann überprüfen, ob er lebt (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.is_alive).

Wenn nicht, ersetzen Sie ihn durch einen frischen.

def start_workers(n): 
    wks = [] 
    for _ in range(n): 
     wks.append(WorkerProcess()) 
     wks[-1].start() 
    while True: 
     #Remove all terminated process 
     wks = [p for p in wks if p.is_alive()] 

     #Start new process 
     for i in range(n-len(wks)): 
      wks.append(WorkerProcess()) 
      wks[-1].start() 
+0

Aber ich Subprocces nicht gewohnt. Ich beginne Thread: 1, Schritt: 2, Thread: 3, Thread: 4. Jetzt Wenn Thread: 1 getötet wurde, erzeugt es einen neuen Subprozess Thread: 1: 1, und der nächste Thread wäre Thread: 1: 1: 1 das geht so lange, bis es keinen Speicher mehr gibt. Ich möchte, dass wenn Thread: 1 getötet wurde ein neuer Thread generiert wird Thread: XY. – ghovat

+0

I Sie können Ihren Hauptprozess beschäftigt halten, er kann die 4 Threads überwachen und wenn einer beendet wird, kann er einen neu starten. Wenn deine Funktion 'start_worker' nach dem Start zurückkehren soll, starte einen Thread, der den Rest erledigt (starte 4 Threads und beobachte sie) – Paul

+0

Verstanden, danke. Aber wenn ich einen neuen Thread erzeuge, bekomme ich einen Assertionsfehler, kann nicht zweimal starten 'AssertionError: kann einen Prozess nicht zweimal starten' – ghovat

1

Ich würde nicht selbst die Verwaltung des Prozesspools übernehmen. Stattdessen würde ich das ProcessPoolExecutor vom concurrent.future Modul verwenden.

Keine Notwendigkeit, die WorkerProcess erben, um die Process Klasse zu erben. Schreiben Sie einfach Ihren tatsächlichen Code in die Klasse und senden Sie ihn dann an einen Prozesspool-Executor. Der Executor hätte einen Pool von Prozessen, die immer bereit sind, Ihre Aufgaben auszuführen.

Auf diese Weise können Sie die Dinge einfach und weniger Kopfschmerzen für Sie halten.

Sie können hier mehr über in meinem Blogbeitrag lesen: http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html

Beispielcode:

from concurrent.futures import ProcessPoolExecutor 
from time import sleep 

def return_after_5_secs(message): 
    sleep(5) 
    return message 

pool = ProcessPoolExecutor(3) 

future = pool.submit(return_after_5_secs, ("hello")) 
print(future.done()) 
sleep(5) 
print(future.done()) 
print("Result: " + future.result()) 
+0

Danke, aber was passiert, wenn es eine Ausnahme in der Funktion return_after_5_secs gibt und der Prozess wurde getötet, wird es einen neuen Prozess generieren. In meinem Fall muss ich immer 4 Verbraucher laufen: Danke – ghovat

+0

Ja, es wird immer die Anzahl der Prozesse, die Sie erwähnen. In meinem Codebeispiel ('pool = ProcessPoolExecutor (3)') habe ich 3 Prozesse im Pool. – masnun