2010-12-16 12 views
6

Ich habe eine Erzeuger/Verbraucher-Warteschlange, außer dass es bestimmte Arten von Objekten gibt. So kann nicht jeder Verbraucher ein hinzugefügtes Objekt konsumieren. Ich möchte nicht für jeden Typ eine bestimmte Warteschlange erstellen, da es zu viele gibt. (Es dehnt die Definition von Erzeuger/Verbraucher aus, aber ich bin nicht sicher, was der richtige Begriff ist.)C# Produzent/Verbraucher/Beobachter?

Gibt es so etwas wie eine EventWaitHandle, die Impulse mit einem Parameter ermöglicht? z.B. myHandle.Set(AddedType = "foo"). Im Moment benutze ich Monitor.Wait und dann überprüft jeder Verbraucher, ob der Puls tatsächlich für sie bestimmt war, aber das scheint sinnlos.

A pseduocode Version von dem, was ich habe jetzt:

class MyWorker { 
    public string MyType {get; set;} 
    public static Dictionary<string, MyInfo> data; 

    public static void DoWork(){ 
     while(true){ 
      if(Monitor.Wait(data, timeout)){ 
        if (data.ContainsKey(MyType)){ 
         // OK, do work 
        } 
      } 
     } 
    } 
} 

Wie Sie sehen können, könnte ich Impulse erhalten, wenn andere Sachen zum dict hinzugefügt wird. Es ist mir egal, wenn MyType dem Diktat hinzugefügt wird. Gibt es eine Möglichkeit, das zu tun? Es ist keine große Sache, aber zum Beispiel muss ich jetzt Timeouts manuell behandeln, weil jeder Zugriff auf die Sperre innerhalb des Timeouts erfolgreich sein kann, aber MyType wird nie innerhalb von timeout dem Diktat hinzugefügt.

+0

Nur eine Idee, könnte jeder Objekttyp sein eigenes Sperrobjekt haben, so dass Sie das entsprechende Objekt "überwachen" können? –

+0

@deltreme: Ja, das ist möglich, aber ich möchte kein neues Objekt für jeden Typ erstellen. – Xodarap

+0

Ich bin mir nicht sicher, was Sie meinen, da es zu viele Typen gibt, um separate Warteschlangen für jeden zu erstellen. Wie das? Es ist der natürliche Weg, dies zu tun, und ich bin mir nicht sicher, wie viele separate Warteschlangen weniger effizient wären als jeder andere Mechanismus zur Handhabung von Ereignissen. –

Antwort

3

Dies ist eine interessante Frage. Es klingt wie der Schlüssel zur Lösung ist eine blockierende Variante eines priority queue. Java hat die PriorityBlockingQueue, aber leider ist das Äquivalent für die .NET BCL nicht existent. Sobald Sie einen haben, ist die Implementierung jedoch einfach.

class MyWorker 
{ 
    public string MyType {get; set;} 
    public static PriorityBlockingQueue<string, MyInfo> data; 

    public static void DoWork() 
    { 
     while(true) 
     { 
      MyInfo value; 
      if (data.TryTake(MyType, timeout, out value)) 
      { 
       // OK, do work 
      } 
     } 
    } 
} 

Die Implementierung einer PriorityBlockingQueue ist nicht sehr schwierig. Nach dem gleichen Muster wie BlockingCollection unter Verwendung von Add und Take Stil Methoden kam ich mit dem folgenden Code.

public class PriorityBlockingQueue<TKey, TValue> 
{ 
    private SortedDictionary<TKey, TValue> m_Dictionary = new SortedDictionary<TKey,TValue>(); 

    public void Add(TKey key, TValue value) 
    { 
     lock (m_Dictionary) 
     { 
      m_Dictionary.Add(key, value); 
      Monitor.Pulse(m_Dictionary); 
     } 
    } 

    public TValue Take(TKey key) 
    { 
     TValue value; 
     TryTake(key, TimeSpan.FromTicks(long.MaxValue), out value); 
     return value; 
    } 

