2013-01-08 4 views
5

ich starten 3 Prozesse und ich möchte, dass sie einen String in einem gemeinsamen Array setzen, auf den Index zu dem Prozess entsprechenden (i).Python: Multiprocessing und Array von c_char_p

Blick auf den Code unten, erzeugt die Ausgabe:

['test 0', None, None] 
['test 1', 'test 1', None] 
['test 2', 'test 2', 'test 2'] 

Warum 'test 0' von test 1 überschrieben werden, und test 1 von test 2?

Was ich will, ist (um nicht wichtig ist):

['test 0', None, None] 
['test 0', 'test 1', None] 
['test 0', 'test 1', 'test 2'] 

Der Code:

#!/usr/bin/env python 

import multiprocessing 
from multiprocessing import Value, Lock, Process, Array 
import ctypes 
from ctypes import c_int, c_char_p 

class Consumer(multiprocessing.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      multiprocessing.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr=self.arr, lock=self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr=None, lock=None): 
     with lock: 
      arr[self.i] = "test %d" % self.i 
      print arr[:] 

    def __str__(self): 
     return 'ARC' 

    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    arr = Array(ctypes.c_char_p, 3) 

    lock = multiprocessing.Lock() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

ich Python läuft 2.7.3 (Ubuntu)

Antwort

5

Dieses Problem scheint ähnlich wie this one. Dort spekuliert, J. F. Sebastian, dass die Zuordnung zu arr[i] Punkte arr[i] auf eine Speicheradresse, die nur sinnvoll, auf die subprocess machen die Zuordnung war. Die anderen Subprozesse rufen den Müll ab, wenn sie diese Adresse betrachten.

Es gibt mindestens zwei Möglichkeiten, dieses Problem zu vermeiden. Eines ist eine multiprocessing.manager Liste zu verwenden:

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, lock, lst): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.lock = lock 
      self.lst = lst 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(lock = self.lock, lst = self.lst) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, lock, lst): 
     with lock: 
      lst[self.i] = "test {}".format(self.i) 
      print([lst[i] for i in range(3)]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    manager = mp.Manager() 
    lst = manager.list(['']*3) 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

Eine andere Möglichkeit ist es, einen Shared-Array mit einer festen Größe wie mp.Array('c', 10) zu verwenden.

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr = self.arr, lock = self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr, lock): 
     with lock: 
      arr[self.i].value = "test {}".format(self.i) 
      print([a.value for a in arr]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    arr = [mp.Array('c', 10) for i in range(3)] 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

ich spekulieren, dass der Grund, warum dies funktioniert, wenn mp.Array(ctypes.c_char_p, 3) nicht der Fall ist, liegt daran, dass mp.Array('c', 10) eine feste Größe, so dass die Adresse nie Speicher ändert, während mp.Array(ctypes.c_char_p, 3) eine variable Größe hat, so dass die Speicheradresse kann sich ändern, wenn arr[i] ist einem größeren String zugewiesen.

Vielleicht ist es das, was the docs warnen etwa, wenn es heißt,

Obwohl es möglich ist, ein Zeiger im gemeinsam genutzten Speicher zu speichern nicht vergessen, dass dies zu einer Stelle im Adressraum einer bestimmten beziehen verarbeiten. Allerdings ist der Zeiger sehr wahrscheinlich im Kontext eines zweiten Prozesses ungültig zu sein und zu versuchen, den Zeiger dereferenzieren von der zweiten Prozess zu einem Absturz führen kann.

+0

Vielen Dank eine Milliarde Mal! Beide Lösungen funktionieren in der Tat :) Ich bin auf den Post von J. F. Sebastian gestoßen, aber aus irgendeinem Grund konnte ich ihn nicht umsetzen ... doh! Jetzt sagst du mir, wo ich deine Statue bauen soll! Vielen Dank nochmal ... – Ujoux

+0

Danke für die interessante Frage und deine Begeisterung! Hoffe, Sie auf Stackoverflow mehr zu sehen. Wie für Statuen - ich glaube, Sie auf den Button uparrow über dem Häkchen macht ein ziemlich genial ein; ^) – unutbu

+0

Will tun, sobald ich die nötige Reputation von 15 haben, werde ich nicht vergessen;) – Ujoux