2017-02-16 2 views
0

Ich fragte eine Frage here darüber, warum das Starten eines Prozesses mit Thread.Run nicht so viele gleichzeitige Anforderungen ausgeführt, wie ich erwartet hatte.Handle rabbitmq Nachrichten concurrenrtly

Der Grund hinter dieser Frage war, dass ich versuchte, eine Klasse zu erstellen, die Nachrichten aus einer Rabbitmq-Warteschlange ziehen und gleichzeitig bis zu einer maximalen Anzahl gleichzeitiger Nachrichten verarbeiten kann.

Um dies zu tun, endete ich mit der folgenden Received Handler der EventingBasicConsumer Klasse.

Die Kommentare auf den vorherigen Beitrag sollten jedoch nicht einen Thread starten, es sei denn CPU gebunden Arbeit.

Der obige Handler weiß nicht, ob die Arbeit CPU-gebunden, Netzwerk, Festplatte oder anderweitig ist. (Process ist eine abstrakte Methode).

Trotzdem denke ich, ich muss hier einen Thread oder eine Aufgabe starten, sonst blockiert die Process Methode den rabbitmq-Thread und der Event-Handler wird erst wieder aufgerufen, wenn er fertig ist. Ich kann also nur eine Methode gleichzeitig anwenden.

Starte eine neue Thread hier in Ordnung? Ursprünglich hatte ich Task.Run verwendet, aber dies produzierte nicht so viele Arbeiter wie gewünscht. Siehe anderen Beitrag.

FYI. Die Anzahl der gleichzeitigen Threads wird durch Festlegen der InitialCount auf dem Semaphor begrenzt.

Antwort

0

Wie bereits in verknüpften Frage gesagt, eine große Anzahl von Threads garantiert nicht die Leistung, als ob ihre Zahl mehr als die Anzahl der logischen Kerne bekommt, haben Sie eine thread starvation Situation ohne echte Arbeit getan.

Wenn Sie jedoch weiterhin die Anzahl der gleichzeitigen Vorgänge behandeln müssen, können Sie versuchen, die TPL Dataflow Bibliothek mit Einstellungen bis MaxDegreeOfParallelism, wie in this tutorial.

var workerBlock = new ActionBlock<EventArgs>(
    // Process event 
    e => Process(e), 
    // Specify a maximum degree of parallelism. 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = InitialCount 
    }); 
var bufferBlock = new BufferBlock(); 
// link the blocks for automatically propagading the messages 
bufferBlock.LinkTo(workerBlock); 

// asynchronously send the message 
await bufferBlock.SendAsync(...); 
// synchronously send the message 
bufferBlock.Post(...); 

BufferBlock ist eine Warteschlange, so dass die Reihenfolge der Nachrichten gewahrt werden.

bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs); 
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs); 
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs); 

aber in diesem Fall, dass Sie Setup einen Standard-Handler am Ende der Kette sollen, so das: Sie können aber auch die verschiedenen Handler (mit einem anderen Grad der Parallelität) mit Verknüpfung der Blöcke mit Filter Lambda hinzufügen Nachricht verschwinden würde nicht (Sie NullTarget Baustein dafür verwenden können):

bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>); 

auch könnte der Block ein Beobachter sein, so dass sie perfekt mit Reactive Extensions auf UI-Seite arbeiten.

Verwandte Themen