    public bool TryTake(TKey key, TimeSpan timeout, out TValue value) 
    { 
     value = default(TValue); 
     DateTime initial = DateTime.UtcNow; 
     lock (m_Dictionary) 
     { 
      while (!m_Dictionary.TryGetValue(key, out value)) 
      { 
       if (m_Dictionary.Count > 0) Monitor.Pulse(m_Dictionary); // Important! 
       TimeSpan span = timeout - (DateTime.UtcNow - initial); 
       if (!Monitor.Wait(m_Dictionary, span)) 
       { 
        return false; 
       } 
      } 
      m_Dictionary.Remove(key); 
      return true; 
     } 
    } 
} 

Dies war eine schnelle Implementierung und es hat ein paar Probleme. Erstens habe ich es überhaupt nicht getestet. Zweitens verwendet es einen rot-schwarzen Baum (über SortedDictionary) als die zugrunde liegende Datenstruktur. Das bedeutet, dass die Methode TryTake die Komplexität O (log (n)) hat. Prioritätswarteschlangen haben typischerweise eine Komplexität zur Entfernung von O (1). Die typische Datenstruktur der Wahl für Prioritätswarteschlangen ist ein heap, aber ich finde, dass skip lists tatsächlich aus mehreren Gründen in der Praxis besser sind. Keine davon existiert in der .NET BCL, weshalb ich eine SortedDictionary stattdessen trotz seiner unterlegenen Leistung in diesem Szenario verwendet habe.

Ich sollte hier darauf hinweisen, dass dies das sinnlose Wait/Pulse Verhalten tatsächlich nicht löst. Es ist einfach in der Klasse PriorityBlockingQueue eingekapselt. Aber zumindest wird dies den Kern Ihres Codes bereinigen.

Es sah nicht so aus, als würde Ihr Code mehrere Objekte pro Schlüssel behandeln, aber das wäre leicht hinzuzufügen, indem Sie beim Hinzufügen zum Wörterbuch eine Queue<MyInfo> anstelle einer einfachen alten MyInfo verwenden.

+0

Interessant, gute Idee, dies zu kapseln. Gibt es einen Grund, warum du 'SortedDictionary' anstelle von' Dictionary' benutzt hast? Einfach schneller einfügen? – Xodarap

+0

@ Xodarap: Nun, gute Frage. Ich dachte ursprünglich, dass Sie eine 'Take'-Überladung benötigen würden, die einen Schlüssel nicht akzeptiert und das erste Element aus dem Wörterbuch entfernt. Sie benötigen jedoch ein sortiertes Wörterbuch, um das richtig zu machen. Das ist eigentlich, wie eine Prioritätswarteschlange funktionieren soll. Ich nehme an, Sie könnten ein einfaches altes Wörterbuch verwenden und stattdessen einfach die Wrapper-Klasse 'BlockingDictionary' aufrufen. Netter Fang. –

1

Es scheint, als ob Sie die Producer/Consumer-Warteschlange mit einem Observer-Muster kombinieren möchten - generischer Consumer-Thread oder Threads liest aus der Warteschlange und übergibt dann das Ereignis an den erforderlichen Code. In diesem Fall würden Sie den Observer nicht wirklich signalisieren, sondern ihn nur aufrufen, wenn der Consumer-Thread identifiziert, wer an einem bestimmten Arbeitselement interessiert ist.

Observer-Muster in .NET werden normalerweise mithilfe von C# -Ereignissen implementiert. Sie müssten nur den Event-Handler für das Objekt aufrufen und ein oder mehrere Beobachter würden dadurch aufgerufen werden. Der Zielcode müsste sich zunächst beim beobachteten Objekt registrieren, indem er sich dem Ereignis zur Benachrichtigung bei Ankunft der Arbeit hinzufügt.

+0

Das hört sich so an, als müsste ich einen 'Dictionary ' haben und 'Monitor.Pulse (Dictionary [type])' wenn 'type' aktualisiert wird? Es scheint, als könnte ich den Beobachter entfernen und mache das dann manuell vom Produzenten. (Ich stimme zu, dass es funktioniert, aber ich hatte gehofft, dass ich eine Sammlung von Konsumenten hinter den Kulissen verwenden könnte.) – Xodarap

+0

Wenn Sie 'Monitor' verwenden, um das Zielobjekt zu aktivieren, scheint ein separater Consumer-Thread über die Warteschlange redundant zu sein , sicher. –