2009-09-19 14 views
7

Ich brauche eine Warteschlange für die Weitergabe von Nachrichten von einem Thread (A) zu einem anderen (B), aber ive nicht in der Lage, eine zu finden, die wirklich was ich will, da sie in der Regel erlauben Hinzufügen eines Elements zum Fehlschlagen, ein Fall, der in meiner Situation ziemlich fatal ist, da die Nachricht verarbeitet werden muss, und der Thread kann wirklich nicht aufhören und auf den Ersatzraum warten.Multithreaded Single-Reader Single-Writer-Fifo-Warteschlange

  • Nur fädeln A Artikel ergänzt und nur Thread B liest sie
  • Thread A darf nicht blockieren, aber Thread B nicht die Leistung entscheidend ist, so muss es
  • Hinzufügen von Elementen kann immer erfolgreich, so dass die Warteschlange kippe eine obere Größengrenze (weniger als auf dem System aus dem Speicher ausgeführt wird)
  • Wenn die Warteschlange leer ist, sollte Thread B warten, bis ein Artikel ist
+0

Welche Threading-Bibliothek verwenden Sie? Pthreads? –

+0

boost :: thread und einige Bits plattformspezifischen Codes hier und dort –

+0

Ihr Ziel könnte dazu führen, dass nicht genügend Arbeitsspeicher zur Verfügung steht, da Sie nicht zulassen, dass der Writer-Thread Elemente blockiert oder löscht. Wenn Sie also eine kritische Größenbeschränkung der Warteschlange erreichen, müssen Sie entscheiden, ob Sie Elemente löschen oder den Writer-Thread blockieren möchten. Andernfalls fallen Sie Elemente indirekt, weil Ihr Programm fehlschlägt :-) – mmmmmmmm

Antwort

7

Hier ist, zu verarbeiten, wie ein Schloss- schreiben freie Warteschlange in C++:

http://www.ddj.com/hpc-high-performance-computing/210604448

Aber wenn Sie sagen, „Thread A darf nicht blockieren“, sind Sie sicher, dass die Anforderung ist? Windows ist kein Echtzeitbetriebssystem (und bei normaler Verwendung auch kein Linux). Wenn Thread A in der Lage sein soll, den gesamten verfügbaren Systemspeicher zu verwenden, muss er Speicher reservieren (oder warten, während jemand anderes dies tut). Das Betriebssystem selbst kann keine Timing-Garantien bieten, die besser wären als die, die Sie hätten, wenn sowohl Leser als auch Schreiber eine In-Process-Sperre (d. H. Einen nicht geteilten Mutex) ergriffen hätten, um die Liste zu manipulieren. Und das Schlimmste, wenn man eine Nachricht hinzufügt, ist das OS, um Speicher zu bekommen.

Kurz gesagt, es gibt einen Grund, warum Warteschlangen, die Sie nicht mögen, eine feste Kapazität haben - es ist so, dass sie keinen Speicher in dem Thread mit der angeblich niedrigen Latenz zuweisen müssen.

So wird der Lock-Free-Code in der Regel weniger Block-y, aber aufgrund der Speicherzuweisung ist es nicht garantiert, und die Leistung mit einem Mutex sollte nicht so schäbig sein, wenn Sie eine wirklich große haben Stream von Ereignissen zu verarbeiten (wie Sie einen Netzwerktreiber schreiben und die Nachrichten sind eingehende Ethernet-Pakete).

Also, in Pseudo-Code, das erste, was ich würde versuchen würde:

Writer: 
    allocate message and fill it in 
    acquire lock 
     append node to intrusive list 
     signal condition variable 
    release lock 

Reader: 
    for(;;) 
     acquire lock 
      for(;;) 
       if there's a node 
        remove it 
        break 
       else 
        wait on condition variable 
       endif 
      endfor 
     release lock 
     process message 
     free message 
    endfor 

