2016-07-03 6 views
1

Ich schrieb einen Beispielcode, um parallele Instanzen von for_each Ich bin nicht in der Lage, die Threads, in der folgenden Code beizutreten. Ich bin etwas zu früh zur gleichzeitigen Programmierung, also bin ich nicht sicher, ob ich alles richtig gemacht habe.Thread kann nicht für force_each parallel C++

template <typename Iterator, typename F> 
class for_each_block 
{ 
public : 
     void operator()(Iterator start, Iterator end, F f) { 
      cout << this_thread::get_id << endl; 
      this_thread::sleep_for(chrono::seconds(5)); 
      for_each(start, end, [&](auto& x) { f(x); }); 
    } 
}; 

typedef unsigned const long int ucli; 

template <typename Iterator, typename F> 
void for_each_par(Iterator first, Iterator last, F f) 
{ 
    ucli size = distance(first, last); 
    if (!size) 
     return; 
    ucli min_per_thread = 4; 
    ucli max_threads = (size + min_per_thread - 1)/min_per_thread; 
    ucli hardware_threads = thread::hardware_concurrency(); 

    ucli no_of_threads = min(max_threads, hardware_threads != 0 ? hardware_threads : 4); 

    ucli block_size = size/no_of_threads; 

    vector<thread> vf(no_of_threads); 
    Iterator block_start = first; 
    for (int i = 0; i < (no_of_threads - 1); i++) 
    { 
     Iterator end = first; 
     advance(end, block_size); 
     vf.push_back(std::move(thread(for_each_block<Iterator, F>(),first,end,f))); 
     first = end; 
    } 
    vf.push_back(std::move(thread(for_each_block<Iterator, F>(), first, last, f))); 
    cout << endl; 
    cout << vf.size() << endl; 
    for(auto& x: vf) 
    { 
     if (x.joinable()) 
      x.join(); 
     else 
      cout << "threads not joinable " << endl; 
    } 

    this_thread::sleep_for(chrono::seconds(100)); 
} 

int main() 
{ 
    vector<int> v1 = { 1,8,12,5,4,9,20,30,40,50,10,21,34,33 }; 
    for_each_par(v1.begin(), v1.end(), print_type<int>); 
return 0; 
} 

Im obigen Code bekomme ich Threads nicht zusammenführbar. Ich habe es auch mit asynchronen Futures versucht, trotzdem bekomme ich das selbe. Fehle ich hier etwas? Danke

Jede Hilfe ist sehr geschätzt, im Voraus ..

Antwort

4
vector<thread> vf(no_of_threads); 

Dies erzeugt einen Vektor mit no_of_threads default-initialisiert Threads. Da sie standardmäßig initialisiert werden, kann keiner von ihnen verbunden werden. Sie wahrscheinlich zu tun bedeutete:

vector<thread> vf; 
vf.reserve(no_of_threads); 

P. S .: std::move auf Zeit ist überflüssig :); betrachten dies zu ändern:

vf.push_back(std::move(thread(for_each_block<Iterator, F>(), first, last, f))); 

dazu:

vf.emplace_back(for_each_block<Iterator, F>(), first, last, f); 
+0

Vielen Dank, es funktioniert jetzt. Aber ich habe ein Problem. Alle Threads drucken dieselbe ID. Kannst du mir bitte auch mehr vorschlagen (ich bin mir sicher, dass ich etwas falsch gemacht habe). Auch ich hatte immer Verwirrung für push_back und emplace_back Danke für die Klärung :) –

+1

@KartikV 'this_thread :: get_id' ist eine Funktion, und es sollte aufgerufen werden. Jetzt drucken Sie den Wert des Funktionszeigers. –

+0

@Ocelot so wahr, ich fühle mich so dumm. ihr beide habt meine Augen geöffnet. Ich sollte nah und klar schauen, um dumme Fehler zu vermeiden. Danke Jungs. –

1

Dies kann oder auch nicht interessant sein. Ich habe versucht, den Code umzuformulieren, um einen etwas idiomatischen Ansatz zu verwenden. Ich sage nicht, dass Ihre Herangehensweise falsch ist, aber da Sie Thread-Management lernen, dachte ich, dass Sie daran interessiert sein könnten, was sonst noch möglich ist.

Fühlen Sie sich frei, Flamme/Frage als geeignet. Kommentare inline:

#include <vector> 
#include <chrono> 
#include <thread> 
#include <mutex> 
#include <iomanip> 
#include <future> 

using namespace std; 

// 
// provide a means of serialising writing to a stream. 
// 
struct locker 
{ 
    locker() : _lock(mutex()) {} 

    static std::mutex& mutex() { static std::mutex m; return m; } 
    std::unique_lock<std::mutex> _lock; 
}; 
std::ostream& operator<<(std::ostream& os, const locker& l) { 
    return os; 
} 

