2015-06-15 8 views
6

Ich versuche, ein einfaches Pool-Objekt zu erstellen, das ich möchte mehr oder weniger fairen Zugriff auf eine Reihe von freigegebenen Ressourcen für alle Threads zuordnen, die danach fragen. In Windows hätte ich normalerweise ein Array von Mutexes und mache ein WaitForMultipleObjects mit bWaitAll = FALSE (siehe windows_pool_of_n_t unten). Aber ich hoffe, dass ich eines Tages in der Lage sein werde, dies auf andere Betriebssysteme zu portieren, also würde ich gerne beim Standard bleiben. Eine Deque von Ressourcen mit einer condition_variable auf size()! = 0 schien die naheliegende Lösung zu sein (siehe pool_of_n_t unten).Warum macht std :: condition_variable die Planung unfair?

Aber aus Gründen, die ich nicht verstehe, serialisiert dieser Code den Thread-Zugriff. Ich erwarte keine strenge Fairness, aber das ist so ziemlich der schlimmste Fall - der Thread, der das letzte Mal das Schloss hatte, bekommt immer das Schloss das nächste Mal. Es ist nicht so, dass std :: mutex nicht der mehr oder weniger fairen Planung von Windows entspricht, da die Verwendung nur eines Mutex ohne die Zustandsvariable wie erwartet funktioniert, obwohl natürlich nur für einen Pool von eins, siehe unten pool_of_one_t.

Kann jemand das erklären? Gibt es einen Weg dahin?

die Ergebnisse:

C:\temp\stdpool>bin\stdpool.exe 
pool:pool_of_one_t 
thread 0:19826 ms 
thread 1:19846 ms 
thread 2:19866 ms 
thread 3:19886 ms 
thread 4:19906 ms 
thread 5:19926 ms 
thread 6:19946 ms 
thread 7:19965 ms 
thread 8:19985 ms 
thread 9:20004 ms 
pool:windows_pool_of_n_t(1) 
thread 0:19819 ms 
thread 1:19838 ms 
thread 2:19858 ms 
thread 3:19878 ms 
thread 4:19898 ms 
thread 5:19918 ms 
thread 6:19938 ms 
thread 7:19958 ms 
thread 8:19978 ms 
thread 9:19997 ms 
pool:pool_of_n_t(1) 
thread 9:3637 ms 
thread 0:4538 ms 
thread 6:7558 ms 
thread 4:9779 ms 
thread 8:9997 ms 
thread 2:13058 ms 
thread 1:13997 ms 
thread 3:17076 ms 
thread 5:17995 ms 
thread 7:19994 ms 
pool:windows_pool_of_n_t(2) 
thread 1:9919 ms 
thread 0:9919 ms 
thread 2:9939 ms 
thread 3:9939 ms 
thread 5:9958 ms 
thread 4:9959 ms 
thread 6:9978 ms 
thread 7:9978 ms 
thread 9:9997 ms 
thread 8:9997 ms 
pool:pool_of_n_t(2) 
thread 2:6019 ms 
thread 0:7882 ms 
thread 4:8102 ms 
thread 5:8182 ms 
thread 1:8382 ms 
thread 8:8742 ms 
thread 7:9162 ms 
thread 9:9641 ms 
thread 3:9802 ms 
thread 6:10201 ms 
pool:windows_pool_of_n_t(5) 
thread 4:3978 ms 
thread 3:3978 ms 
thread 2:3979 ms 
thread 0:3980 ms 
thread 1:3980 ms 
thread 9:3997 ms 
thread 7:3999 ms 
thread 6:3999 ms 
thread 5:4000 ms 
thread 8:4001 ms 
pool:pool_of_n_t(5) 
thread 2:3080 ms 
thread 0:3498 ms 
thread 8:3697 ms 
thread 3:3699 ms 
thread 6:3797 ms 
thread 7:3857 ms 
thread 1:3978 ms 
thread 4:4039 ms 
thread 9:4057 ms 
thread 5:4059 ms 

der Code:

#include <iostream> 
#include <deque> 
#include <vector> 
#include <mutex> 
#include <thread> 
#include <sstream> 
#include <chrono> 
#include <iomanip> 
#include <cassert> 
#include <condition_variable> 
#include <windows.h> 

using namespace std; 

class pool_t { 
    public: 
     virtual void check_in(int size) = 0; 
     virtual int check_out() = 0; 
     virtual string pool_name() = 0; 
}; 

class pool_of_one_t : public pool_t { 
    mutex lock; 

public: 
    virtual void check_in(int resource) { 
     lock.unlock(); 
    } 

    virtual int check_out() { 
     lock.lock(); 
     return 0; 
    } 

    virtual string pool_name() { 
     return "pool_of_one_t"; 
    } 

}; 


class windows_pool_of_n_t : public pool_t { 
    vector<HANDLE> resources; 

public: 
    windows_pool_of_n_t(int size) { 
     for (int i=0; i < size; ++i) 
      resources.push_back(CreateMutex(NULL, FALSE, NULL)); 
    } 

