2010-01-16 8 views
10

Ich habe std::list<Info> infoList in meiner Anwendung, die zwischen zwei Threads geteilt wird. Diese zwei Threads den Zugriff auf diese Liste wie folgt:Was ist der beste Weg, Container-Zugriff zwischen mehreren Threads in Echtzeit-Anwendung zu synchronisieren

Thread 1: uses push_back(), pop_front() oder clear() auf der Liste (je nach Situation) Thread 2: verwendet einen iterator durch die Elemente in der iterieren Liste und mache einige Aktionen.

Thread 2 iteriert die Liste wie folgt aus:

for(std::list<Info>::iterator i = infoList.begin(); i != infoList.end(); ++i) 
{ 
    DoAction(i); 
} 

Der Code kompiliert mit GCC 4.4.2.

Manchmal ++ i verursacht einen segfault und stürzt die Anwendung ab. Der Fehler wird in std_list.h Linie 143 auf der folgenden Zeile verursacht:

_M_node = _M_node->_M_next; 

Ich denke, dies ist ein Rennzustand ist. Die Liste wurde möglicherweise von Thread 1 geändert oder sogar gelöscht, während Thread 2 sie wiederholte.

Ich benutzte Mutex, um den Zugriff auf diese Liste zu synchronisieren, und alles ging während meines ersten Tests in Ordnung. Aber das System friert nur im Stresstest ein und macht diese Lösung völlig inakzeptabel. Diese Anwendung ist eine Echtzeitanwendung und ich muss eine Lösung finden, damit beide Threads so schnell wie möglich ausgeführt werden können, ohne den Gesamtdurchsatz der Anwendungen zu beeinträchtigen.

Meine Frage ist dies: Thread 1 und Thread 2 müssen so schnell wie möglich ausgeführt werden, da dies eine Echtzeitanwendung ist. Was kann ich tun, um dieses Problem zu vermeiden und dennoch die Anwendungsleistung aufrechtzuerhalten? Gibt es für ein solches Problem einen Lock-Free-Algorithmus?

Es ist in Ordnung, wenn ich einige neu hinzugefügte Info Objekte in der Iteration von Thread 2 vermisse, aber was kann ich tun, um zu verhindern, dass der Iterator zu einem dangling pointer wird?

Dank

+1

Sutters Artikel über das Schreiben einer sperrfreien Warteschlange könnte helfen: http://www.ddj.com/cpp/210604448 –

+0

http://StackOverflow.com/Questions/1724630/How-Doi-Build- a-lockless-queue –

+0

Warum können Sie keine Warteschlange anstelle einer Liste verwenden? Wie wird Ihr Consumer-Thread über neue Info-Objekte benachrichtigt? –

Antwort

4

Im Allgemeinen ist es nicht sicher, STL-Container auf diese Weise zu verwenden. Sie müssen eine bestimmte Methode implementieren, um Ihren Code-Thread sicher zu machen. Die von Ihnen gewählte Lösung hängt von Ihren Anforderungen ab. Ich würde das wahrscheinlich lösen, indem ich zwei Listen führe, eine in jedem Thread. Und die Änderungen durch eine lock free queue kommunizieren (in den Kommentaren zu dieser Frage erwähnt). Sie können auch die Lebensdauer Ihrer Info-Objekte einschränken, indem Sie sie in boost :: shared_ptr z.

typedef boost::shared_ptr<Info> InfoReference; 
typedef std::list<InfoReference> InfoList; 

enum CommandValue 
{ 
    Insert, 
    Delete 
} 

struct Command 
{ 
    CommandValue operation; 
    InfoReference reference; 
} 

typedef LockFreeQueue<Command> CommandQueue; 

class Thread1 
{ 
    Thread1(CommandQueue queue) : m_commands(queue) {} 
    void run() 
    { 
     while (!finished) 
     { 
      //Process Items and use 
      // deleteInfo() or addInfo() 
     }; 

    } 

    void deleteInfo(InfoReference reference) 
    { 
     Command command; 
     command.operation = Delete; 
     command.reference = reference; 
     m_commands.produce(command); 
    } 

    void addInfo(InfoReference reference) 
    { 
     Command command; 
     command.operation = Insert; 
     command.reference = reference; 
     m_commands.produce(command); 
    } 
} 

private: 
    CommandQueue& m_commands; 
    InfoList m_infoList; 
} 

