2013-04-12 4 views
11

Ich habe eine ganze Reihe von Fragen über SO über das Teilen von Arrays gelesen und es scheint einfach genug für einfache Arrays, aber ich bin stecken, versuchen, es für das Array, das ich habe.Machen Sie mein NumPy-Array über Prozesse hinweg

import numpy as np 
data=np.zeros(250,dtype='float32, (250000,2)float32') 

Ich habe versucht, dies zu einem gemeinsamen Array Umwandlung von irgendwie versucht mp.Array die data akzeptieren, ich habe auch versucht, wie mit ctypes als solche das Array zu schaffen machen:

import multiprocessing as mp 
data=mp.Array('c_float, (250000)c_float',250) 

Der einzige Weg, ich habe es geschafft, dass mein Code funktioniert und keine Daten an die Funktion weiterleitet, sondern eine codierte Zeichenkette zur Dekomprimierung/Decodierung weiterleitet, dies würde jedoch in n (Anzahl der Zeichenketten) Prozesse enden, die aufgerufen werden, was redundant erscheint. Meine gewünschte Implementierung basiert darauf, die Liste der binären Zeichenfolgen in x (Anzahl der Prozesse) aufzuschneiden und diesen Block data und einen index an die Prozesse zu übergeben, die funktionieren, außer dass data lokal geändert wird, daher die Frage wie man es macht shared, wäre jedes Beispiel, das mit einem benutzerdefinierten (verschachtelten) numpy-Array arbeitet, bereits eine große Hilfe.

PS: Diese Frage ist ein Follow-up von Python multi-processing

+1

Was ist 'mp'? Das 'Multiprocessing'-Modul? – delnan

+0

ja, ich werde das zur Frage hinzufügen –

Antwort

10

Beachten Sie, dass Sie mit einer Reihe von komplexen dtype beginnen können: es als ein Array von homogenem dtype

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32') 

und Aussicht:

In [5]: data2 = data.view('float32') 

und später konvertieren es zurück zu komplexen dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32') 

Das Ändern des dtype ist eine sehr schnelle Operation; Es beeinflusst nicht die zugrunde liegenden Daten, sondern nur die Art, wie NumPy es interpretiert. So ist das Ändern des D-Typs praktisch kostenlos.

Was Sie also über Arrays mit einfachen (homogenen) dtypes gelesen haben, können Sie mit dem obigen Trick einfach auf Ihren komplexen dtype anwenden.


Der folgende Code leiht viele Ideen aus J.F. Sebastian's answer, here.

import numpy as np 
import multiprocessing as mp 
import contextlib 
import ctypes 
import struct 
import base64 


def decode(arg): 
    chunk, counter = arg 
    print len(chunk), counter 
    for x in chunk: 
     peak_counter = 0 
     data_buff = base64.b64decode(x) 
     buff_size = len(data_buff)/4 
     unpack_format = ">%dL" % buff_size 
     index = 0 
     for y in struct.unpack(unpack_format, data_buff): 
      buff1 = struct.pack("I", y) 
      buff2 = struct.unpack("f", buff1)[0] 
      with shared_arr.get_lock(): 
       data = tonumpyarray(shared_arr).view(
        [('f0', '<f4'), ('f1', '<f4', (250000, 2))]) 
       if (index % 2 == 0): 
        data[counter][1][peak_counter][0] = float(buff2) 
       else: 
        data[counter][1][peak_counter][1] = float(buff2) 
        peak_counter += 1 
      index += 1 
     counter += 1 


def pool_init(shared_arr_): 
    global shared_arr 
    shared_arr = shared_arr_ # must be inherited, not passed as an argument 


def tonumpyarray(mp_arr): 
    return np.frombuffer(mp_arr.get_obj()) 


def numpy_array(shared_arr, peaks): 
    """Fills the NumPy array 'data' with m/z-intensity values acquired 
    from b64 decoding and unpacking the binary string read from the 
    mzXML file, which is stored in the list 'peaks'. 

    The m/z values are assumed to be ordered without validating this 
    assumption. 

    Note: This function uses multi-processing 
    """ 
    processors = mp.cpu_count() 
    with contextlib.closing(mp.Pool(processes=processors, 
            initializer=pool_init, 
            initargs=(shared_arr,))) as pool: 
     chunk_size = int(len(peaks)/processors) 
     map_parameters = [] 
     for i in range(processors): 
      counter = i * chunk_size 
      # WARNING: I removed -1 from (i + 1)*chunk_size, since the right 
      # index is non-inclusive. 
      chunk = peaks[i*chunk_size : (i + 1)*chunk_size] 
      map_parameters.append((chunk, counter)) 
     pool.map(decode, map_parameters) 

if __name__ == '__main__': 
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250) 
    peaks = ... 
    numpy_array(shared_arr, peaks) 

Wenn Sie, dass die verschiedenen Prozesse, die die Aufgaben ausführen garantieren

if (index % 2 == 0): 
    data[counter][1][peak_counter][0] = float(buff2) 
else: 
    data[counter][1][peak_counter][1] = float(buff2) 

nie die Daten an den gleichen Stellen verändern konkurrieren, dann glaube ich, Sie tatsächlich die Sperre mit verzichten kann

with shared_arr.get_lock(): 

aber ich nicht Ihren Code gut genug, um sicher zu wissen, um auf der sicheren Seite zu sein, schloss ich das Schloss.

+0

Sie sind garantiert nie auf den gleichen 'Bereich' von Daten aufgrund der Zähler, die ich an die Funktion übergeben (die berechnet wird, indem ich 'i * chunk_size 'zu erreichen).Ich werde Ihre Antwort morgen früh ausprobieren und höchstwahrscheinlich diese Antwort akzeptieren. –

0
from multiprocessing import Process, Array 
import numpy as np 
import time 
import ctypes 

def fun(a): 
    a[0] = -a[0] 
    while 1: 
     time.sleep(2) 
     #print bytearray(a.get_obj()) 
     c=np.frombuffer(a.get_obj(),dtype=np.float32) 
     c.shape=3,3 
     print 'haha',c 


def main(): 
    a = np.random.rand(3,3).astype(np.float32) 
    a.shape=1*a.size 
    #a=np.array([[1,3,4],[4,5,6]]) 
    #b=bytearray(a) 
    h=Array(ctypes.c_float,a) 
    print "Originally,",h 

    # Create, start, and finish the child process 
    p = Process(target=fun, args=(h,)) 
    p.start() 
    #p.join() 
    a.shape=3,3 
    # Print out the changed values 
    print 'first',a 
    time.sleep(3) 
    #h[0]=h[0]+1 
    print 'main',np.frombuffer(h.get_obj(), dtype=np.float32) 



if __name__=="__main__": 
    main() 
Verwandte Themen