5

The bottom of this article beschreibt, wie die Verwendung von GetOrAdd korrupte/unerwartete Ergebnisse verursachen kann (wenn ich es richtig verstehe).Vermeidung von veralteten (logisch korrupten) Daten bei Verwendung von "ConcurrentDictionary.GetOrAdd()", Repro-Code enthalten

snip/

ConcurrentDictionary für multithreaded Szenarien ausgelegt. Sie müssen in Ihrem Code keine Sperren verwenden, um Elemente aus der Sammlung hinzuzufügen oder zu entfernen. Es ist jedoch immer möglich, dass ein Thread einen Wert abruft und ein anderer Thread die Sammlung sofort aktualisiert, indem er demselben Schlüssel einen neuen Wert gibt. Auch

, obwohl alle Methoden der ConcurrentDictionary sind threadsicher, sind nicht alle Methoden sind atomar, spezifisch GetOrAdd und AddOrUpdate. Der Benutzerdelegate, der an diese Methoden übergeben wird, wird außerhalb der internen Sperre des Wörterbuchs aufgerufen. (Dies geschieht, um unbekannten Code verhindert, dass alle Threads blockiert.) Deshalb ist es möglich, dass diese Abfolge von Ereignissen auftreten:

1) ThreadA nennt GetOrAdd, findet kein Element und erstellt ein neues Objekt Hinzufügen von Aufrufen des valueFactory-Delegaten.

2) ThreadB GetOrAdd gleichzeitig aufruft, sind seine Delegierten valueFactory aufgerufen, und es kommt zu der internen Verriegelung vor ThreadA, und so sein neues Schlüssel-Wert-Paar wird zu dem Wörterbuch hinzugefügt.

3) ThreadA Benutzer delegieren abgeschlossen ist, und der Faden kommt bei der Sperre, aber jetzt sieht, dass das Element

bereits

existiert 4) ThreadA führt eine „Get“ und gibt die Daten, die zuvor war hinzugefügt nach threadB.

Daher ist nicht garantiert, dass die Daten, die von GetOrAdd zurückgegeben werden, die gleichen Daten sind, die von valueFactory des Threads erstellt wurden. Eine ähnliche Sequenz von Ereignissen kann auftreten, wenn AddOrUpdate aufgerufen wird.

Frage

Was ist der richtige Weg, um die Daten zu überprüfen, und das Update wiederholen? Ein netter Ansatz wäre eine Erweiterungsmethode, um diese Operation basierend auf dem Inhalt des alten Werts zu versuchen/zu wiederholen.

Wie würde dies umgesetzt werden? Kann ich mich auf das Ergebnis (verify) als gültigen Endzustand verlassen oder muss ich die Werte mit einer anderen Methode wiederholen und erneut abrufen?

-Code

Der folgende Code hat eine Race-Bedingung, wenn die Werte zu aktualisieren. Das gewünschte Verhalten besteht darin, dass AddOrUpdateWithoutRetrieving() verschiedene Werte auf verschiedene Arten erhöht (mit ++ oder Interlocked.Increment()).

Ich möchte auch mehrere Feldoperationen in einer einzigen Einheit durchführen und das Update wiederholen, wenn das vorherige Update aufgrund einer Wettlaufsituation nicht "übernommen" hat.

Führen Sie den Code und Sie werden sehen, dass jeder Wert in der Konsole erscheinen beginnend um eins, aber jeder der Werte wird abweichen, und einige werden ein paar Iterationen vor/hinter sein.

namespace DictionaryHowTo 
{ 
    using System; 
    using System.Collections.Concurrent; 
    using System.Collections.Generic; 
    using System.Linq; 
    using System.Text; 
    using System.Threading; 
    using System.Threading.Tasks; 

    // The type of the Value to store in the dictionary: 
    class FilterConcurrentDuplicate 
    { 
     // Create a new concurrent dictionary. 
     readonly ConcurrentDictionary<int, TestData> eventLogCache = 
      new ConcurrentDictionary<int, TestData>(); 

     static void Main() 
     { 
      FilterConcurrentDuplicate c = new FilterConcurrentDuplicate(); 

      c.DoRace(null); 
     } 

