2017-12-31 120 views
1

Ich habe versucht, ein Problem gleichzeitig zu lösen, die das Thread-Pool-Muster sehr schön passt. Hier werde ich versuchen, einen minimalen repräsentatives Beispiel zu bieten:Thread-Pool in einer Warteschlange in C++

sagen, wir haben eine pseudo-Programm wie folgt aus:

Q : collection<int> 
while (!Q.empty()) { 
    for each q in Q { 
     // perform some computation 
    } 
    // assign a new value to Q 
    Q = something_completely_new(); 
} 

Ich versuche, dass in einer parallelen Art und Weise zu implementieren, mit n-1 Arbeiter und eine Haupt Faden. Die Arbeiter führen die Berechnung in der inneren Schleife aus, indem sie Elemente aus Q greifen.

Ich habe versucht, dies zu lösen mit zwei bedingten Variablen, work, auf denen die Master-Threads benachrichtigt die Arbeiter, die Q zugewiesen wurde, und eine andere, , wo die Arbeiter Master benachrichtigen, dass die gesamte Berechnung durchgeführt werden kann.

Hier ist meine C++ Code:

#include <iostream> 
#include <mutex> 
#include <condition_variable> 
#include <queue> 
#include <thread> 

using namespace std; 

std::queue<int> Q; 
std::mutex mut; 
std::condition_variable work; 
std::condition_variable work_done; 

void run_thread() { 
    for (;;) { 
     std::unique_lock<std::mutex> lock(mut); 
     work.wait(lock, [&] { return Q.size() > 0; }); 

     // there is work to be done - pretend we're working on something 
     int x = Q.front(); Q.pop(); 
     std::cout << "Working on " << x << std::endl; 

     work_done.notify_one(); 
    } 
} 

int main() { 
    // your code goes here 
    std::vector<std::thread *> workers(3); 

    for (size_t i = 0; i < 3; i++) { 
     workers[i] = new std::thread{ 
      [&] { run_thread(); } 
     }; 
    } 

    for (int i = 4; i > 0; --i) { 
     std::unique_lock<std::mutex> lock(mut); 
     Q = std::queue<int>(); 
     for (int k = 0; k < i; k++) { 
      Q.push(k); 
     } 
     work.notify_all(); 
     work_done.wait(lock, [&] { return Q.size() == 0; }); 
    } 

    for (size_t i = 0; i < 3; i++) { 
     delete workers[i]; 
    } 

    return 0; 
} 

Leider, nachdem es auf OS X kompilieren mit g++ -std=c++11 -Wall -o main main.cpp ich folgende Ausgabe:

Working on 0 
Working on 1 
Working on 2 
Working on 3 
Working on 0 
Working on 1 
Working on 2 
Working on 0 
Working on 1 
Working on 0 
libc++abi.dylib: terminating 
Abort trap: 6 

Nach einer Weile googeln es wie ein Segmentierungsfehler sieht . Es hat wahrscheinlich damit zu tun, dass ich bedingte Variablen missbrauche. Ich würde einige Einblicke schätzen, sowohl architektonisch (wie man an diese Art von Problem herantritt) als auch spezifisch, wie in dem, was ich hier falsch mache.

Ich schätze die Hilfe

+0

Nicht Ihre Krachen Problem zu beenden: beachten Sie, dass Ihr Zugang von 'Q' in' run_thread() 'ist nicht Thread-sicher:' std :: condition_variable :: wait() 'gibt die Sperre bei der Rückgabe frei. Wenn ich auf die Dokumentation für die Form von 'wait()' schaue, die ein Prädikat nimmt, bin ich nicht überzeugt, dass das Prädikat lambda ausgeführt wird, während die Sperre gehalten wird. – marko

+0

Der obige Anwendungsfall ist ein guter Fall für Futures & Versprechungen (http://en.cppreference.com/w/cpp/thread/future).Dies löst das Problem des Packens des Arbeitsergebnisses, auf das dann gewartet werden kann. – marko

+0

Der Zugriff auf 'Q' in' run_thread' ist threadsicher. Wenn 'condition_variable :: wait 'zurückkehrt, wird die Sperre für den Mutex übernommen, also sind diese Zeilen int x = Q.front(); Q.pop(); 'sind sicher. Die Ausführung des Prädikats in der 'wait'-Methode ist ebenfalls threadsicher, siehe Dokumentation http://en.cppreference.com/w/cpp/thread/condition_variable/wait, Beschreibung der Version mit Prädikat' Beachten Sie, dass vor der Eingabe dieser Methode die Sperre aktiviert sein muss erworben, nach dem Warten (Sperren) wird es auch wieder erworben, dh Schloss kann als Schutz vor pred() Zugriff verwendet werden. So ist der Zugriff auf "Q" in Lambda sicher. – rafix07

Antwort

2

Ihre Anwendung von std::terminate getötet wurde.

Leib Thread-Funktion ist unendlich-Schleife, so dass, wenn diese Zeilen ausgeführt werden

for (size_t i = 0; i < 3; i++) { 
    delete workers[i]; 
} 

Sie Threads löschen, die noch ausgeführt werden (jeder Thread ist in joinable Zustand). Wenn Sie destructor Faden nennen, die in joinable Zustand ist die folgende Sache (von http://www.cplusplus.com/reference/thread/thread/~thread/)

passiert, wenn der Faden joinable ist, wenn sie zerstört, beenden() aufgerufen wird.

so, wenn Sie terminate wollen nicht genannt werden, sollten Sie detach() Verfahren nach Threads zu schaffen nennen.

for (size_t i = 0; i < 3; i++) { 
    workers[i] = new std::thread{ 
     [&] { run_thread(); } 
    }; 
    workers[i]->detach(); // <--- 
} 
+0

Das Entfernen der Threads führt dazu, dass "std :: thread" den Zugriff verweigert und dann verloren geht. Wenn Sie sie vor dem Löschen des 'std :: thread 'verbinden, bedeutet' main() 'für immer (es gibt nur wenige Umstände, bei denen es eine gute Idee ist, den zugrunde liegenden Pthread zu löschen, ohne dass er konkurriert). Eine bessere Lösung ist, dass jeder Thread ein Flag hat, das als Ausgangsbedingung verwendet werden kann, und call detach(). – marko

2

Nur weil die Warteschlange leer ist bedeutet nicht, dass die Arbeit erledigt ist.

finished = true; 
work.notify_all(); 
for (size_t i = 0; i < 3; i++) { 
    workers[i].join(); // wait for threads to finish 
    delete workers[i]; 
} 

und wir brauchen eine Art und Weise die Fäden

for (;!finshed;) { 
    std::unique_lock<std::mutex> lock(mut); 
    work.wait(lock, [&] { return Q.size() > 0 || finished; }); 
    if (finished) 
     return;