1

Nehmen wir an, es gibt eine Anzahl von Threads, die aus einer Schleife bestehen, die Instanzen derselben Funktion ausführt, aber der Start jeder Iteration muss synchronisiert werden (so müssen die Threads, die zuerst fertig sind, auf die letzte warten, um eine neue Iteration zu beginnen)). Wie kann dies in C++ 11 getan werden?Wie synchronisiert man Instanzen einer Funktion, die auf verschiedenen Threads läuft (in C++ 11)?

...

Der Rest der Post ist genau das, was ich versucht habe, und wie es funktioniert nicht.

Ich verwende einen Zähler, "Sync", zunächst auf 3 (die Anzahl der Threads). Jeder Thread am Ende der Funktion zieht 1 von diesem Zähler ab und beginnt zu warten. Wenn der Zähler 0 erreicht, bedeutet dies, dass die 3 von ihnen eine Runde beendet haben. Der Hauptzweig setzt den Zähler auf 3 zurück und benachrichtigt die Threads, um sie aufzuwecken.

Dies funktioniert die meiste Zeit, aber manchmal ein oder zwei der Threads nicht aufwachen.

So sind diese die globalen Variablen:

mutex syncMutex; 
condition_variable syncCV; 
int sync; 

Dies ist am Ende der Funktion, die in dem Gewinde in einer Schleife läuft:

unique_lock<mutex> lk(syncMutex); 
cout << "Thread num: " << mFieldNum << " got sync value: " << sync; 
sync --; 
syncCV.notify_all(); 
cout << " and goes to sleep..." << endl; 
syncCV.wait(lk, []{return sync == numFields;}); 
cout << "Thread num: " << mFieldNum << " woke up" << endl; 
} 

Und das läuft in einer Schleife in der Hauptthread:

unique_lock<mutex> lk(syncMutex); 
syncCV.wait(lk, []{return sync == 0;}); 
sync = 3; 
lk.unlock(); 
cout << "Notifying all threads!" << endl; 
syncCV.notify_all(); 