     readonly ConcurrentDictionary<int, TestData> concurrentCache = 
      new ConcurrentDictionary<int, TestData>(); 
     void DoRace(string[] args) 
     { 
      int max = 1000; 

      // Add some key/value pairs from multiple threads. 
      Task[] tasks = new Task[3]; 

      tasks[0] = Task.Factory.StartNew(() => 
      { 

       System.Random RandNum = new System.Random(); 
       int MyRandomNumber = RandNum.Next(1, 500); 

       Thread.Sleep(MyRandomNumber); 
       AddOrUpdateWithoutRetrieving(); 

      }); 

      tasks[1] = Task.Factory.StartNew(() => 
      { 
       System.Random RandNum = new System.Random(); 
       int MyRandomNumber = RandNum.Next(1, 1000); 

       Thread.Sleep(MyRandomNumber); 

       AddOrUpdateWithoutRetrieving(); 

      }); 

      tasks[2] = Task.Factory.StartNew(() => 
      { 
       AddOrUpdateWithoutRetrieving(); 

      }); 
      // Output results so far. 
      Task.WaitAll(tasks); 

      AddOrUpdateWithoutRetrieving(); 

      Console.WriteLine("Press any key."); 
      Console.ReadKey(); 
     } 
     public class TestData : IEqualityComparer<TestData> 
     { 
      public string aStr1 { get; set; } 
      public Guid? aGud1 { get; set; } 
      public string aStr2 { get; set; } 
      public int aInt1 { get; set; } 
      public long? aLong1 { get; set; } 

      public DateTime aDate1 { get; set; } 
      public DateTime? aDate2 { get; set; } 

      //public int QueryCount { get; set; } 
      public int QueryCount = 0;// 

      public string zData { get; set; } 
      public bool Equals(TestData x, TestData y) 
      { 
       return x.aStr1 == y.aStr1 && 
        x.aStr2 == y.aStr2 && 
         x.aGud1 == y.aGud1 && 
         x.aStr2 == y.aStr2 && 
         x.aInt1 == y.aInt1 && 
         x.aLong1 == y.aLong1 && 
         x.aDate1 == y.aDate1 && 
         x.QueryCount == y.QueryCount ; 
      } 

      public int GetHashCode(TestData obj) 
      { 
       TestData ci = (TestData)obj; 
       // http://stackoverflow.com/a/263416/328397 
       return 
        new { 
         A = ci.aStr1, 
         Aa = ci.aStr2, 
         B = ci.aGud1, 
         C = ci.aStr2, 
         D = ci.aInt1, 
         E = ci.aLong1, 
         F = ci.QueryCount , 
         G = ci.aDate1}.GetHashCode(); 
      } 
     } 
     private void AddOrUpdateWithoutRetrieving() 
     { 
      // Sometime later. We receive new data from some source. 
      TestData ci = new TestData() 
      { 
       aStr1 = "Austin", 
       aGud1 = new Guid(), 
       aStr2 = "System", 
       aLong1 = 100, 
       aInt1 = 1000, 
       QueryCount = 0, 
       aDate1 = DateTime.MinValue 
      }; 

      TestData verify = concurrentCache.AddOrUpdate(123, ci, 
       (key, existingVal) => 
       { 
        existingVal.aStr2 = "test1" + existingVal.QueryCount; 
        existingVal.aDate1 = DateTime.MinValue; 
        Console.WriteLine 
        ("Thread:" + Thread.CurrentThread.ManagedThreadId + 
          " Query Count A:" + existingVal.QueryCount); 
        Interlocked.Increment(ref existingVal.QueryCount); 
        System.Random RandNum = new System.Random(); 
        int MyRandomNumber = RandNum.Next(1, 1000); 

        Thread.Sleep(MyRandomNumber); 
        existingVal.aInt1++; 
        existingVal.aDate1 = 
         existingVal.aDate1.AddSeconds 
         (existingVal.aInt1); 
        Console.WriteLine(
          "Thread:" + Thread.CurrentThread.ManagedThreadId + 
          " Query Count B:" + existingVal.QueryCount); 
        return existingVal; 
       }); 


      // After each run, every value here should be ++ the previous value 
      Console.WriteLine(
       "Thread:"+Thread.CurrentThread.ManagedThreadId + 
       ": Query Count returned:" + verify.QueryCount + 
       " eid:" + verify.aInt1 + " date:" + 
       verify.aDate1.Hour + " " + verify.aDate1.Second + 
       " NAME:" + verify.aStr2 
       ); 
     } 

    } 
} 

Ausgabe

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System 

Thread:12 Query Count A:0 
Thread:13 Query Count A:1 
Thread:12 Query Count B:2 
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11 

Thread:12 Query Count A:2 
Thread:13 Query Count B:3 
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12 

Thread:13 Query Count A:3 
Thread:11 Query Count A:4 
Thread:11 Query Count B:5 
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14 