Nur wenn dies nicht akzeptabel Verzögerungen bei dem Writer-Thread würde ich sperren freie Codes gehe einzuführen beweist, (es sei denn, ich hatte zufällig schon eine passende Warteschlange.

+0

Auf einer niedrigeren Ebene könnte man eine Single-Linked-Liste mit dem Anhängen des Schreibprozesses und dem Lesen verarbeiten. Dies kann sperrfrei sein, wenn der Schreibprozess den NULL-Zeiger auf Nicht-NULL setzt und der Leseprozess Nicht-NULL auf NULL ändert. Ein kleiner privater Heap würde eine gute amortisierte Leistung für die Listenelemente bereitstellen. Der Autor mallocs und der Leser befreit. Wenn der Leser schlafen geht, könnte ein dritter Prozess C bereitgestellt werden, der spekulativ den privaten Heap vergrößert und die blockierende Natur der Zuweisung von Prozess A versteckt. –

+0

Ihr Beispiel wird Deadlock.Während der Leser auf den Zustand wartet, hält er die Sperre, was verhindert, dass der Schreiber die Verriegelung und Signalisierung erhält. Sie müssen die Sperre aufheben, bevor Sie auf die Zustandsvariable warten und unmittelbar danach erneut erfassen. –

+0

@Bobby: Sie sind falsch. Wenn auf eine Bedingungsvariable gewartet wird, wird die zugehörige Sperre während des Wartens aufgehoben und dann erneut angefordert, bevor sie von der Warteoperation zurückkehrt. Dies ist Teil dessen, was "Bedingungsvariable" bedeutet - wenn die von Ihnen verwendete API das nicht für Sie tut, dann ist es keine Zustandsvariable, sondern eher ein Semaphor. Und es ist wichtig, dass die API das tut, denn dann kann sich Ihr Code darauf verlassen, dass das Freigeben der Sperre und das Warten auf die Bedingung atomar erfolgt - das heißt, dass kein anderer Thread irgendetwas unter der Sperre tun kann, bevor Ihr Thread ist ein Kellner. –

0

Sie möchten vielleicht Ihre Anforderungen berücksichtigen - ist es wirklich so, dass A überhaupt keine Warteschlangenelemente verwerfen kann? Oder möchten Sie nicht, dass B zwei aufeinanderfolgende Elemente aus der Warteschlange zieht, bei denen es sich nicht um aufeinanderfolgende Elemente handelt, weil dies eine Folge von Ereignissen falsch darstellen würde?

Zum Beispiel, wenn dies eine Art von Datenerfassungssystem ist, würden Sie (verständlicherweise) keine Lücken in der Aufzeichnung wollen - aber ohne einen unbegrenzten Speicher ist die Realität, dass Sie irgendwo in einem Eckfall wahrscheinlich überrennen könnten Ihre Warteschlangenkapazität.

In diesem Fall muss eine Lösung ein spezielles Element enthalten, das in die Warteschlange gestellt werden kann. Dies stellt den Fall von A dar, in dem festgestellt wird, dass Elemente gelöscht werden müssen. Im Grunde behalten Sie ein zusätzliches Element, das die meiste Zeit Null ist. Jedes Mal, wenn A Elemente zur Warteschlange hinzufügt, wenn dieses zusätzliche Element nicht null ist, geht das ein. Wenn A erkennt, dass in der Warteschlange kein Platz ist, konfiguriert es dieses zusätzliche Element so, dass es heißt "Hey, die Warteschlange war voll". .

Auf diese Weise blockiert A nie, Sie können Elemente löschen, wenn das System sehr beschäftigt ist, aber Sie verlieren nicht die Tatsache aus den Augen, dass Elemente gelöscht wurden, denn sobald Warteschlangenplatz verfügbar wird, wird diese Markierung eingefügt um anzuzeigen, wo der Datenverlust aufgetreten ist. Prozess B macht dann alles, was er tun muss, wenn er herausfindet, dass er dieses Überlaufmarkierungselement aus der Warteschlange gezogen hat.

1

Visual Studio 2010 fügt 2 neue Bibliotheken hinzu, die dieses Szenario sehr gut unterstützen, die Asynchronous Agents Library und Parallel Pattern Library.

Die Agenten Bibliothek hat die Unterstützung oder asynchrone Message-Passing und enthält Nachrichtenblöcke für Nachrichten zu ‚Ziele‘ zu senden und für Nachrichten von ‚Quellen‘

Ein unbounded_buffer ist eine Template-Klasse empfängt, das bietet, was ich glaube, Sie suchen für:

#include <agents.h> 
#include <ppl.h> 
#include <iostream> 

using namespace ::Concurrency; 
using namespace ::std; 

int main() 
{ 
    //to hold our messages, the buffer is unbounded... 
    unbounded_buffer<int> buf1; 
    task_group tasks; 

    //thread 1 sends messages to the unbounded_buffer 
    //without blocking 
    tasks.run([&buf1](){ 
     for(int i = 0 ; i < 10000; ++i) 
     send(&buf1,i) 
    //signal exit 
    send(&buf1,-1); 
    }); 

    //thread 2 receives messages and blocks if there are none 

    tasks.run([&buf1](){ 
     int result; 
     while(result = receive(&buf1)!=-1) 
     { 
      cout << "I got a " << result << endl; 
     } 
    }); 

    //wait for the threads to end 
    tasks.wait(); 
} 
+2

Läuft das wirklich unter der Kategorie Linux? –

+0

FWIW, in Ihrer Empfangsschleife geben Sie immer "Ich habe eine 1" aus, weil das! = Vor dem = ausgewertet wird –

1
  • Warum STL nicht <list> oder <deque> mit einem Mutex ar verwenden hinzufügen/entfernen? Ist die thread-safety of STL nicht ausreichend?

  • Warum erstellen Sie nicht Ihre eigene (single/double) Linked-List-Knoten-Klasse, die einen Zeiger enthält, und haben die Elemente, die hinzugefügt/entfernt werden sollen, davon geerbt? Somit ist eine zusätzliche Zuweisung unnötig. Sie sind nur ein paar Zeiger in threadA::add() und threadB::remove() und Sie sind fertig. (Während Sie wollen würde tun, dass unter einem Mutex, die blockierende Wirkung auf ThreadA vernachlässigbar sein würde, wenn Sie etwas wirklich falsch gemacht haben ...)

  • Wenn Sie pThreads verwenden, überprüfen sem_post() und sem_wait(). Die Idee ist, dass threadB unbegrenzt über sem_wait() blockieren kann, bis threadA etwas in die Warteschlange stellt. Dann ruft threadA sem_post() auf. Das weckt threadB, um es zu machen. Danach kann threadB wieder schlafen gehen. Es ist eine effiziente Möglichkeit, asynchrone Signalisierung zu behandeln, die Dinge wie multiple threadA::add() unterstützt, bevor threadB::remove() abgeschlossen wird.

Verwandte Themen