Beachten Sie, dass Sie mit einer Reihe von komplexen dtype beginnen können: es als ein Array von homogenem dtype
In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')
und Aussicht:
In [5]: data2 = data.view('float32')
und später konvertieren es zurück zu komplexen dtype:
In [7]: data3 = data2.view('float32, (250000,2)float32')
Das Ändern des dtype ist eine sehr schnelle Operation; Es beeinflusst nicht die zugrunde liegenden Daten, sondern nur die Art, wie NumPy es interpretiert. So ist das Ändern des D-Typs praktisch kostenlos.
Was Sie also über Arrays mit einfachen (homogenen) dtypes gelesen haben, können Sie mit dem obigen Trick einfach auf Ihren komplexen dtype anwenden.
Der folgende Code leiht viele Ideen aus J.F. Sebastian's answer, here.
import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64
def decode(arg):
chunk, counter = arg
print len(chunk), counter
for x in chunk:
peak_counter = 0
data_buff = base64.b64decode(x)
buff_size = len(data_buff)/4
unpack_format = ">%dL" % buff_size
index = 0
for y in struct.unpack(unpack_format, data_buff):
buff1 = struct.pack("I", y)
buff2 = struct.unpack("f", buff1)[0]
with shared_arr.get_lock():
data = tonumpyarray(shared_arr).view(
[('f0', '<f4'), ('f1', '<f4', (250000, 2))])
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
peak_counter += 1
index += 1
counter += 1
def pool_init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())
def numpy_array(shared_arr, peaks):
"""Fills the NumPy array 'data' with m/z-intensity values acquired
from b64 decoding and unpacking the binary string read from the
mzXML file, which is stored in the list 'peaks'.
The m/z values are assumed to be ordered without validating this
assumption.
Note: This function uses multi-processing
"""
processors = mp.cpu_count()
with contextlib.closing(mp.Pool(processes=processors,
initializer=pool_init,
initargs=(shared_arr,))) as pool:
chunk_size = int(len(peaks)/processors)
map_parameters = []
for i in range(processors):
counter = i * chunk_size
# WARNING: I removed -1 from (i + 1)*chunk_size, since the right
# index is non-inclusive.
chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
map_parameters.append((chunk, counter))
pool.map(decode, map_parameters)
if __name__ == '__main__':
shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
peaks = ...
numpy_array(shared_arr, peaks)
Wenn Sie, dass die verschiedenen Prozesse, die die Aufgaben ausführen garantieren
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
nie die Daten an den gleichen Stellen verändern konkurrieren, dann glaube ich, Sie tatsächlich die Sperre mit verzichten kann
with shared_arr.get_lock():
aber ich nicht Ihren Code gut genug, um sicher zu wissen, um auf der sicheren Seite zu sein, schloss ich das Schloss.
Was ist 'mp'? Das 'Multiprocessing'-Modul? – delnan
ja, ich werde das zur Frage hinzufügen –