Thread:11 Query Count A:5 
Thread:13 Query Count B:6 
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15 

....

Thread:11 Query Count A:658 
Thread:11 Query Count B:659 
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658 

Thread:11 Query Count A:659 
Thread:11 Query Count B:660 
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659 

Thread:11 Query Count A:660 
Thread:11 Query Count B:661 
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660 

Thread:11 Query Count A:661 
Thread:11 Query Count B:662 
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661 

In diesem Code "eid" sollte immer 1000 mehr als Abfrage zählen, aber im Laufe der Iterationen die Differenz variiert von 1 bis 7 zwischen den beiden. Diese Inkonsistenz kann dazu führen, dass einige Anwendungen fehlschlagen oder falsche Daten melden.

+0

'ConcurrentDictionary' ist nur Thread-Safe in Bezug auf seine eigenen Invarianten. d.h. es wird seine eigenen Daten nicht korrumpieren. Wenn Sie andere Invarianten haben, kann es möglicherweise nicht vorher über sie wissen oder erwartet werden, sie zu kompensieren. Sie müssen zusammenhängend definieren, was Ihre Invariante ist und schützen Sie als eine Transaktion mit einer Art Threading-Synchronisation. –

Antwort

4

Diese Vorlage auf einem falschen Verständnis der Ausführungen am Ende des Artikels basiert „Gewusst wie: Hinzufügen und Entfernen von Elementen aus einer ConcurrentDictionaryhttp://msdn.microsoft.com/en-us/library/dd997369.aspx und auf einem grundlegenden Concurrency Fehler - gleichzeitige Nicht-Atom-Modifikation eines gemeinsames Objekt

Zuerst klären wir, was der verlinkte Artikel wirklich sagt. Ich verwende AddOrUpdate als Beispiel, aber die Argumentation für GetOrAdd ist äquivalent.

Angenommen, Sie rufen AddOrUpdate aus mehreren Threads und geben Sie den gleichen Schlüssel. Angenommen, ein Eintrag mit diesem Schlüssel existiert bereits. Jeder Thread wird kommen, beachten Sie, dass es bereits einen Eintrag mit dem angegebenen Schlüssel gibt und dass der Update-Teil von AddOrUpdate relevant ist. Dabei blockiert kein Thread das Wörterbuch. Stattdessen werden einige verriegelte Anweisungen verwendet, um automatisch zu prüfen, ob ein Eintragsschlüssel existiert oder nicht.

So haben unsere verschiedenen Threads alle bemerkt, dass der Schlüssel existiert und dass die updateValueFactory aufgerufen werden muss. Dieser Delegat wird an AddOrUpdate übergeben. Es nimmt Bezug auf den vorhandenen Schlüssel und Wert und gibt den Aktualisierungswert zurück. Jetzt rufen alle beteiligten Threads die Factory gleichzeitig auf. Sie werden alle in einer zuvor unbekannten Reihenfolge abgeschlossen sein und jeder Thread wird versuchen, eine atomare Operation (mit verriegelten Anweisungen) zu verwenden, um den vorhandenen Wert durch den Wert zu ersetzen, den er gerade berechnet hat. Es gibt keine Möglichkeit zu wissen, welcher Thread "gewinnt". Der Thread, der gewinnt, wird seinen berechneten Wert speichern. Andere werden feststellen, dass der Wert im Wörterbuch nicht mehr der Wert ist, der als Argument an updateValueFactory übergeben wurde. Als Reaktion auf diese Erkenntnis werden sie die Operation aufgeben und den gerade berechneten Wert wegwerfen.Das ist genau das, was Sie wollen.

Als nächstes können klären, warum Sie seltsame Werte erhalten, wenn das Codebeispiel hier aufgeführten ausgeführt wird:

Daran erinnern, dass die updateValueFactory Delegierten AddOrUpdate weitergegeben BEZUG den vorhandenen Schlüssel und Wert annimmt und gibt die update-Wert. Das Codebeispiel in seiner AddOrUpdateWithoutRetrieving() -Methode startet die Ausführung von Operationen direkt mit dieser Referenz. Anstatt einen neuen Ersatzwert zu erstellen und THAT zu ändern, werden die Instanzmemberwerte von existingVal - ein Objekt, das sich bereits im Wörterbuch befindet - geändert, und der Verweis wird einfach zurückgegeben. Und das tut es nicht atomar - es liest einige Werte, aktualisiert einige Werte, liest mehr, aktualisiert mehr. Natürlich haben wir oben gesehen, dass dies bei mehreren Threads gleichzeitig geschieht - sie alle modifizieren das SAME-Objekt. Kein Wunder, das Ergebnis ist, dass zu jeder Zeit (wenn das Codebeispiel WriteLine aufruft) das Objekt Memberinstanzwerte enthält, die von verschiedenen Threads stammen.