// 
// fill in the missing work function 
// 
template<class T> 
void print_type(const T& t) { 
    std::cout << locker() << hex << std::this_thread::get_id() << " : " << dec << t << std::endl; 
} 

// put this in your personable library. 
// the standards committee really should have given us ranges by now... 
template<class I1, class I2> 
struct range_impl 
{ 
    range_impl(I1 i1, I2 i2) : _begin(i1), _end(i2) {}; 

    auto begin() const { return _begin; } 
    auto end() const { return _end; } 

    I1 _begin; 
    I2 _end; 
}; 

// distinct types because sometimes dissimilar iterators are comparable 
template<class I1, class I2> 
auto range(I1 i1, I2 i2) { 
    return range_impl<I1, I2>(i1, i2); 
} 

// 
// lets make a helper function so we can auto-deduce template args 
// 
template<class Iterator, typename F> 
auto make_for_each_block(Iterator start, Iterator end, F&& f) 
{ 
    // a lambda gives all the advantages of a function object with none 
    // of the boilerplate. 
    return [start, end, f = std::move(f)] { 
     cout << locker() << this_thread::get_id() << endl; 
     this_thread::sleep_for(chrono::seconds(1)); 

     // let's keep loops simple. for_each is a bit old-skool. 
     for (auto& x : range(start, end)) { 
      f(x); 
     } 
    }; 
} 


template <typename Iterator, typename F> 
void for_each_par(Iterator first, Iterator last, F f) 
{ 
    if(auto size = distance(first, last)) 
    { 
     std::size_t min_per_thread = 4; 
     std::size_t max_threads = (size + min_per_thread - 1)/min_per_thread; 
     std::size_t hardware_threads = thread::hardware_concurrency(); 

     auto no_of_threads = min(max_threads, hardware_threads != 0 ? hardware_threads : 4); 

     auto block_size = size/no_of_threads; 

     // futures give us two benefits: 
     // 1. they automatically transmit exceptions 
     // 2. no need for if(joinable) join. get is sufficient 
     // 
     vector<future<void>> vf; 
     vf.reserve(no_of_threads - 1); 
     for (auto count = no_of_threads ; --count ;) 
     { 
      // 
      // I was thinking of refactoring this into std::generate_n but actually 
      // it was less readable. 
      // 
      auto end = std::next(first, block_size); 
      vf.push_back(async(launch::async, make_for_each_block(first, end, f))); 
      first = end; 
     } 
     cout << locker() << endl << "threads: " << vf.size() << " (+ main thread)" << endl; 

     // 
     // why spawn a thread for the remaining block? we may as well use this thread 
     // 
     /* auto partial_sum = */ make_for_each_block(first, last, f)(); 

     // join the threads 
     // note that if the blocks returned a partial aggregate, we could combine them 
     // here by using the values in the futures. 
     for (auto& f : vf) f.get(); 
    } 
} 

int main() 
{ 
    vector<int> v1 = { 1,8,12,5,4,9,20,30,40,50,10,21,34,33 }; 
    for_each_par(v1.begin(), v1.end(), print_type<int>); 
    return 0; 
} 

Beispielausgabe:

0x700000081000 
0x700000104000 

threads: 3 (+ main thread) 
0x700000187000 
0x100086000 
0x700000081000 : 1 
0x700000104000 : 5 
0x700000187000 : 20 
0x100086000 : 50 
0x700000081000 : 8 
0x700000104000 : 4 
0x700000187000 : 30 
0x100086000 : 10 
0x700000081000 : 12 
0x700000104000 : 9 
0x700000187000 : 40 
0x100086000 : 21 
0x100086000 : 34 
0x100086000 : 33 
Program ended with exit code: 0 

bitte std :: erklären hier bewegen: [start, end, f = std::move(f)] {...};

Dies ist eine allgemeine Sprache-Funktion, die in C + zur Verfügung gestellt wurde +14. f = std::move(f) innerhalb des Erfassungsblocks entspricht: decltype(f) new_f = std::move(f), mit der Ausnahme, dass die neue Variable f heißt und nicht new_f. Es erlaubt std::move Objekte in Lambdas anstatt sie zu kopieren.

Für die meisten Funktionsobjekte spielt es keine Rolle - aber einige können groß sein und dies gibt dem Compiler die Möglichkeit, eine Verschiebung statt einer Kopie zu verwenden, falls diese verfügbar ist.

+0

Es ist sehr ordentlich, Schließfach, Reichweite sind wirklich cool. Können Sie bitte die Bewegungssemantik erklären, die Sie für das Funktionsargument verwendet haben? –

+0

@KartikV aktualisiert. Ich hoffe, das hilft. –

+0

Vielen Dank. Deine Helferfunktionen sind wirklich nützlich, kannst du mich auf irgendeinen Blog mit mehr solchen hinweisen? (Oder vielleicht kannst du einen schreiben, wenn du Zeit bekommst) –