2015-03-05 3 views
5

Ich kämpfe lokalen Thread-ndarrays mit cython.parallel zu initialisieren:cython.parallel: Wie thread-local ndarray buffer initialisieren?

Pseudo-Code:

cdef: 
    ndarray buffer 

with nogil, parallel(): 
    buffer = np.empty(...) 

    for i in prange(n): 
     with gil: 
      print "Thread %d: data address: 0x%x" % (threadid(), <uintptr_t>buffer.data) 

     some_func(buffer.data) # use thread-local buffer 

cdef void some_func(char * buffer_ptr) nogil: 
    (... works on buffer contents...) 

Mein Problem ist, dass in allen Threads buffer.data Punkte an die gleiche Adresse. Nämlich die Adresse des Threads, der zuletzt buffer zugewiesen wurde.

Trotz buffer innerhalb des parallel() (oder alternativ prange) -Block zugeordnet ist, macht nicht Cython buffer a private oder faden lokale Variable, sondern hält es als shared Variable.

Als Ergebnis buffer.data verweist auf die gleiche Speicherregion Chaos auf meinem Algorithmus.

Dies ist kein Problem ausschließlich mit NDarray-Objekten, sondern scheinbar mit allen cdef class definierten Objekten.

Wie löse ich dieses Problem?

+1

Können Sie rufen 'np.empty' ohne gil? –

+1

vielleicht [diese Antwort] (http://stackoverflow.com/a/20520295/832621) bringt, was Sie wollen ... –

+1

@BiRico Ist das eine rhetorische Frage :)? Nein, Sie können definitiv kein numpiges Array (oder eine Speicheransicht) innerhalb eines 'nogil' Blocks instanziieren (andernfalls würde das Array nicht in Pythons verwaltetem Speicher zugewiesen werden und könnte keine Garbage Collected sein usw.) –

Antwort

2

Ich denke, ich habe endlich eine Lösung für dieses Problem gefunden, die ich mag. Die kurze Version ist, dass Sie ein Array erstellen, die Form hat:

(number_of_threads, ...<whatever shape you need in the thread>...) Dann ruft openmp.omp_get_thread_num und verwendet, das die Array-Index eine „thread-local“ Untergruppe zu erhalten. Dies vermeidet das Vorhandensein eines separaten Arrays für jeden Schleifenindex (was enorm sein könnte), verhindert jedoch auch, dass sich Threads gegenseitig überschreiben.

Hier ist eine grobe Version von dem, was ich getan habe:

import numpy as np 
import multiprocessing 

from cython.parallel cimport parallel 
from cython.parallel import prange 
cimport openmp 

cdef extern from "stdlib.h": 
    void free(void* ptr) 
    void* malloc(size_t size) 
    void* realloc(void* ptr, size_t size) 

... 

cdef int num_items = ... 
num_threads = multiprocessing.cpu_count() 
result_array = np.zeros((num_threads, num_items), dtype=DTYPE) # Make sure each thread uses separate memory 
cdef c_numpy.ndarray result_cn 
cdef CDTYPE ** result_pointer_arr 
result_pointer_arr = <CDTYPE **> malloc(num_threads * sizeof(CDTYPE *)) 
for i in range(num_threads): 
    result_cn = result_array[i] 
    result_pointer_arr[i] = <CDTYPE*> result_cn.data 

cdef int thread_number 
for i in prange(num_items, nogil=True, chunksize=1, num_threads=num_threads, schedule='static'): 
    thread_number = openmp.omp_get_thread_num() 
    some_function(result_pointer_arr[thread_number])