Concurrent Collections in C# sind unverzichtbare Werkzeuge, um thread-sicheren Zugriff auf Datenstrukturen in multithreaded Anwendungen zu gewährleisten, ohne dass manuelle Sperrmechanismen erforderlich sind.
Beim Arbeiten mit Threads und Datenzugriff auf Collections kann es zu unterschiedlichen unvorhersehbaren Fehlern kommen. Eine klassische Lösung sind Locks, was bei richtiger Implementierung funktioniert, aber auch einen grossen Aufwand darstellt und bei fehlerhafter Implementierung zum Beispiel zu Deadlocks führen kann.
Concurrent Collections aus
using System.Collections.Concurrent;
sind genau dafür entwickelt worden und bieten eine elegante und effiziente Möglichkeit den Zugriff threadsicher zu implementieren. Sie sollten immer dann verwendet werden, wenn mehrere Threads gleichzeitig auf eine gemeinsame Datenstruktur zugreifen.
Das Concurrent Dictionary stellt die Funktion TryAdd() zur Verfügung
bool ConcurrentDictionary<string, string>.TryAdd(string key, string value) (+ 1 overload) Attempts to add the specified key and value to the ConcurrentDictionary<TKey, TValue>. Returns: true if the key/value pair was added to the ConcurrentDictionary<TKey, TValue> successfully; false if the key already exists. Exceptions: ArgumentNullException OverflowException
Die funktion AddOrUpdate() bietet weitere Möglichkeiten.
string ConcurrentDictionary<string, string>.AddOrUpdate(string key, Func<string, string> addValueFactory, Func<string, string, string> updateValueFactory) (+ 2 overloads) Uses the specified functions to add a key/value pair to the ConcurrentDictionary<TKey, TValue> if the key does not already exist, or to update a key/value pair in the ConcurrentDictionary<TKey, TValue> if the key already exists. Returns: The new value for the key. This will be either be the result of addValueFactory (if the key was absent) or the result of updateValueFactory (if the key was present). Exceptions: ArgumentNullException OverflowException
Die Methode GetOrAdd() gibt ein Wert zurück, wenn er existiert, andernfalls wird er hinzugefügt.
string ConcurrentDictionary<string, string>.GetOrAdd(string key, string value) (+ 2 overloads) Adds a key/value pair to the ConcurrentDictionary<TKey, TValue> if the key does not already exist. Returns the new value, or the existing value if the key exists. Returns: The value for the key. This will be either the existing value for the key if the key is already in the dictionary, or the new value if the key was not in the dictionary. Exceptions: ArgumentNullException OverflowException
Eine weitere Funktion ist TryRemove:
bool ConcurrentDictionary<string, string>.TryRemove(string key, out string value) (+ 1 overload) Attempts to remove and return the value that has the specified key from the ConcurrentDictionary<TKey, TValue>. Returns: true if the object was removed successfully; otherwise, false. Exceptions: ArgumentNullException
Der folgende Code:
static class Programm
{
private static ConcurrentDictionary<string, string> cars =
new ConcurrentDictionary<string, string>();
public static void AddCar()
{
var success = cars.TryAdd("a", "b");
var whoAdded = Task.CurrentId.HasValue ? ("Task " + Task.CurrentId) :
"Main Thread";
Console.WriteLine($"{whoAdded} {(success ? "added" :
"failed to add")} the car");
}
static void Main(string[] args)
{
AddCar();
Task.Factory.StartNew(AddCar).Wait();
cars.AddOrUpdate("Ford", "F150", (o, oldCar) => "F150");
Console.WriteLine($"Ford: {cars["Ford"]}");
cars["Kia"] = "Optima";
cars.GetOrAdd("Kia", "Stinger");
Console.WriteLine($"Kia: {cars["Kia"]}");
var keyToRemove = "Kia";
var removed = cars.TryRemove(keyToRemove, out var removedKey);
if (removed)
{
Console.WriteLine($"{keyToRemove} was removed");
}
else
{
Console.WriteLine($"Failed to remove {keyToRemove}");
}
}
}
Gibt aus:
Main Thread added the car Task: 1 failed to add the car Ford: F150 Kia: Optima Kia was removed
Eine Concurrent Queue bietet weitestgehend die gleichen Funktionen wie eine Queue.
Da wir aber threadsicher arbeiten wollen, wird versucht, eine Operation auszuführen, die Funktionen Dequeue und Peek geben also wieder ein Boolean zurück, um zu evaluieren, ob die Funktion erfolgreich war oder nicht.
bool ConcurrentQueue<string>.TryDequeue(out string result) Tries to remove and return the object at the beginning of the concurrent queue. Returns: true if an element was removed and returned from the beginning of the ConcurrentQueue<T> successfully; otherwise, false.
bool ConcurrentQueue<string>.TryPeek(out string result) Tries to return an object from the beginning of the ConcurrentQueue<T> without removing it. Returns: true if an object was returned successfully; otherwise, false.
Der folgende Code:
static void Main(string[] args)
{
var queue = new ConcurrentQueue<string>();
queue.Enqueue("Ford");
queue.Enqueue("Kia");
if (queue.TryDequeue(out var dequeueResult))
{
Console.WriteLine($"Dequeued element {dequeueResult}");
}
if (queue.TryPeek(out var peekResult))
{
Console.WriteLine($"First Element is {peekResult}");
}
}
Gibt:
Dequeued element Ford First Element is Kia
Ist ein Dequeue nicht möglich, weil die Queue bereits von einem anderen Thread dequeued wurde, wird false zurückgegeben.
Gleich wie bei einer Concurrent Queue haben wir bei einem Concurrent Stack Methoden, welche eine Operation versuchen und das Ergebnis zurückgeben: “TryPeek” oder “TryPop”. Zusätzlich stehen aber auch die Methoden “PushRange” und “TryPopRange” zur Verfügung.
void ConcurrentStack<string>.PushRange(string[] items) (+ 1 overload) Inserts multiple objects at the top of the ConcurrentStack<T> atomically. Exceptions: ArgumentNullException
int ConcurrentStack<string>.TryPopRange(string[] items) (+ 1 overload) Attempts to pop and return multiple objects from the top of the ConcurrentStack<T> atomically. Returns: The number of objects successfully popped from the top of the ConcurrentStack<T> and inserted in items. Exceptions: ArgumentNullException
Der folgende Code:
static void Main(string[] args)
{
var concurrentStack = new ConcurrentStack<string>();
var values = new string[3] { "Ford", "Kia", "Honda" };
concurrentStack.PushRange(values);
var poppedValues = new string[2];
if (concurrentStack.TryPopRange(poppedValues, 0, 2) > 0)
{
Console.WriteLine("Popped Values: " + string.Join(", ", poppedValues));
Console.WriteLine("Values in Stack: " + string.Join(", ", concurrentStack));
}
}
Gibt
Popped Values: Honda, Kia, Values in Stack: Ford
Concurrent Bag ist ein spezieller Datentyp, hier werden Objekte ohne eine bestimmte Reihenfolge abgespeichert. Der grosse Vorteil von Concurrent Bags ist die Performance. Jeder Thread hat seinen eigenen kleinen Bag, dadurch wird Blocking minimiert und die Performance bleibt hoch.
Zu sehen an folgendem Codebeispiel:
static void Main(string[] args)
{
var concurrentBag = new ConcurrentBag<int>();
var tasks = new List<Task>();
for (int i = 0; i < 3; i++)
{
var index = i;
tasks.Add(Task.Factory.StartNew(() =>
{
concurrentBag.Add(index);
Console.WriteLine($"Task {Task.CurrentId} added {index}");
}));
}
Task.WaitAll([.. tasks]);
}
Der Output ist:
Task 1 added 1 Task 3 added 0 Task 2 added 2
Wenn wir nun mit TryTake() ein Element herausnehmen, bekommen wir ein Random-Element aus dem Bag. Concurrent Bags sind von grossem Vorteil, wenn die Reihenfolge keine Rolle spielt und die Operation performant sein muss.
Blocking Collection ist keine Collection sondern ein Wrapper um Collections. Hier gibt es auch die Möglichkeit ein Hinzufügen von neuen Elementen zu blockieren, wenn zum Beispiel eine maximal gewollte Anzahl an Elementen vorhanden ist.
BlockingCollection<string>.BlockingCollection(IProducerConsumerCollection<string> collection, int boundedCapacity) (+ 3 overloads) Initializes a new instance of the BlockingCollection<T> class with the specified upper-bound and using the provided IProducerConsumerCollection<T> as its underlying data store. Exceptions: ArgumentNullException ArgumentOutOfRangeException ArgumentException
var blockingCollection = new BlockingCollection<string>(new ConcurrentBag<string>(), 5);
Hier erstellen wir eine BlockingCollection mit einem ConcurrentBag, der Bag hat eine Kapazität von 5 Elementen. Ist der Bag mit 5 Elementen gefüllt, wird ein weiterer Versuch ein Element hinzuzufügen blockiert. Sobald ein Element entfernt wurde, wird die Blockade aufgehoben.
Beispiel:
Wir haben einen Producer, welcher Nachrichten generiert und unsere BlockingCollection damit befüllt.
private static void Producer()
{
do
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();
var message = random.Next(50).ToString();
messages.Add(message);
Console.WriteLine($"Added {message}");
Thread.Sleep(100);
} while (true);
}
Ein Consumer, welcher diese Nachrichten nimmt und bearbeitet.
private static void Consumer()
{
foreach (var message in messages.GetConsumingEnumerable())
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();
Console.WriteLine($"Consumed {message}");
Thread.Sleep(500);
}
}
Wir können sehen, dass die Nachrichten deutlich schneller generiert als bearbeitet werden. Das Erstellen dauert 100ms und das Bearbeiten 500ms.
Wenn wir den folgenden Code ausführen:
static CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
static Random random = new Random();
static BlockingCollection messages = new BlockingCollection(5);
static void Main(string[] args)
{
Task.Factory.StartNew(ProducerConsumer);
Console.ReadKey();
cancellationTokenSource.Cancel();
}
private static void ProducerConsumer()
{
var producer = Task.Factory.StartNew(Producer);
var consumer = Task.Factory.StartNew(Consumer);
}
können wir sehen, dass sobald die Anzahl von 5 Elementen erreicht wurde, das Hinzufügen geblockt wird und erst dann wieder freigegeben wird, wenn eine Nachricht bearbeitet wurde.
Added 26 Consumed 26 Added 7 Added 9 Added 13 Added 10 Consumed 7 Added 49 Added 36 Consumed 9 Added 11 Consumed 13 Added 30 Consumed 10 Added 30
Um das vorhergegangene Beispiel (Producer-Consumer and BlockingCollection<T>) ohne Blocking Collections zu verwenden, muss die Synchronisierung manuell implementiert werden.
private static void Producer()
{
try
{
while (true)
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();
var message = random.Next(50).ToString();
lock (_lock)
{
messages.Enqueue(message);
Monitor.Pulse(_lock);
}
Console.WriteLine($"Added {message}");
Thread.Sleep(100);
}
}
catch (OperationCanceledException) { }
}
private static void Consumer()
{
try
{
while (true)
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();
string message;
lock (_lock)
{
while (messages.Count == 0)
{
Monitor.Wait(_lock);
}
message = messages.Dequeue();
}
Console.WriteLine($"Consumed {message}");
Thread.Sleep(500);
}
}
catch (OperationCanceledException) { }
}
Wenn concurrent collections in Single-Thread Szenarien verwendet werden, kann es zu Performance Einbussen kommen, die Synchronisierungsmechanismen brauchen Zeit. Funktionen zur Indexierung oder Sortierung sind oft nicht verfügbar. Die Funktion .Count() gibt die Anzahl der Elemente zurück, diese kann sich aber zum Ausgabezeitpunkt bereits geändert haben, wenn ein anderer Thread auf die Daten zugreift.
Concurrent Collections abstrahieren die komplexe Logik von Locks und Synchronisierung. Mit ihrer robusten und leicht verständlichen API machen sie es dem Entwickler leicht, diese anzuwenden. Mit BlockingCollections ist ein Producer-Consumer-Pattern schnell und einfach implementiert. Wenn wir also das nächste Mal ein lock(object) um eine Collection schreiben, wäre die Überlegung angebracht, ob eine Concurrent Collection nicht vielleicht die bessere Wahl wäre.
Schreiben Sie einen Kommentar