2017-10-02 14 views
0

Um einige Leistungstests durchzuführen, muss ich eine Reihe von Threads an einem bestimmten Punkt in meinem Programm starten. Leider muss ich den Thread-basierten Weg gehen und kann keine Aufgaben (std :: async) verwenden, weil ich die gegebenen Threads an bestimmte Kerne (mit Affinität) anheften muss. Um diese Art von Verhalten zu erkennen, habe ich einen RAH - „One-Shot“ Ansatz erwähnt von Scott MeyersC++ - Threadpool für ausgesetzt Threads

Dies ist mein Code so weit:

template < class T > 
typename std::decay<T>::type decay_copy(T&& v) { 
    return std::forward<T>(v); 
} 


/** 
* delayed thread - more or less copied from Scott Meyers: 
* http://scottmeyers.blogspot.de/2013/12/threadraii-thread-suspension-trouble.html 
*/ 
class del_thread { 
    private: 
    using future_t   = std::shared_future<void>; 
    using thread_t   = std::thread; 

    enum execution_state { 
     WAITING, TRIGGERED, DISMISSED 
    }; 

    future_t  _future; 
    thread_t  _thread; 

    execution_state _state = WAITING; 

    public: 
    del_thread() = delete; 
    del_thread(del_thread const &) = delete; 
    del_thread &operator=(del_thread const & dt) = delete; 

    del_thread(del_thread && other): 
     _future(std::move(other._future)), 
     _thread(std::move(other._thread)), 
     _state(std::move(other._state)) { 
     other._state = DISMISSED; 
    } 

    del_thread &operator=(del_thread && dt) { 
     _future = std::move(dt._future); 
     _thread = std::move(dt._thread); 
     _state = std::move(dt._state); 
     dt._state = DISMISSED; 
     return *this; 
    } 

    template< typename op_t > 
    del_thread(op_t && operation, future_t const & future): 
     _thread( [ operation = decay_copy(std::forward<op_t>(operation)), 
        _future = future, 
        &_state = _state 
       ]() { 
        _future.wait(); 
        if(_state == TRIGGERED || _state == DISMISSED) { 
        return; 
        } 
        _state = TRIGGERED; 
        operation(); 
       } 
      ) { 
    } 

    ~del_thread() { 
     join(); 
    } 

    void join() { 
     if(_state == DISMISSED) { 
     return; 
     } 
     if(_thread.joinable()) { 
     _thread.join(); 
     } 
    } 
}; 

class batch_thread_pool { 
    private: 
    std::promise<void> _promise; 
    std::shared_future<void> _future; 
    std::vector<del_thread> _pool; 

    public: 
    batch_thread_pool() : 
     _future(_promise.get_future().share()) {} 
    template< typename op_t > 
    void add_thread(op_t && operation) { 
     _pool.emplace_back(del_thread(std::forward<op_t>(operation), std::ref(_future))); 
    } 

    void run_batch() { 
     _promise.set_value(); 
     _pool.clear(); 
    } 
}; 

Die Grundidee ist eine hohlraum Zukunft nutzen zu erstellen ein suspendierter Thread, führe die Thread-Setup-Sachen wie Affinität und/oder Priorität aus und starte alle Threads gleichzeitig. Wie man sieht, sollte der Haupt-Thread den hinzugefügten Threads beitreten, wenn der Pool gelöscht wird. Um den Thread zu testen habe ich ein wenig Haupt, wie die Suche:

#include <chrono> 
#include <iostream> 

#include "threads.h" 

void runFunc() { 
    std::cout << "In runFunc...\n"; 
    return; 
} 
void run2Func() { 
    std::cout << "In run2Func...\n"; 
    return; 
} 

int main() { 
    batch_thread_pool tp; 
    tp.add_thread(runFunc); 
    tp.add_thread(run2Func); 
    std::cout << "Working while thread 'suspended'...\n"; 
    tp.run_batch(); 
    std::cout << "Working while thread runs asynchronously...\n"; 
    std::this_thread::sleep_for(std::chrono::milliseconds(500)); 
    std::cout << "Done!\n"; 
} 

Leider ist die Themen sind nicht konsequent gestartet. Manchmal werden beide Methoden (runFunc und run2Func) ausgeführt, manchmal nur einer von ihnen. Ich denke es ist, weil der Haupt-Thread endet, bevor ein Join passiert. Ist das korrekt oder weiß jemand, was ich falsch mache?

Mit freundlichen Grüßen

Antwort

1
execution_state _state; 

wenn Sie diese Objekte bewegen unterstützen möchten, kann dies nicht in dynamischem Speicher sein.

Ehrlich gesagt, die einzige Verwendung, die ich dafür denken könnte, wäre ABORT. Fahrener von thread s sind nicht joinable ...

class del_thread { 
private: 
    using future_t  = std::shared_future<void>; 
    using thread_t  = std::thread; 

    enum execution_state { 
    WAITING, TRIGGERED, ABORT 
    }; 

    std::unique_ptr<std::atomic<execution_state>> _state = 
    std::make_unique<std::atomic<execution_state>>(WAITING); 

    future_t _future; 
    thread_t _thread; 

public: 
    void abort() const { 
    if (_state) *_state = ABORT; 
    } 
    del_thread() = delete; 
    del_thread(del_thread const &) = delete; 
    del_thread &operator=(del_thread const & dt) = delete; 

    del_thread(del_thread && other) = default; 
    del_thread &operator=(del_thread && dt) = default; 

    template< class op_t > 
    del_thread(op_t && operation, future_t const & future): 
    _thread(
     [ 
     operation = std::forward<op_t>(operation), 
     _future = future, 
     _state = _state.get() 
     ]() 
     { 
     _future.wait(); 
     if (*_state == ABORT) return; 
     *_state = TRIGGERED; 
     operation(); 
     } 
    ) 
    {} 

    ~del_thread() { 
    join(); 
    } 

    void join() { 
    if(!_state) { 
     return; 
    } 
    if(_thread.joinable()) { 
     _thread.join(); 
    } 
    } 
}; 

wenn Sie nicht abbrechen wollen:

class del_thread { 
private: 
    using future_t  = std::shared_future<void>; 
    using thread_t  = std::thread; 

    future_t _future; 
    thread_t _thread; 

public: 
    del_thread() = delete; 
    del_thread(del_thread const &) = delete; 
    del_thread &operator=(del_thread const & dt) = delete; 

    del_thread(del_thread && other) = default; 
    del_thread &operator=(del_thread && dt) = default; 

class del_thread { 
private: 
    using future_t  = std::shared_future<void>; 
    using thread_t  = std::thread; 

    future_t _future; 
    thread_t _thread; 

public: 
    del_thread() = delete; 
    del_thread(del_thread const &) = delete; 
    del_thread &operator=(del_thread const & dt) = delete; 

    del_thread(del_thread && other) = default; 
    del_thread &operator=(del_thread && dt) = default; 

    template< class op_t > 
    del_thread(op_t && operation, future_t const & future): 
    _thread(
     [ 
     operation = std::forward<op_t>(operation), 
     _future = future 
     ]() 
     { 
     _future.wait(); 
     operation(); 
     } 
    ) 
    {} 

    ~del_thread() { 
    join(); 
    } 

    void join() { 
    if(_thread.joinable()) { 
     _thread.join(); 
    } 
    } 
}; 

Live example.

+0

Das scheint zu funktionieren! Danke vielmals! Leider kämpfe ich mit der Weiterleitung eines Parameterpakets in das Lambda! Ich werde die Antwort akzeptieren, sobald ich sie richtig bestätigen kann! :) – Hymir

Verwandte Themen