2016-12-14 1 views
2

I um eine Schleife parallelisieren sollen (mit tbb), die einige, aber teuer vektorisierbar Iterationen enthält (zufällig verteilt). Meine Idee war, diese zu puffern und den Puffer zu leeren, wenn die Vektorgröße erreicht ist. Ein solcher Puffer muss Thread-lokal sein. Zum BeispielSpülung Gewinde lokale Puffer am Ende der parallelen Schleife mit TBB

// dummy for testing 
void do_vectorized_work(size_t k, size_t*indices) 
{} 
// dummy for testing 
bool requires_expensive_work(size_t k) 
{ return (k&7)==0; } 

struct buffer 
{ 
    size_t K=0, B[vector_size]; 
    void load(size_t i) 
    { 
    B[K++]=i; 
    if(K==vector_size) 
     flush(); 
    } 
    void flush() 
    { 
    do_vectorized_work(K,B); 
    K=0; 
    } 
}; 

void do_work_in_parallel(size_t N) 
{ 
    tbb::enumerable_thread_specific<buffer> tl_buffer; 

    tbb::parallel_for(size_t(0),N,[&](size_t i) 
    { 
    if(requires_expensive_work(i)) 
     tl_buffer.local().load(i); 
    }); 
} 

Dies lässt jedoch die Puffer nicht leer ist, so habe ich noch jede von ihnen ein letztes Mal

for(auto&b:tl_buffer) 
    b.flush(); 

aber das ist seriell zu spülen! Natürlich kann ich auch parallel

using tl_range = typename tbb::enumerable_thread_specific<buffer>::range_type; 
tbb::parallel_for(tl_buffer.range(),[](tl_range const&range) 
{ 
    for(auto r:range) 
    r->flush(); 
}); 

versuchen, dies zu tun, aber ich bin nicht sicher, dass dies effizient ist (da es nur so viele Puffer wie es Threads). Ich habe mich gefragt, ob es möglich ist, diesen letzten Flush nach dem Event zu vermeiden. I.e. ist es möglich, tbb::task s zu verwenden (tbb::parallel_for ersetzt) ​​in der Weise, dass die letzte Aufgabe jedes Threads seinen Puffer zu leeren ist?

Antwort

1

Es fiel mir ein, dass dies durch Reduktion gelöst werden können.

struct buffer 
{ 
    std::size_t K=0, B[vector_size]; 
    void load(std::size_t i) 
    { 
    B[K++]=i; 
    if(K==vector_size) flush(); 
    } 
    void flush() 
    { 
    do_vectorized_work(K,B); 
    K=0; 
    } 
    buffer(buffer const&, tbb::split) 
    {} 
    void operator()(tbb::block_range<std::size_t> const&range) 
    { for(i:range) load(i); } 
    bool empty() 
    { return K==0; } 
    std::size_t pop() 
    { return K? B[--K] : 0; } 
    void join(buffer&rhs) 
    { while(!rhs.empty()) load(rhs.pop()); } 
}; 

void do_work_in_parallel(std::size_t N) 
{ 
    buffer buff; 
    tbb::parallel_reduce(tbb::block_range<std::size_t>(0,N,vector_size),buff); 
    if(!buff.empty()) 
    buff.flush(); 
} 
2

Nein, ein Arbeiter-Thread nicht über vollständige Informationen darüber, ob diese besondere Aufgabe die letzte Aufgabe der gegebenen Arbeit ist oder nicht (das ist, wie die Arbeit stiehlt Werke). Daher ist es nicht möglich, eine solche Funktion auf der Ebene parallel_for oder dem Scheduler selbst zu implementieren. Daher würde ich Ihnen empfehlen, mit diesen beiden Ansätzen, die Sie beschreiben, fortzufahren.

Es gibt zwei andere Dinge, die Sie über das obwohl tun können.

  • machen es asynchron. I.e. eine Aufgabe in die Warteschlange stellen, die alles durchleuchtet. Es wird helfen, diesen Code aus dem heißen Pfad auf dem Hauptthread zu entfernen. Seien Sie vorsichtig, wenn Abhängigkeiten bestehen, die nach Abschluss dieser Aufgabe gesetzt werden müssen.
  • Verwenden Sie tbb::task_scheduler_observer, um threadspezifische Daten zu initialisieren und loszulassen, wenn Threads heruntergefahren werden oder wenn für einige Zeit keine Arbeit mehr besteht. Letzteres erfordert die Verwendung von local observer feature, die noch nicht offiziell unterstützt wird, aber für einige Jahre stabil bleibt.

Beispiel:

#define TBB_PREVIEW_LOCAL_OBSERVER 1 
#include <tbb/tbb.h> 
#include <assert.h> 

typedef void * buffer_t; 
const static int bufsz = 1024; 
class thread_buffer_allocator: public tbb::task_scheduler_observer { 
    tbb::enumerable_thread_specific<buffer_t> _buf; 
public: 
    thread_buffer_allocator() 
    : tbb::task_scheduler_observer(/*local=*/ true) { 
    observe(true); // activate the observer 
    } 
    ~thread_buffer_allocator() { 
    observe(false); // deactivate the observer 
    for(auto &b : _buf) { 
     printf("destructor: cleared: %p\n", b); 
     free(b); 
    } 
    } 
    /*override*/ void on_scheduler_entry(bool worker) { 
    assert(_buf.local() == nullptr); 
    _buf.local() = malloc(bufsz); 
    printf("on entry: %p\n", _buf.local()); 
    } 
    /*override*/ void on_scheduler_exit(bool worker) { 
    printf("on exit\n"); 
    if(_buf.local()) { 
     printf("on exit: cleared %p\n", _buf.local()); 
     free(_buf.local()); 
     _buf.local() = nullptr; 
    } 
    } 
}; 

int main() { 
    thread_buffer_allocator buffers_scope; 
    tbb::parallel_for(0, 1024*1024*1024, [&](auto i){ 
    usleep(i%3); 
    }); 
    return 0; 
} 
+0

Danke dafür. Ich denke nicht, dass der asynchrone Ansatz besser ist als meine im OP beschriebenen Versuche. Die Methode mit dem 'tbb :: task_scheduler_observer 'klingt interessant. Können Sie mit einem Code-Snippet beschreiben, wie das funktioniert? – Walter

+0

@Walter aktualisiert. Obwohl ich es nur auf on-line-Compiler versuchte, der nicht genug genug TBB mit lokalem Beobachter hat: http://coliru.stacked-crooked.com/a/11728cd935579cfe – Anton