8

ich für einen Ringpuffer Implementierung suchen (oder Pseudo-Code) in C mit den folgenden Eigenschaften:für die rechte Ringpuffer Implementierung in C Suche

  • mehrere Hersteller einzelne Verbraucher Muster (MPSC)
  • Blöcke Verbraucher Block auf leere
  • Hersteller auf volle
  • Lock-frei (ich erwarte, dass hohe Konkurrenz)

Bisher ich Ich habe nur mit SPSC-Puffern gearbeitet - einen pro Producer - aber ich möchte vermeiden, dass der Consumer ständig nach neuen Daten über alle Eingabepuffer sucht (und vielleicht einige Marshalling-Threads in meinem System loswerden könnte).

Ich entwickle für Linux auf Intel-Maschinen.

+0

Ich weiß nicht, in welcher Umgebung Sie sich befinden, aber in Win32 können Sie WaitForMultipleObjects verwenden, damit der Konsument in allen Warteschlangen gleichzeitig wartet, ohne sich zu drehen. –

+1

Es tut mir leid, ich habe nicht erwähnt, dass ich hauptsächlich an Linux arbeite – ziu

+1

Nur für den Fall, dass Sie keine echte Antwort erhalten, wird es ein Kinderspiel, mehrere Puffer mit diesem zu synchronisieren: http://neosmart.net/ blog/2011/waitformultipleobjects-and-win32-events-for-linux-und-lesen-schreiben-locks-for-windows/ –

Antwort

2

Ich denke, ich habe was du suchst. Es ist eine Lock-freie Ringpuffer-Implementierung, die den Producer/Consumer blockiert. Sie benötigen nur Zugriff auf atomare Primitive - in diesem Beispiel verwende ich die sync-Funktionen von gcc.

Es hat einen bekannten Fehler - wenn Sie den Puffer um mehr als 100% überlaufen, ist es nicht garantiert, dass die Warteschlange FIFO bleibt (es wird immer noch alle sie schließlich verarbeiten).

Diese Implementierung basiert auf Lesen/als eine atomare Operation die Pufferelemente zu schreiben (was ziemlich viel für Zeiger garantiert)

struct ringBuffer 
{ 
    void** buffer; 
    uint64_t writePosition; 
    size_t size; 
    sem_t* semaphore; 
} 

//create the ring buffer 
struct ringBuffer* buf = calloc(1, sizeof(struct ringBuffer)); 
buf->buffer = calloc(bufferSize, sizeof(void*)); 
buf->size = bufferSize; 
buf->semaphore = malloc(sizeof(sem_t)); 
sem_init(buf->semaphore, 0, 0); 

//producer 
void addToBuffer(void* newValue, struct ringBuffer* buf) 
{ 
    uint64_t writepos = __sync_fetch_and_add(&buf->writePosition, 1) % buf->size; 

    //spin lock until buffer space available 
    while(!__sync_bool_compare_and_swap(&(buf->buffer[writePosition]), NULL, newValue)); 
    sem_post(buf->semaphore); 
}  

//consumer 
void processBuffer(struct ringBuffer* buf) 
{ 
    uint64_t readPos = 0; 
    while(1) 
    { 
     sem_wait(buf->semaphore); 

     //process buf->buffer[readPos % buf->size] 
     buf->buffer[readPos % buf->size] = NULL; 
     readPos++; 
    } 
} 
+1

Dieses Beispiel ist interessant. Lassen Sie mich sehen, ob ich gut verstanden habe: Das Indexinkrement und das Schreiben sind sperrfrei, während Sie eine Sperre in Form eines Semaphors verwenden, um den Verbraucher zu blockieren, wenn nichts zu konsumieren ist. Ich verstehe nicht, wie es möglich ist, diesen Puffer zu überlaufen. Haben Sie diese Struktur in einem System verwendet, in dem Sie eine sehr kurze Spinnzeit erwartet haben? Wie war der Einfluss der Spinning Loop? – ziu

+0

Sie können den Puffer überlaufen lassen, indem Sie Daten schneller schreiben, als sie verarbeitet werden können. Irgendwann wird sich der Schreibindex durchsetzen und den Leser "passieren". An diesem Punkt muss der Schreiber in einer Drehsperre warten, während er darauf wartet, dass der Leser aufholt (andernfalls würde er Daten im Puffer überschreiben). Der Fehler tritt auf, wenn Sie die Warteschlange um mehr als 100% überschreiten. In diesem Szenario warten mehr als 1 Thread in einem Spinlock, damit der Puffer verfügbar wird. Es ist nicht garantiert, welcher der Threads zuerst in die Warteschlange schreiben wird. – charliehorse55

+2

Wäre es nicht einfacher, die obige Schleife wie folgt neu zu schreiben? 'while (1) { während (__ sync_bool_compare_and_swap (& (buf-> Puffer [writePosition]), NULL, newValue) == false); sem_post (buf-> Semaphor); Pause; } ' – ziu

4

liblfds See, ein Schloss frei MPMC Ringpuffer. Es blockiert überhaupt nicht — lock-free Datenstrukturen neigen dazu, dies zu tun, weil der Punkt der Sperre frei ist, um zu blockieren; Sie müssen damit umgehen, wenn die Datenstruktur mit einer NULLNULL zurückkommt, wenn Sie versuchen, auf leer lesen, aber nicht Ihre Anforderung beim Schreiben auf voll entspricht; hier wird es das älteste Element wegwerfen und dir das für dein Schreiben geben.

Es würde jedoch nur eine kleine Änderung erforderlich sein, um dieses Verhalten zu erhalten.

Aber es könnte eine bessere Lösung geben. Der schwierige Teil eines Ringpuffers besteht darin, das älteste vorherige Element vollständig zu laden und dieses wiederzuverwenden. Du brauchst das nicht. Ich denke, man könnte die SPSC-Speicherbarriere nur als Ringpuffer nehmen und mit atomaren Operationen umschreiben. Das wird ein Los leistungsfähiger, dass der MPMC Ringpuffer in liblfds (die eine Kombination aus einer Warteschlange und einem Stapel ist).

+0

Bis jetzt ist meine SPSC-Implementierung ziemlich trivial: Sie verwendet nur lokale und globale Positionszähler, um den Schreiber und den Leser zu synchronisieren (die lokalen Zähler sind dazu da, den Push/Pull von Elementen stapelweise zu verarbeiten und falsches Teilen zu reduzieren). Zustandsvariablen sind vorhanden, um das Drehen zu reduzieren (wenn keine Daten verfügbar sind, gibt es nichts anderes zu tun/wenn das Ziel voll ist, ist Gegendruck unvermeidlich). Ohne geeignete Speicherbarrieren funktioniert meine Implementierung nicht in einer anderen Architektur. Könnten Sie bitte etwas über Ihren letzten Punkt erzählen? Am Ende wird der Ring Buffer immer SPSC, oder? – ziu

+1

Es gibt einen bekannten SPSC-Ringpuffer, der zum Beispiel im Linux-Kernel verwendet wird, der nur Speicherbarrieren verwendet, NULL zurückgibt, wenn der Puffer voll oder leer ist.Ich vermute, es könnte MPMC mit atomaren Operationen gemacht werden. –

Verwandte Themen