    ~windows_pool_of_n_t() { 
     for (auto resource : resources) 
      CloseHandle(resource); 
    } 

    virtual void check_in(int resource) { 
     ReleaseMutex(resources[resource]); 
    } 

    virtual int check_out() { 
     DWORD result = WaitForMultipleObjects(resources.size(), 
       resources.data(), FALSE, INFINITE); 
     assert(result >= WAIT_OBJECT_0 
       && result < WAIT_OBJECT_0+resources.size()); 

     return result - WAIT_OBJECT_0; 
    } 

    virtual string pool_name() { 
     ostringstream name; 
     name << "windows_pool_of_n_t(" << resources.size() << ")"; 
     return name.str(); 
    } 
}; 

class pool_of_n_t : public pool_t { 
    deque<int> resources; 
    mutex lock; 
    condition_variable not_empty; 

public: 
    pool_of_n_t(int size) { 
     for (int i=0; i < size; ++i) 
      check_in(i); 
    } 

    virtual void check_in(int resource) { 
     unique_lock<mutex> resources_guard(lock); 
     resources.push_back(resource); 
     resources_guard.unlock(); 
     not_empty.notify_one(); 
    } 

    virtual int check_out() { 
     unique_lock<mutex> resources_guard(lock); 
     not_empty.wait(resources_guard, 
       [this](){return resources.size() > 0;}); 
     auto resource = resources.front(); 
     resources.pop_front(); 
     bool notify_others = resources.size() > 0; 
     resources_guard.unlock(); 
     if (notify_others) 
      not_empty.notify_one(); 

     return resource; 
    } 

    virtual string pool_name() { 
     ostringstream name; 
     name << "pool_of_n_t(" << resources.size() << ")"; 
     return name.str(); 
    } 
}; 


void worker_thread(int id, pool_t& resource_pool) 
{ 
    auto start_time = chrono::system_clock::now(); 
    for (int i=0; i < 100; ++i) { 
     auto resource = resource_pool.check_out(); 
     this_thread::sleep_for(chrono::milliseconds(20)); 
     resource_pool.check_in(resource); 
     this_thread::yield(); 
    } 

    static mutex cout_lock; 
    { 
     unique_lock<mutex> cout_guard(cout_lock); 
     cout << "thread " << id << ":" 
      << chrono::duration_cast<chrono::milliseconds>(
        chrono::system_clock::now() - start_time).count() 
      << " ms" << endl; 
    } 
} 

void test_it(pool_t& resource_pool) 
{ 
    cout << "pool:" << resource_pool.pool_name() << endl; 
    vector<thread> threads; 
    for (int i=0; i < 10; ++i) 
     threads.push_back(thread(worker_thread, i, ref(resource_pool))); 
    for (auto& thread : threads) 
     thread.join(); 

} 

int main(int argc, char* argv[]) 
{ 
    test_it(pool_of_one_t()); 
    test_it(windows_pool_of_n_t(1)); 
    test_it(pool_of_n_t(1)); 
    test_it(windows_pool_of_n_t(2)); 
    test_it(pool_of_n_t(2)); 
    test_it(windows_pool_of_n_t(5)); 
    test_it(pool_of_n_t(5)); 

    return 0; 
} 
+0

Dies könnte eine schwierige Frage sein. Ich meine, da ist die einfache Antwort: 'condition_variable' macht keine solchen Garantien. Die harte Antwort ist genau herauszufinden, wie schlimm es ist, vorausgesetzt, Sie haben in Ihrem obigen Code keine offensichtlichen Oops gemacht. – Yakk

+0

Ich sehe keine offensichtlichen Ups. Ich vermute, dass dies auf etwas unterschiedliche Interaktionen zwischen this_thread :: yield() und den zwei verschiedenen Mutex-Wegen zurückzuführen ist. Unter Linux erwarte ich, dass dein Code fair terminiert. Beachten Sie, dass der Standard von 'yield 'als * nur eine Gelegenheit zur Umplanung spricht *, aber die Details sind OS-spezifisch. Es könnte interessant sein, als Experiment zu versuchen, 'this_thread :: yield();' durch 'this_thread :: sleep_for (chrono :: nanoseconds (1));' zu ersetzen. Dies würde den Prioritätsverlust des Threads in der Planungswarteschlange erzwingen und möglicherweise die Unterschiede in Windows beseitigen. –

+0

Weder sleep_for() noch Win32s :: Sleep() führen zu einer besseren Planung. Es scheint, als wäre Yaaks Kommentar die Antwort - der Standard verspricht das nicht und ich sollte nicht versuchen, mich darauf zu verlassen. –

Antwort

7

ich Ihren Test pool:pool_of_n_t(2) auf Linux haben und das Problem in

this_thread::yield(); 

Siehe Ergebnisse auf meinem comp für die sehen Testpool: pool_of_n_t (2):

1) this_thread :: yield():