Das Wörterbuch hat damit nichts zu tun - der Code ändert lediglich ein Objekt, das nicht atomar zwischen Threads geteilt wird. Dies ist einer der häufigsten Nebenläufigkeitsfehler. Die zwei häufigsten Problemumgehungen hängen vom Szenario ab. Verwenden Sie eine gemeinsame Sperre, um die gesamte Objektänderung als atomar zu definieren, oder kopieren Sie zuerst das gesamte Objekt und anschließend die lokale Kopie.

Für letztere versuchen, diese zu der Testdata Klasse:

private Object _copyLock = null; 

private Object GetLock() { 

    if (_copyLock != null) 
     return _copyLock; 

    Object newLock = new Object(); 
    Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null); 
    return (prevLock == null) ? newLock : prevLock; 
} 

public TestData Copy() { 

    lock (GetLock()) { 
     TestData copy = new TestData(); 
     copy.aStr1 = this.aStr1; 
     copy.aStr2 = this.aStr2; 
     copy.aLong1 = this.aLong1; 
     copy.aInt1 = this.aInt1; 
     copy.QueryCount = this.QueryCount; 
     copy.aDate1 = this.aDate1; 
     copy.aDate2 = this.aDate2; 
     copy.zData = this.zData; 

     return copy; 
    } 
} 

Dann das Werk wie folgt ändern:

TestData verify = concurrentCache.AddOrUpdate(123, ci, 
    (key, existingVal) => 
    { 
     TestData newVal = existingVal.Copy(); 
     newVal.aStr2 = "test1" + newVal.QueryCount; 
     newVal.aDate1 = DateTime.MinValue; 
     Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count A:" + newVal.QueryCount); 
     Interlocked.Increment(ref newVal.QueryCount); 
     System.Random RandNum = new System.Random(); 
     int MyRandomNumber = RandNum.Next(1, 1000); 

     Thread.Sleep(MyRandomNumber); 
     newVal.aInt1++; 
     newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1); 
     Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count B:" + newVal.QueryCount); 
     return newVal; 
    }); 

Ich hoffe, das hilft.

3

Wahrscheinlich ist der richtige Weg, sich nicht darum zu kümmern, ob der zurückgegebene Wert nicht derjenige ist, der von valueFactory erzeugt wird. Wenn dies nicht akzeptabel ist, müssen Sie eine Sperre verwenden.

+0

Siehe den angehängten Code ... Ich denke, eine C# -Erweiterungsmethode ist erforderlich, um zu versuchen ... wiederholen Sie das Update, bis es konsistent ist. Das oder ein Spinlock einbauen. Nicht interessiert ist nicht die Antwort IMO – LamonteCristo

+0

Solange die valueFactory keine Nebenwirkungen hat (was wahrscheinlich eine gute Idee ist), sollte es in der Regel keine Rolle spielen. – erikkallen

2

Es gibt keinen allgemeinen Schutz, der immer funktioniert. Eine häufige Problemumgehung besteht jedoch darin, eine Lazy<T> anstelle einer T zurückzugeben. Auf diese Weise schadet das Erstellen von unnötigen Faulen nicht, weil der Wille niemals gestartet wird. Nur ein Lazy wird es zum endgültigen Wert machen, der dem Schlüssel entspricht. Nur eine bestimmte Lazy-Instanz wird jemals zurückgegeben.

+0

Wissen Sie, wie ich dies in einer Erweiterungsmethode implementieren kann, die das Problem anspricht? – LamonteCristo

+0

http://blogs.msdn.com/b/pfxteam/archive/2010/04/23/10001621.aspx – usr

1

Sie könnten this implementation von GetOrAdd vom Mann selbst verwenden. Beachten Sie, dass auch hier die Factory aufgerufen werden kann, ohne dass das Ergebnis dem Wörterbuch hinzugefügt wird. Aber Sie würden einen Hinweis bekommen, was passiert ist.

+0

Ja, eine Erweiterungsfunktion, die GetOrAdd Wraps ist, was ich suche. Ich habe Probleme beim Erstellen einer Überladung, die das GetOrAdd automatisch wiederholt, wenn die Beschädigung auftritt. – LamonteCristo

Verwandte Themen