The bottom of this article beschreibt, wie die Verwendung von GetOrAdd korrupte/unerwartete Ergebnisse verursachen kann (wenn ich es richtig verstehe).Vermeidung von veralteten (logisch korrupten) Daten bei Verwendung von "ConcurrentDictionary.GetOrAdd()", Repro-Code enthalten
snip/
ConcurrentDictionary für multithreaded Szenarien ausgelegt. Sie müssen in Ihrem Code keine Sperren verwenden, um Elemente aus der Sammlung hinzuzufügen oder zu entfernen. Es ist jedoch immer möglich, dass ein Thread einen Wert abruft und ein anderer Thread die Sammlung sofort aktualisiert, indem er demselben Schlüssel einen neuen Wert gibt. Auch
, obwohl alle Methoden der ConcurrentDictionary sind threadsicher, sind nicht alle Methoden sind atomar, spezifisch GetOrAdd und AddOrUpdate. Der Benutzerdelegate, der an diese Methoden übergeben wird, wird außerhalb der internen Sperre des Wörterbuchs aufgerufen. (Dies geschieht, um unbekannten Code verhindert, dass alle Threads blockiert.) Deshalb ist es möglich, dass diese Abfolge von Ereignissen auftreten:
1) ThreadA nennt GetOrAdd, findet kein Element und erstellt ein neues Objekt Hinzufügen von Aufrufen des valueFactory-Delegaten.
2) ThreadB GetOrAdd gleichzeitig aufruft, sind seine Delegierten valueFactory aufgerufen, und es kommt zu der internen Verriegelung vor ThreadA, und so sein neues Schlüssel-Wert-Paar wird zu dem Wörterbuch hinzugefügt.
3) ThreadA Benutzer delegieren abgeschlossen ist, und der Faden kommt bei der Sperre, aber jetzt sieht, dass das Element
bereits
existiert 4) ThreadA führt eine „Get“ und gibt die Daten, die zuvor war hinzugefügt nach threadB.Daher ist nicht garantiert, dass die Daten, die von GetOrAdd zurückgegeben werden, die gleichen Daten sind, die von valueFactory des Threads erstellt wurden. Eine ähnliche Sequenz von Ereignissen kann auftreten, wenn AddOrUpdate aufgerufen wird.
Frage
Was ist der richtige Weg, um die Daten zu überprüfen, und das Update wiederholen? Ein netter Ansatz wäre eine Erweiterungsmethode, um diese Operation basierend auf dem Inhalt des alten Werts zu versuchen/zu wiederholen.
Wie würde dies umgesetzt werden? Kann ich mich auf das Ergebnis (verify
) als gültigen Endzustand verlassen oder muss ich die Werte mit einer anderen Methode wiederholen und erneut abrufen?
-Code
Der folgende Code hat eine Race-Bedingung, wenn die Werte zu aktualisieren. Das gewünschte Verhalten besteht darin, dass AddOrUpdateWithoutRetrieving() verschiedene Werte auf verschiedene Arten erhöht (mit ++
oder Interlocked.Increment()
).
Ich möchte auch mehrere Feldoperationen in einer einzigen Einheit durchführen und das Update wiederholen, wenn das vorherige Update aufgrund einer Wettlaufsituation nicht "übernommen" hat.
Führen Sie den Code und Sie werden sehen, dass jeder Wert in der Konsole erscheinen beginnend um eins, aber jeder der Werte wird abweichen, und einige werden ein paar Iterationen vor/hinter sein.
namespace DictionaryHowTo
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
// The type of the Value to store in the dictionary:
class FilterConcurrentDuplicate
{
// Create a new concurrent dictionary.
readonly ConcurrentDictionary<int, TestData> eventLogCache =
new ConcurrentDictionary<int, TestData>();
static void Main()
{
FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();
c.DoRace(null);
}
readonly ConcurrentDictionary<int, TestData> concurrentCache =
new ConcurrentDictionary<int, TestData>();
void DoRace(string[] args)
{
int max = 1000;
// Add some key/value pairs from multiple threads.
Task[] tasks = new Task[3];
tasks[0] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 500);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[1] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[2] = Task.Factory.StartNew(() =>
{
AddOrUpdateWithoutRetrieving();
});
// Output results so far.
Task.WaitAll(tasks);
AddOrUpdateWithoutRetrieving();
Console.WriteLine("Press any key.");
Console.ReadKey();
}
public class TestData : IEqualityComparer<TestData>
{
public string aStr1 { get; set; }
public Guid? aGud1 { get; set; }
public string aStr2 { get; set; }
public int aInt1 { get; set; }
public long? aLong1 { get; set; }
public DateTime aDate1 { get; set; }
public DateTime? aDate2 { get; set; }
//public int QueryCount { get; set; }
public int QueryCount = 0;//
public string zData { get; set; }
public bool Equals(TestData x, TestData y)
{
return x.aStr1 == y.aStr1 &&
x.aStr2 == y.aStr2 &&
x.aGud1 == y.aGud1 &&
x.aStr2 == y.aStr2 &&
x.aInt1 == y.aInt1 &&
x.aLong1 == y.aLong1 &&
x.aDate1 == y.aDate1 &&
x.QueryCount == y.QueryCount ;
}
public int GetHashCode(TestData obj)
{
TestData ci = (TestData)obj;
// http://stackoverflow.com/a/263416/328397
return
new {
A = ci.aStr1,
Aa = ci.aStr2,
B = ci.aGud1,
C = ci.aStr2,
D = ci.aInt1,
E = ci.aLong1,
F = ci.QueryCount ,
G = ci.aDate1}.GetHashCode();
}
}
private void AddOrUpdateWithoutRetrieving()
{
// Sometime later. We receive new data from some source.
TestData ci = new TestData()
{
aStr1 = "Austin",
aGud1 = new Guid(),
aStr2 = "System",
aLong1 = 100,
aInt1 = 1000,
QueryCount = 0,
aDate1 = DateTime.MinValue
};
TestData verify = concurrentCache.AddOrUpdate(123, ci,
(key, existingVal) =>
{
existingVal.aStr2 = "test1" + existingVal.QueryCount;
existingVal.aDate1 = DateTime.MinValue;
Console.WriteLine
("Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count A:" + existingVal.QueryCount);
Interlocked.Increment(ref existingVal.QueryCount);
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
existingVal.aInt1++;
existingVal.aDate1 =
existingVal.aDate1.AddSeconds
(existingVal.aInt1);
Console.WriteLine(
"Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count B:" + existingVal.QueryCount);
return existingVal;
});
// After each run, every value here should be ++ the previous value
Console.WriteLine(
"Thread:"+Thread.CurrentThread.ManagedThreadId +
": Query Count returned:" + verify.QueryCount +
" eid:" + verify.aInt1 + " date:" +
verify.aDate1.Hour + " " + verify.aDate1.Second +
" NAME:" + verify.aStr2
);
}
}
}
Ausgabe
Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System
Thread:12 Query Count A:0
Thread:13 Query Count A:1
Thread:12 Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11
Thread:12 Query Count A:2
Thread:13 Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12
Thread:13 Query Count A:3
Thread:11 Query Count A:4
Thread:11 Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14
Thread:11 Query Count A:5
Thread:13 Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15
....
Thread:11 Query Count A:658
Thread:11 Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658
Thread:11 Query Count A:659
Thread:11 Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659
Thread:11 Query Count A:660
Thread:11 Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660
Thread:11 Query Count A:661
Thread:11 Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661
In diesem Code "eid" sollte immer 1000 mehr als Abfrage zählen, aber im Laufe der Iterationen die Differenz variiert von 1 bis 7 zwischen den beiden. Diese Inkonsistenz kann dazu führen, dass einige Anwendungen fehlschlagen oder falsche Daten melden.
'ConcurrentDictionary' ist nur Thread-Safe in Bezug auf seine eigenen Invarianten. d.h. es wird seine eigenen Daten nicht korrumpieren. Wenn Sie andere Invarianten haben, kann es möglicherweise nicht vorher über sie wissen oder erwartet werden, sie zu kompensieren. Sie müssen zusammenhängend definieren, was Ihre Invariante ist und schützen Sie als eine Transaktion mit einer Art Threading-Synchronisation. –