2016-12-12 6 views
0

Ich habe eine Situation, in der ich einen einzelnen Worker-Thread erstellen möchte, der Aufgabe ausführt (im Wesentlichen einen Thread-Pool mit einem einzigen Thread zu konstruieren). Mehrere Threads können Aufgaben an sie senden, und der Thread hat eine Schleife, um sie auszuführen.Race-Bedingung in C++ Task-Thread mit Deque

nicht zu hart, so lange sein sollte als die ordnungsgemäße Verwendung von Schlössern gemacht wird, dachte ich, und so ist meine Implementierung wie folgt:

typedef std::function<void()> MyTask; 

class MyTaskPool { 
public: 
    MyTaskPool() { 
     this->closed = false; 
     this->thread = std::thread(std::bind(&MyTaskPool::run, this)); 
    } 

    ~MyTaskPool() { 
     this->closed = true; 
     this->conditionVariable.notify_one(); 
     this->thread.join(); 
    } 

    void post(MyTask task) { 
     { 
      std::lock_guard<std::mutex>(this->mutex); 
      this->tasks.push(task); 
     } 
     this->conditionVariable.notify_one(); 
    } 
private: 
    bool closed; 
    std::mutex mutex; 
    std::condition_variable conditionVariable; 
    std::thread thread; 
    std::deque<MyTask> tasks; 

    void run() { 
     while (true) { 
      boost::optional<MyTask> task; 
      { 
       std::lock_guard<std::mutex>(this->mutex); 
       if (this->closed) 
        return; 

       if (this->tasks.size() > 0) { 
        task = this->tasks.front(); 
        this->tasks.pop_front(); 
       } 
      } 

      if (task.is_initialized()) { 
       task(); 
      } else { 
       std::unique_lock<std::mutex> lock(this->mutex); 
       this->conditionVariable.wait(lock); 
      } 
     } 
    } 
} 

es gebaut, getestet, es funktioniert. Und es ist einfach zu bedienen; Ich kann einen neuen MyTaskPool erstellen und Aufgaben mit einem einfachen Lambda-Ausdruck dorthin stellen. Groß! Außer ... nachdem das Ding für eine längere Zeit benutzt wurde, bricht es plötzlich: this-> tasks.front() scheitert - mit einem Fehler, der mir sagt, dass der Iterator nicht dereferenziert werden kann. Meine Aufgaben sind ... leer? Sowohl der Code, der der Deque hinzugefügt und von ihm entfernt wird, wird durch eine Sperre geschützt, und dies sollte daher nicht passieren.

Kann jemand den Fehler sehen - was ich bin ziemlich sicher, ist eine Race-Bedingung irgendeiner Art?

Der eigentliche Code ist etwas komplexer, da einige Aufgaben mit jeder Aufgabe behandelt werden, aber das sollte für dieses Beispiel nicht relevant sein.

+0

nichts in diesem Code, der (außer vielleicht unechten Wakeups brechen kann, aber du kontrollierst sowieso die Warteschlangengröße). Post den komplexeren Code –

+0

sollte nicht 'tasks.is_initialized()' 'task.is_initialized()' sein? – NathanOliver

+1

auch, im Destruktor schützen Sie nicht Ihre 'this-> closed = true;'. Sie sollten Ihren Mutex in der ersten Zeile des Destruktors sperren –

Antwort

1

Der Fehler ist tatsächlich unterle und kaum bemerkbar: Die Sperre, die verwendet wird, um den Abruf der Aufgabe zu schützen, wird nicht in einer lokalen Variablen gespeichert. Dies führt dazu, dass es sofort zerstört und somit freigegeben wird.

Für Menschen, die gleichermaßen etwas zu implementieren suchen hier ist, wie mein Code jetzt, mit den Kommentaren von David und Sam in diese integriert funktioniert:

typedef std::function<void(MySharedResource)> MyTask; 

class MyTaskPool { 
public: 
    MyTaskPool() { 
     this->closed = false; 
     this->thread = std::thread(std::bind(&MyTaskPool::run, this)); 
    } 

    ~MyTaskPool() { 
     { 
      std::lock_guard<std::mutex> lock(this->mutex); 
      this->closed = true; 
     } 
     this->conditionVariable.notify_one(); 
     this->thread.join(); 
    } 

    void post(MyTask task) { 
     { 
      std::lock_guard<std::mutex> lock(this->mutex); 
      this->tasks.push(task); 
     } 
     this->conditionVariable.notify_one(); 
    } 
private: 
    bool closed; 
    std::mutex mutex; 
    std::condition_variable conditionVariable; 
    std::thread thread; 
    std::deque<MyTask> tasks; 

    void run() { 
     while (true) { 
      boost::optional<MyTask> task; 
      { 
       std::unique_lock<std::mutex> lock(this->mutex); 
       if (this->closed) 
        return; 

       if (this->tasks.size() > 0) { 
        task = this->tasks.front(); 
        this->tasks.pop_front(); 
       } else { 
        this->conditionVariable.wait(lock); 
       } 
      } 

      if (task.is_initialized()) { 
       task(); 
      } 
     } 
    } 
} 
+1

Sie haben ein paar Instanzen verpasst, in denen die Sperre erstellt und zerstört wurde. Ich habe die Antwort auf – UKMonkey

+0

Damn aktualisiert. Ich muss wirklich mit diesen Schlössern aufpassen, danke, dass du mich darauf hingewiesen hast. –

Verwandte Themen