class Thread2 
{ 
    Thread2(CommandQueue queue) : m_commands(queue) {} 

    void run() 
    { 
     while(!finished) 
     { 
      processQueue(); 
      processList(); 
     } 
    } 

    void processQueue() 
    { 
     Command command; 
     while (m_commands.consume(command)) 
     { 
      switch(command.operation) 
      { 
       case Insert: 
        m_infoList.push_back(command.reference); 
        break; 
       case Delete: 
        m_infoList.remove(command.reference); 
        break; 
      } 
     } 
    } 

    void processList() 
    { 
     // Iterate over m_infoList 
    } 

private: 
    CommandQueue& m_commands; 
    InfoList m_infoList; 
} 


void main() 
{ 
CommandQueue commands; 

Thread1 thread1(commands); 
Thread2 thread2(commands); 

thread1.start(); 
thread2.start(); 

waitforTermination(); 

} 

Dies wurde nicht erstellt. Sie müssen weiterhin sicherstellen, dass der Zugriff auf Ihre Info Objekte Thread-sicher ist.

1

Ihre Iterator Um zu verhindern, für ungültig erklärt wird, haben Sie die ganze for Schleife zu sperren. Jetzt denke ich, der erste Thread könnte Schwierigkeiten haben, die Liste zu aktualisieren. Ich würde versuchen, ihm eine Chance zu geben, seine Arbeit bei jeder (oder jeder N-ten Iteration) zu machen.

In Pseudo-Code, der wie folgt aussehen würde:

mutex_lock(); 
for(...){ 
    doAction(); 
    mutex_unlock(); 
    thread_yield(); // give first thread a chance 
    mutex_lock(); 
    if(iterator_invalidated_flag) // set by first thread 
    reset_iterator(); 
} 
mutex_unlock(); 
+0

Muss ich den Iterator jedes Mal an den Anfang der Liste zurücksetzen reset_iterator(); ? –

+0

@O. Askari: hängt von der Operation ab: 'clear' - natürlich ja,' push_back' - keine Notwendigkeit zurückzusetzen. Wie für 'pop_front' - müssen Sie möglicherweise zurücksetzen (z. B. wenn Sie am Anfang der Liste stehen). In diesem Fall müssen Sie den Iterator zwischen Threads freigeben, wenn Sie unnötige Rücksetzungen vermeiden möchten. – catwalk

-1

ich Sie können ohne Synchronisation überhaupt in diesem Fall bestimmte Operation nicht denken weg wird die Iteratoren ungültig machen Sie verwenden. Bei einer Liste ist dies ziemlich begrenzt (im Grunde, wenn beide Threads versuchen, Iteratoren gleichzeitig mit demselben Element zu bearbeiten), aber es besteht immer noch die Gefahr, dass Sie ein Element zur selben Zeit entfernen, zu der Sie es versuchen einen anhängen.

Sind Sie zufällig das Schloss über DoAction(i) halten? Sie möchten das Schloss natürlich nur so lange wie möglich halten, um die Leistung zu maximieren. Aus dem obigen Code, denke ich, wirst du die Schleife etwas zerlegen wollen, um beide Seiten der Operation zu beschleunigen.

Etwas entlang der Linien von:

while (processItems) { 
    Info item; 
    lock(mutex); 
    if (!infoList.empty()) { 
    item = infoList.front(); 
    infoList.pop_front(); 
    } 
    unlock(mutex); 
    DoAction(item); 
    delayALittle(); 
} 

und die Insert-Funktion würde immer noch so aussehen müssen:

lock(mutex); 
infoList.push_back(item); 
unlock(mutex); 