$./a.out                  
pool:pool_of_n_t(2) 
thread 0, run for:2053 ms 
thread 9, run for:3721 ms 
thread 5, run for:4830 ms 
thread 6, run for:6854 ms 
thread 3, run for:8229 ms 
thread 4, run for:8353 ms 
thread 7, run for:9441 ms 
thread 2, run for:9482 ms 
thread 1, run for:10127 ms 
thread 8, run for:10426 ms 

Sie sind ähnlich wie bei Ihnen.

2) Und der gleiche Test, wenn ich this_thread::yield() mit pthread_yield() ersetzen:

$ ./a.out                
pool:pool_of_n_t(2) 
thread 0, run for:7922 ms 
thread 3, run for:8853 ms 
thread 4, run for:8854 ms 
thread 1, run for:9077 ms 
thread 5, run for:9364 ms 
thread 9, run for:9446 ms 
thread 7, run for:9594 ms 
thread 2, run for:9615 ms 
thread 8, run for:10170 ms 
thread 6, run for:10416 ms 

Es ist viel schöner. Sie nehmen an, dass this_thread :: yield() in der Tat CPU zu einem anderen Thread gibt, aber es gibt es nicht.

Dies ist demon für this_thread :: Ausbeute für gcc 4.8:

(gdb) disassembl this_thread::yield 
Dump of assembler code for function std::this_thread::yield(): 
    0x0000000000401fb2 <+0>: push %rbp 
    0x0000000000401fb3 <+1>: mov %rsp,%rbp 
    0x0000000000401fb6 <+4>: pop %rbp 
    0x0000000000401fb7 <+5>: retq 
End of assembler dump. 

Ich sehe keine Umplanung

Und das ist demon für pthread_yield:

(gdb) disassemble pthread_yield 
Dump of assembler code for function pthread_yield: 
    0x0000003149c084c0 <+0>: jmpq 0x3149c05448 <[email protected]> 
End of assembler dump. 
(gdb) disassemble sched_yield 
Dump of assembler code for function sched_yield: 
    0x00000031498cf520 <+0>: mov $0x18,%eax 
    0x00000031498cf525 <+5>: syscall 
    0x00000031498cf527 <+7>: cmp $0xfffffffffffff001,%rax 
    0x00000031498cf52d <+13>: jae 0x31498cf530 <sched_yield+16> 
    0x00000031498cf52f <+15>: retq 
    0x00000031498cf530 <+16>: mov 0x2bea71(%rip),%rcx  # 0x3149b8dfa8 
    0x00000031498cf537 <+23>: xor %edx,%edx 
    0x00000031498cf539 <+25>: sub %rax,%rdx 
    0x00000031498cf53c <+28>: mov %edx,%fs:(%rcx) 
    0x00000031498cf53f <+31>: or  $0xffffffffffffffff,%rax 
    0x00000031498cf543 <+35>: jmp 0x31498cf52f <sched_yield+15> 
End of assembler dump. 
+0

Ich habe den gleichen Test mit meinem pool_of_one_t in Linux versucht und das Mutex-Verhalten war ähnlich unfair. Ich denke, die Antwort hier ist, dass die C + + -Std-Concurrency-APIs der Aufgabe nicht gewachsen sind und ich dies mit plattformspezifischen APIs schreiben muss. –

+2

Pro [diese Antwort] (http://stackoverflow.com/questions/12523122/what-is-glibcxx-use-nanosleep-all-about/12961816#12961816), für GCC 4.8 müssen Sie '--enable-libstdcxx- Zeit, wenn GCC gebaut wird, um 'yield()' nicht ein No-Op zu haben. –

+0

@MarkW Falls Sie nicht benachrichtigt wurden, müssen Sie den obigen Kommentar von T.C. lesen. – Yakk

2

I don Ich denke, Zustandsvariable ist der Schuldige.

Sowohl der Linux als auch der Windows Thread Dispatcher gehen davon aus, dass das ideale Ziel ist, jedem Thread seinen gesamten Zeitabschnitt zu geben (dh gerecht zu sein). Sie tragen dies so weit, dass ein Thread davon ausgeht, bevor er konsumiert es ist die ganze Zeit, dass es in die Nähe der Schlange kommt [das ist eine grobe Vereinfachung], weil das "fair" ist.

Ich finde das sehr bedauerlich.Wenn Sie drei Threads haben, von denen einer arbeiten kann und die anderen beiden blockiert sind, warten beide, sowohl Windows- als auch Linux-Dispatcher, zwischen den blockierten Threads hin und her, bevor sie dem "richtigen" Thread eine Chance geben .

+0

Ich beschuldige die Zustandsvariable, weil die anderen zwei Pools - einer, der ausschließlich auf einen Mutex wartet, der andere (windowsspezifisch), der auf einen Pool von Mutexen wartet - beide ein faires (oder fair genug für meine Bedürfnisse) Verhalten haben. Nur der Pool mit der Bedingungsvariablen zeigt die serielle Thread-Planung an. –

Verwandte Themen