Dies ist die Ausgabe, die es erzeugt, wenn es fehlschlägt (Thread # 3 nicht aufwachen):

Thread num: 1 got sync value: 3 and goes to sleep... 
Thread num: 2 got sync value: 2 and goes to sleep... 
Thread num: 3 got sync value: 1 and goes to sleep... 
Notifying all threads! 
Thread num: 1 woke up 
Thread num: 2 woke up 
Thread num: 3 woke up 
Thread num: 2 got sync value: 3 and goes to sleep... 
Thread num: 1 got sync value: 2 and goes to sleep... 
Thread num: 3 got sync value: 1 and goes to sleep... 
Notifying all threads! 
Thread num: 2 woke up 
Thread num: 1 woke up 
Thread num: 2 got sync value: 3 and goes to sleep... 
Thread num: 1 got sync value: 2 and goes to sleep... 

Hat jemand eine Ahnung? Danke fürs Lesen.

+0

Da jeder Thread läuft in einer Schleife, nach dem Faden 1 oder 2 aufgewacht, sync-- ausgeführt wurde, bevor Thread 3 [] {return sync == numFields;} Prädikat ausgeführt wurde. Das Prädikat wurde als falsch bewertet, sodass Thread 3 nicht aufwachte. –

+0

Danke @TonyJ Weißt du, wie das behoben werden könnte? – siflun

Antwort

1

Es gibt eine Reihe von Problemen mit der Thread-Synchronisierung. Tony hat einen in seinem Kommentar erwähnt. Sie haben auch eine potentielle Race-Bedingung in Ihrem Haupt-Loop-Code, wo Sie lk.unlock() aufrufen, bevor Sie syncCV.notify_all() aufrufen. (Dies könnte einem Thread erlauben, das Signal notify_all zu verpassen.)

Ich würde Ihren Code auf zwei Arten anpassen. Um die Verwendung von "sync == numFields" als Bedingung zu behandeln, die, wie Tony bemerkte, nicht wahr sein kann, nachdem ein anderer Thread sync- ausgeführt hat, ist es sinnvoll, als Bedingung zu verwenden, dass jeder Thread nur ausgeführt wird einmal pro Haupt-Thread-Schleife. In meinem Beispielcode wird dies durch Einführung der Variablen "done [numFields]" erreicht. Zweitens macht es Sinn, zwei Bedingungsvariablen einzuführen - eine, um den Worker-Threads zu signalisieren, dass eine neue Main-Loop-Iteration begonnen hat, und eine zweite, um dem Haupt-Thread zu signalisieren, dass die Worker-Threads ausgeführt werden. (Beachten Sie, dass die beiden Zustandsgrößen die gleiche Mutex verwenden.)

Hier ist ein komplettes Programm, auf dem Beispielcode modelliert, die diese beiden Ansätze beinhaltet:

#include <iostream> 
using std::cout; 
using std::endl; 

#include <condition_variable> 
#include <mutex> 
#include <thread> 
#include <vector> 

std::mutex syncMutex; 
std::condition_variable readyCV; 
std::condition_variable doneCV; 
int sync; 
bool exitFlag; 

const int numFields = 5; 
bool done[numFields]; 

const int nloops = 10; 

void thread_func(int i) { 
    int mFieldNum = i; 
    while (true) { 
    std::unique_lock<std::mutex> lk(syncMutex); 
    readyCV.wait(lk, [mFieldNum]{return exitFlag || !done[mFieldNum-1];}); 
    if (exitFlag) break; 
    cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << sync; 
    if (--sync == 0) doneCV.notify_all(); 
    done[mFieldNum-1] = true; 
    readyCV.notify_all(); 
    cout << " and goes to sleep..." << endl; 
    } 
} 

int main (int argc, char* argv[]) { 
    exitFlag = false; 
    sync = 0; 
    std::vector<std::thread> threads; 
    for (int i = 0; i < numFields; i++) { 
    done[i] = true; 
    threads.emplace_back (thread_func, i+1); 
    } 
    for (int i = 0; i <= nloops; i++) { 
    std::unique_lock<std::mutex> lk(syncMutex); 
    doneCV.wait(lk, []{return sync == 0;}); 
    cout << "main loop (lk held), i = " << i << endl; 
    sync = numFields; 
    if (i == nloops) exitFlag = true; 
    else    for (auto &b : done) b = false; 
    cout << "Notifying all threads!" << endl; 
    readyCV.notify_all(); 
    } 

    for (auto& t : threads) t.join(); 
} 

(Außerdem habe ich eine exitFlag und std :: thread :: join() 's so kann das Programm gut säubern und beenden.)

Dies ist sehr ähnlich zu einer klassischen Producer-Consumer-Implementierung (ein Produzent, numFields Verbraucher), mit der zusätzlichen Einschränkung dass jeder Consumer-Thread nur einmal pro Erzeuger-Thread-Schleife ausgeführt wird.

Sie können auch im Wesentlichen die gleiche Programmlogik einfacher erreichen, wenn Sie bereit sind, auf die Wiederverwendung der Worker-Threads zu verzichten.(In Ihrem Beispielcode und meinem obigen Beispiel fungieren sie als eine Art spezialisierter Threadpool.) Im nächsten Beispiel werden neue Threads für jede Iteration der Hauptschleife erstellt. Dies macht die Thread-Synchronisation einfacher und eliminiert die Zustandsvariablen.

#include <iostream> 
using std::cout; 
using std::endl; 

#include <atomic> 
#include <mutex> 
#include <thread> 
#include <vector> 

std::mutex coutMutex; 

std::atomic<int> sync; 

const int numFields = 5; 
bool done[numFields]; 

const int nloops = 10; 

void thread_func(int i) { 
    int mFieldNum = i; 
    int mySync = sync--; 
    { 
    std::lock_guard<std::mutex> lk(coutMutex); 
    cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << mySync << endl; 
    } 
} 

int main (int argc, char* argv[]) { 
    for (int i = 0; i < nloops; i++) { 
    cout << "main loop, i = " << i << endl; 
    std::vector<std::thread> threads; 
    sync = numFields; 
    for (int i = 0; i < numFields; i++) threads.emplace_back (thread_func, i+1); 
    for (auto& t : threads) t.join(); 
    } 
} 

(coutMutex ist ein nicety, so dass die Ausgabe der Konsole nicht verstümmelt hat, aber es ist nicht notwendig, für die Kern-Synchronisationslogik.)

Wenn in Ihrem realen Welt Sie Use Case don‘ t brauche thread_func, um von Iteration zu Iteration am Leben zu bleiben (zum Beispiel, um einen Zustand zu erhalten), und wenn jeder Aufruf von thread_func genug funktioniert, dann spielt der Aufwand für das Erstellen eines neuen Threads keine Rolle Das Erstellen neuer Threads für jede Main-Loop-Iteration (anstatt Threads erneut zu verwenden) ist einfach, sinnvoll und einfacher.

Happy Multi-Threaded Hacking!

K. Frank

+0

Vielen Dank! Dies war meine erste Frage hier und Ihre Hilfe spricht wirklich von dieser Gemeinschaft. Nach vielen frustrierenden Stunden habe ich auch eine ähnliche Lösung gefunden. Ich machte die Sync-Variable einen Vektor, so dass jeder Thread anzeigen würde, dass es beendet wurde, indem Sie ihre Position in dem Vektor auf Null festlegen, und der Hauptthread warten würde, um alle Nullen zu sehen, um zurückzusetzen. – siflun

Verwandte Themen