Es sei denn, die Warteschlange zu massiven wahrscheinlich ist, würde ich versucht sein, etwas wie ein std::vector<Info> oder sogar ein std::vector<boost::shared_ptr<Info> > zu verwenden, um das Kopieren der Info-Objekte zu minimieren (vorausgesetzt, dass diese im Vergleich zu einem boost :: shared_ptr etwas teurer zu kopieren sind. Im Allgemeinen tendieren Operationen auf einem Vektor etwas schneller als auf eine Liste, besonders wenn die Objekte, die in dem Vektor gespeichert sind, klein und billig zu kopieren sind.

+0

Danke, diese Lösung sieht vielversprechend aus. Die Liste enthält nicht viele Elemente, maximal 20, und jedes Informationselement belegt etwa 128 Byte. Was aber, wenn ich verarbeitete Elemente in der InfoList behalten und nicht entfernen muss? –

1

Sie müssen entscheiden, welcher Thread der wichtigere ist. Wenn es sich um den Update-Thread handelt, muss er dem Iterator-Thread signalisieren, dass er anhalten, warten und erneut starten soll. Wenn es der Iterator-Thread ist, kann er die Liste einfach sperren, bis die Iteration abgeschlossen ist.

5

Ihre for() - Schleife kann möglicherweise eine Sperre für eine relativ lange Zeit beibehalten, abhängig davon, wie viele Elemente sie iteriert. Sie können echte Probleme bekommen, wenn sie die Warteschlange "abfragt" und ständig prüft, ob ein neues Element verfügbar ist. Das führt dazu, dass der Thread den Mutex für eine unvernünftig lange Zeit besitzt, was dem Produzenten-Thread wenig Möglichkeiten gibt, einzubrechen und ein Element hinzuzufügen. Und verbrennen viele unnötige CPU-Zyklen in dem Prozess.

Sie benötigen eine "begrenzte blockierende Warteschlange". Schreib es nicht selbst, das Schlossdesign ist nicht trivial. Schwer zu finden gute Beispiele, das meiste davon ist .NET-Code. This article sieht vielversprechend aus.

0

Sie müssen eine Threading-Bibliothek verwenden. Wenn Sie Intel TBB verwenden, können Sie concurrent_vector oder concurrent_queue verwenden. Siehe this.

3

Ich würde gerne wissen, was der Zweck dieser Liste ist, wäre es einfacher, die Frage zu beantworten.

Wie Hoare sagte, ist es in der Regel eine schlechte Idee zu versuchen, Daten für die Kommunikation zwischen zwei Threads zu teilen, eher sollten Sie kommunizieren, um Daten zu teilen: dh Messaging.

Wenn diese Liste zum Beispiel eine Warteschlange modelliert, können Sie einfach eine der verschiedenen Möglichkeiten nutzen, um zwischen den beiden Threads zu kommunizieren (z. B. Sockets). Consumer/Producer ist ein bekanntes und bekanntes Problem.

Wenn Ihre Artikel teuer sind und Sie die Zeiger nur während der Kommunikation weitergeben, vermeiden Sie es, die Artikel selbst zu kopieren.

Im Allgemeinen ist es extrem schwierig, Daten zu teilen, obwohl es leider die einzige Art der Programmierung ist, von der wir in der Schule hören.Normalerweise sollte sich nur die Low-Level-Implementierung von "Kommunikationskanälen" um Synchronisation kümmern und Sie sollten lernen, die Kanäle zu verwenden, um zu kommunizieren, anstatt zu versuchen, sie zu emulieren.

+0

+1 - nur für die Hoare Zitat wenn nichts anderes. Multithread-Programmierung ist nicht die einzige Antwort auf alle Probleme. –

0

Wenn Sie weiterhin std::list in einer Multithread-Umgebung verwenden möchten, würde ich empfehlen, es in eine Klasse mit einem Mutex zu verpacken, der gesperrten Zugriff darauf bietet. Abhängig von der genauen Verwendung, könnte es sinnvoll sein, auf ein ereignisgesteuerte Warteschlange Modell zu wechseln, in den Nachrichten in einer Warteschlange übergeben werden, die mehr Worker-Threads verbrauchen (Hinweis: Erzeuger-Verbraucher).

Ich würde ernsthaft Matthieu's thought in Betracht ziehen. Viele Probleme, die mit Multithread-Programmierung gelöst werden, lassen sich besser mit Message-Passing zwischen Threads oder Prozessen lösen. Wenn Sie einen hohen Durchsatz benötigen und nicht unbedingt erforderlich, dass die Verarbeitung die gleiche Speicherplatz, sollten Sie so etwas wie die Message-Passing Interface (MPI) anstelle Ihrer eigenen Multi-Threaded-Lösung von Rollen. Es gibt eine Reihe von C++ Implementierungen verfügbar - OpenMPI, Boost.MPI, Microsoft MPI, etc. etc.

1

Wie Sie erwähnt, dass Sie sich nicht, wenn Ihr Iterieren Verbraucher einige neu hinzugefügten Einträge vermisst, Sie Urheber verwenden könnten on-write Liste darunter. Das ermöglicht es der Iterieren Verbraucher auf einer konsistenten Snapshot der Liste für den Betrieb ab, wenn es zum ersten Mal gestartet, während in anderen Threads, Aktualisierungen der Liste Ausbeute frisch, aber konsistente Kopien, ohne eine des vorhandenen Snapshots zu stören.

Der Handel hier besteht darin, dass jede Aktualisierung der Liste lange genug für den exklusiven Zugriff gesperrt werden muss, um die gesamte Liste zu kopieren. Diese Technik ist darauf ausgerichtet, viele gleichzeitige Leser und weniger häufige Aktualisierungen zu haben.

Wenn Sie versuchen, dem Container zuerst eine intrinsische Sperre hinzuzufügen, müssen Sie zuerst darüber nachdenken, welche Operationen sich in atomaren Gruppen verhalten müssen. Zum Beispiel überprüft, ob die Liste leer ist, bevor Sie das erste Element erfordert einen atomaren Pop-if-nicht-leer Betrieb abspringt; Andernfalls kann sich die Antwort auf die leere Liste ändern, wenn der Anrufer die Antwort erhält und versucht, darauf zu reagieren.

Es ist nicht klar, in Ihrem Beispiel über das, was garantiert die Iteration gehorchen muss. Muss in der Liste jedes Element schließlich vom Iterieren Thread besucht werden? Macht es mehrere Durchgänge? Was bedeutet es für einen Thread, ein Element aus der Liste zu entfernen, während ein anderer Thread DoAction() dagegen ausführt? Ich vermute, dass die Bearbeitung dieser Fragen zu erheblichen Designänderungen führen wird.


Sie arbeiten in C++, und Sie haben erwähnt, eine Warteschlange mit einem Pop-if-nicht-leer Betrieb benötigen. Ich schrieb eine two-lock queue vor vielen Jahren mit den ACE Library Parallelität Primitiven, wie die Boost thread library war noch nicht bereit für die Produktion zu verwenden, und die Chance für die C++ Standard-Bibliothek einschließlich solcher Einrichtungen war ein ferner Traum. Etwas moderner zu portieren wäre einfach.

Diese Warteschlange von mir - genannt concurrent::two_lock_queue - ermöglicht den Zugriff auf den Kopf der Warteschlange nur über RAII. Dies stellt sicher, dass das Erfassen des Schlosses zum Lesen des Kopfes immer mit einer Freigabe des Schlosses verbunden wird. Ein Konsument konstruiert einen const_front (const-Zugriff auf Kopfelement), einen front (nicht-const-Zugriff auf Kopfelement) oder ein renewable_front-Objekt (nichtkonstanter Zugriff auf Kopf- und Nachfolgerelemente), um das exklusive Recht auf Zugriff auf das Kopfelement darzustellen der Warteschlange. Solche "vorderen" Objekte können nicht kopiert werden.

Klasse two_lock_queue bietet auch eine pop_front() Funktion, die bis mindestens ein Element wartet verfügbar entfernt werden, aber mit std::queue ‚s und std::stack‘ s-Art von nicht Containern Mutation und Wert Kopieren Mischen pop_front() kehrt leer zu halten.

In einer Companion-Datei gibt es einen Typ mit der Bezeichnung concurrent::unconditional_pop, mit dem durch RAII sichergestellt werden kann, dass das Kopfelement der Warteschlange beim Verlassen des aktuellen Bereichs gelöscht wird.

Die Companion-Datei error.hh definiert die Ausnahmen, die sich aus der Verwendung der Funktion two_lock_queue::interrupt() ergeben, die zum Entsperren von Threads verwendet wird, die auf den Zugriff auf den Kopf der Warteschlange warten.

Werfen Sie einen Blick auf den Code und lassen Sie mich wissen, wenn Sie weitere Erläuterungen benötigen, wie Sie es verwenden.

+0

Jedes Element in der Liste muss von DoAction() verarbeitet werden. Ein anderer Thread kann jedoch ein Element während seiner Ausführung löschen. Dies wird keinen Schaden anrichten, da es die Gültigkeit des Info-Objekts überprüfen kann. Die Liste kann nach der Schleife nicht mehr geleert werden, da sie woanders verwendet wird. Ich habe einen Snapshot für den Iterator verwendet, aber ich habe immer noch Probleme, wie man einen atomaren Pop-wenn-nicht-leer macht. Vielen Dank –

1

Der beste Weg, dies zu tun ist, einen Container zu verwenden, der intern synchronisiert ist. TBB und Microsofts concurrent_queue tun dies. Anthony Williams hat auch eine gute Umsetzung auf seinem Blog here

1

Andere bereits vorgeschlagen haben Lock-freie Alternativen, also werde ich antworten, als ob Sie Schlösser stecken mit ...

Wenn Sie eine Liste ändern , vorhandene Iteratoren können ungültig werden, weil sie nicht mehr auf gültigen Speicher zeigen (die Liste weist automatisch mehr Arbeitsspeicher zu, wenn sie wachsen muss). Um ungültige Iteratoren zu verhindern, können Sie den Erzeuger blockieren auf einen Mutex, während Ihr Verbraucher die Liste durchläuft, aber das wäre unnötig warten auf für den Hersteller. Das Leben wäre einfacher, wenn Sie anstelle einer Liste eine Warteschlange verwenden und Ihren Kunden einen synchronisierten queue<Info>::pop_front()-Aufruf anstelle von Iteratoren geben, die hinter Ihrem Rücken ungültig gemacht werden können.

Wenn Ihr Verbraucher wirklich Teile von Info auf einmal verschlingen muss, dann verwenden Sie eine condition variable, die Ihren Verbraucher blockieren bis queue.size() >= minimum.

Die Boost-Bibliothek verfügt über eine nette portable Implementierung von Bedingungsvariablen (die sogar mit älteren Windows-Versionen funktioniert), sowie die usual threading library stuff.

Für eine Producer-Consumer-Warteschlange, die (altmodische) Sperren verwendet, überprüfen Sie die Vorlagenklasse BlockingQueue der ZThreads-Bibliothek. Ich habe ZThreads nicht selbst benutzt, ich war besorgt wegen des Mangels an aktuellen Updates und weil es anscheinend nicht weit verbreitet war. Jedoch habe ich es als Inspiration für das Rollen meiner eigenen Thread-sicheren Producer-Consumer-Warteschlange verwendet (bevor ich über lock-free Warteschlangen und TBB erfuhr).

Eine blockierungsfreie Queue/Stack-Bibliothek scheint sich in der Boost-Review-Queue zu befinden. Hoffen wir, dass wir in naher Zukunft ein neues Boost.Lockfree sehen werden! :)

Wenn Interesse besteht, kann ich ein Beispiel für eine blockierende Warteschlange schreiben, die std :: queue und Boost thread locking verwendet.

EDIT:

Der Blog von Rick Antwort verwiesen hat bereits eine blockierende Warteschlange Beispiel, das std :: Warteschlange und Boost condvars verwendet.Wenn Ihr Verbraucher Brocken verschlingen muss, können Sie das Beispiel wie folgt erweitern:

void wait_for_data(size_t how_many) 
    { 
     boost::mutex::scoped_lock lock(the_mutex); 
     while(the_queue.size() < how_many) 
     { 
      the_condition_variable.wait(lock); 
     } 
    } 

Sie auch zwicken möchten es Timeouts und Stornierungen zu ermöglichen.

Sie erwähnten, dass Geschwindigkeit ein Problem war. Wenn Ihre Info s Schwergewicht sind, sollten Sie überlegen, sie durch shared_ptr zu übergeben. Sie können auch versuchen, Ihre Info s feste Größe und verwenden Sie eine memory pool (die viel schneller als der Heap sein kann).

1

Wenn Sie C++ verwenden 0x Sie intern Liste Iteration diese Weise synchronisieren könnte:

Angenommen, die Klasse hat eine Templat-Liste namens objects_ und ein boost :: mutex mutex_

benannt das toall Verfahren ist Mitglied Verfahren der Liste Wrapper

void toAll(std::function<void (T*)> lambda) 
{ 
boost::mutex::scoped_lock(this->mutex_); 
for(auto it = this->objects_.begin(); it != this->objects_.end(); it++) 
{ 
     T* object = it->second; 
     if(object != nullptr) 
     { 
       lambda(object); 
      } 
     } 
} 

Berufung:

synchronizedList1->toAll(
     [&](T* object)->void // Or the class that your list holds 
     { 
      for(auto it = this->knownEntities->begin(); it != this->knownEntities->end(); it++) 
      { 
       // Do something 
      } 
     } 
); 
Verwandte Themen