en / de
AI
Expertisen
Methoden
Dienstleistungen
Referenzen
Jobs & Karriere
Firma
Technologie-Trends TechCast WebCast TechBlog News Events Academy

Concurrent Collections in C#

Concurrent Collections in C# sind unverzichtbare Werkzeuge, um thread-sicheren Zugriff auf Datenstrukturen in multithreaded Anwendungen zu gewährleisten, ohne dass manuelle Sperrmechanismen erforderlich sind.

Beim Arbeiten mit Threads und Datenzugriff auf Collections kann es zu unterschiedlichen unvorhersehbaren Fehlern kommen. Eine klassische Lösung sind Locks, was bei richtiger Implementierung funktioniert, aber auch einen grossen Aufwand darstellt und bei fehlerhafter Implementierung zum Beispiel zu Deadlocks führen kann.

Concurrent Collections aus

using System.Collections.Concurrent;

sind genau dafür entwickelt worden und bieten eine elegante und effiziente Möglichkeit den Zugriff threadsicher zu implementieren. Sie sollten immer dann verwendet werden, wenn mehrere Threads gleichzeitig auf eine gemeinsame Datenstruktur zugreifen. 

Concurrent Dictionary

Das Concurrent Dictionary stellt die Funktion TryAdd() zur Verfügung

bool ConcurrentDictionary<string, string>.TryAdd(string key, string value) (+ 1 overload)

Attempts to add the specified key and value to the ConcurrentDictionary<TKey, TValue>.

Returns:
  true if the key/value pair was added to the ConcurrentDictionary<TKey, TValue> successfully; false if the key already exists.

Exceptions:
  ArgumentNullException
  OverflowException

Die funktion AddOrUpdate() bietet weitere Möglichkeiten.

string ConcurrentDictionary<string, string>.AddOrUpdate(string key, Func<string, string> addValueFactory, Func<string, string, string> updateValueFactory) (+ 2 overloads)

Uses the specified functions to add a key/value pair to the ConcurrentDictionary<TKey, TValue> if the key does not already exist, or to update a key/value pair in the ConcurrentDictionary<TKey, TValue> if the key already exists.

Returns:
  The new value for the key. This will be either be the result of addValueFactory (if the key was absent) or the result of updateValueFactory (if the key was present).

Exceptions:
  ArgumentNullException
  OverflowException

Die Methode GetOrAdd() gibt ein Wert zurück, wenn er existiert, andernfalls wird er hinzugefügt. 

string ConcurrentDictionary<string, string>.GetOrAdd(string key, string value) (+ 2 overloads)

Adds a key/value pair to the ConcurrentDictionary<TKey, TValue> if the key does not already exist. Returns the new value, or the existing value if the key exists.

Returns:
  The value for the key. This will be either the existing value for the key if the key is already in the dictionary, or the new value if the key was not in the dictionary.

Exceptions:
  ArgumentNullException
  OverflowException

Eine weitere Funktion ist TryRemove:

bool ConcurrentDictionary<string, string>.TryRemove(string key, out string value) (+ 1 overload)

Attempts to remove and return the value that has the specified key from the ConcurrentDictionary<TKey, TValue>.

Returns:
  true if the object was removed successfully; otherwise, false.

Exceptions:
  ArgumentNullException

Der folgende Code:

static class Programm
{
    private static ConcurrentDictionary<string, string> cars = 
    new ConcurrentDictionary<string, string>(); 
    
    public static void AddCar()
    {
        var success = cars.TryAdd("a", "b");
        var whoAdded = Task.CurrentId.HasValue ? ("Task " + Task.CurrentId) :
        "Main Thread";
        Console.WriteLine($"{whoAdded} {(success ? "added" :
        "failed to add")} the car");
    }

    static void Main(string[] args)
    {
        AddCar();
        Task.Factory.StartNew(AddCar).Wait(); 
        
        cars.AddOrUpdate("Ford", "F150", (o, oldCar) => "F150");
        Console.WriteLine($"Ford: {cars["Ford"]}");
        
        cars["Kia"] = "Optima"; 
        cars.GetOrAdd("Kia", "Stinger");
        Console.WriteLine($"Kia: {cars["Kia"]}");
        
        var keyToRemove = "Kia";
        var removed = cars.TryRemove(keyToRemove, out var removedKey);
        
        if (removed)
        { 
            Console.WriteLine($"{keyToRemove} was removed");
        } 
        else 
        { 
            Console.WriteLine($"Failed to remove {keyToRemove}");
        }
    }
}

Gibt aus:

Main Thread added the car
Task: 1 failed to add the car
Ford: F150
Kia: Optima
Kia was removed

 

Concurrent Queue

Eine Concurrent Queue bietet weitestgehend die gleichen Funktionen wie eine Queue.

Da wir aber threadsicher arbeiten wollen, wird versucht, eine Operation auszuführen, die Funktionen Dequeue und Peek geben also wieder ein Boolean zurück, um zu evaluieren, ob die Funktion erfolgreich war oder nicht. 

bool ConcurrentQueue<string>.TryDequeue(out string result)

Tries to remove and return the object at the beginning of the concurrent queue.

Returns:
  true if an element was removed and returned from the beginning of the ConcurrentQueue<T> successfully; otherwise, false.
bool ConcurrentQueue<string>.TryPeek(out string result)

Tries to return an object from the beginning of the ConcurrentQueue<T> without removing it.

Returns:
  true if an object was returned successfully; otherwise, false.

Der folgende Code:

static void Main(string[] args)
{
    var queue = new ConcurrentQueue<string>();
    queue.Enqueue("Ford");
    queue.Enqueue("Kia");

    if (queue.TryDequeue(out var dequeueResult))
    {
        Console.WriteLine($"Dequeued element {dequeueResult}");
    }

    if (queue.TryPeek(out var peekResult))
    {
        Console.WriteLine($"First Element is {peekResult}");
    }
}

Gibt:

Dequeued element Ford
First Element is Kia

Ist ein Dequeue nicht möglich, weil die Queue bereits von einem anderen Thread dequeued wurde, wird false zurückgegeben.

Concurrent Stack

Gleich wie bei einer Concurrent Queue haben wir bei einem Concurrent Stack Methoden, welche eine Operation versuchen und das Ergebnis zurückgeben: “TryPeek” oder “TryPop”. Zusätzlich stehen aber auch die Methoden “PushRange” und “TryPopRange” zur Verfügung.

void ConcurrentStack<string>.PushRange(string[] items) (+ 1 overload)

Inserts multiple objects at the top of the ConcurrentStack<T> atomically.

Exceptions:
  ArgumentNullException
int ConcurrentStack<string>.TryPopRange(string[] items) (+ 1 overload)

Attempts to pop and return multiple objects from the top of the ConcurrentStack<T> atomically.

Returns:
  The number of objects successfully popped from the top of the ConcurrentStack<T> and inserted in items.

Exceptions:
  ArgumentNullException

Der folgende Code:

static void Main(string[] args)
{
    var concurrentStack = new ConcurrentStack<string>();
    var values = new string[3] { "Ford", "Kia", "Honda" };

    concurrentStack.PushRange(values);

    var poppedValues = new string[2];
    if (concurrentStack.TryPopRange(poppedValues, 0, 2) > 0)
    {
        Console.WriteLine("Popped Values: " + string.Join(", ", poppedValues));  
        Console.WriteLine("Values in Stack: " + string.Join(", ", concurrentStack));    
    }
}

Gibt 

Popped Values: Honda, Kia, 
Values in Stack: Ford

 

Concurrent Bag

Concurrent Bag ist ein spezieller Datentyp, hier werden Objekte ohne eine bestimmte Reihenfolge abgespeichert. Der grosse Vorteil von Concurrent Bags ist die Performance. Jeder Thread hat seinen eigenen kleinen Bag, dadurch wird Blocking minimiert und die Performance bleibt hoch.

 

Zu sehen an folgendem Codebeispiel:

static void Main(string[] args)
{
    var concurrentBag = new ConcurrentBag<int>();

    var tasks = new List<Task>();
    for (int i = 0; i < 3; i++)
    {
        var index = i;
        tasks.Add(Task.Factory.StartNew(() =>
        {
            concurrentBag.Add(index);
            Console.WriteLine($"Task {Task.CurrentId} added {index}");
        }));
    }
    Task.WaitAll([.. tasks]);
}

Der Output ist: 

Task 1 added 1
Task 3 added 0
Task 2 added 2

Wenn wir nun mit TryTake() ein Element herausnehmen, bekommen wir ein Random-Element aus dem Bag. Concurrent Bags sind von grossem Vorteil, wenn die Reihenfolge keine Rolle spielt und die Operation performant sein muss. 

 

Producer-Consumer and BlockingCollection<T>

Blocking Collection ist keine Collection sondern ein Wrapper um Collections. Hier gibt es auch die Möglichkeit ein Hinzufügen von neuen Elementen zu blockieren, wenn zum Beispiel eine maximal gewollte Anzahl an Elementen vorhanden ist.

BlockingCollection<string>.BlockingCollection(IProducerConsumerCollection<string> collection, int boundedCapacity) (+ 3 overloads)

Initializes a new instance of the BlockingCollection<T> class with the specified upper-bound and using the provided IProducerConsumerCollection<T> as its underlying data store.

Exceptions:
  ArgumentNullException
  ArgumentOutOfRangeException
  ArgumentException
var blockingCollection = new BlockingCollection<string>(new ConcurrentBag<string>(), 5);

Hier erstellen wir eine BlockingCollection mit einem ConcurrentBag, der Bag hat eine Kapazität von 5 Elementen. Ist der Bag mit 5 Elementen gefüllt, wird ein weiterer Versuch ein Element hinzuzufügen blockiert. Sobald ein Element entfernt wurde, wird die Blockade aufgehoben. 

 

Beispiel:

Wir haben einen Producer, welcher Nachrichten generiert und unsere BlockingCollection damit befüllt.

private static void Producer()
{
    do
    {
        cancellationTokenSource.Token.ThrowIfCancellationRequested();

        var message = random.Next(50).ToString();
        messages.Add(message);
        Console.WriteLine($"Added {message}");
        Thread.Sleep(100);
    } while (true);
}

Ein Consumer, welcher diese Nachrichten nimmt und bearbeitet.

private static void Consumer()
{
    foreach (var message in messages.GetConsumingEnumerable())
    {
        cancellationTokenSource.Token.ThrowIfCancellationRequested();
        Console.WriteLine($"Consumed {message}");

        Thread.Sleep(500);
    }
}

Wir können sehen, dass die Nachrichten deutlich schneller generiert als bearbeitet werden. Das Erstellen dauert 100ms und das Bearbeiten 500ms. 

Wenn wir den folgenden Code ausführen:

static CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
static Random random = new Random();
static BlockingCollection messages = new BlockingCollection(5);

static void Main(string[] args)
{
    Task.Factory.StartNew(ProducerConsumer);
    
    Console.ReadKey();
    cancellationTokenSource.Cancel();
}

private static void ProducerConsumer()
{
    var producer = Task.Factory.StartNew(Producer);
    var consumer = Task.Factory.StartNew(Consumer);
}

können wir sehen, dass sobald die Anzahl von 5 Elementen erreicht wurde, das Hinzufügen geblockt wird und erst dann wieder freigegeben wird, wenn eine Nachricht bearbeitet wurde.

Added 26
Consumed 26
Added 7
Added 9
Added 13
Added 10
Consumed 7
Added 49
Added 36
Consumed 9
Added 11
Consumed 13
Added 30
Consumed 10
Added 30

 

Implementation ohne Concurrent Collections

Um das vorhergegangene Beispiel (Producer-Consumer and BlockingCollection<T>) ohne Blocking Collections zu verwenden, muss die Synchronisierung manuell implementiert werden.

private static void Producer()
{
    try
    {
        while (true)
        {
            cancellationTokenSource.Token.ThrowIfCancellationRequested();
            var message = random.Next(50).ToString();


            lock (_lock)
            {
                messages.Enqueue(message);
                Monitor.Pulse(_lock);
            }

            Console.WriteLine($"Added {message}");
            Thread.Sleep(100);
        }
    }
    catch (OperationCanceledException) { }
}

private static void Consumer()
{
    try
    {
        while (true)
        {
            cancellationTokenSource.Token.ThrowIfCancellationRequested();
            string message;

            lock (_lock)
            {
                while (messages.Count == 0)
                {
                    Monitor.Wait(_lock);
                }

                message = messages.Dequeue();
            }

            Console.WriteLine($"Consumed {message}");
            Thread.Sleep(500);
        }
    }
    catch (OperationCanceledException) { }
}

 

Nachteile 

Wenn concurrent collections in Single-Thread Szenarien verwendet werden, kann es zu Performance Einbussen kommen, die Synchronisierungsmechanismen brauchen Zeit.  Funktionen zur Indexierung oder Sortierung sind oft nicht verfügbar. Die Funktion .Count() gibt die Anzahl der Elemente zurück, diese kann sich aber zum Ausgabezeitpunkt bereits geändert haben, wenn ein anderer Thread auf die Daten zugreift. 

Fazit

Concurrent Collections abstrahieren die komplexe Logik von Locks und Synchronisierung. Mit ihrer robusten und leicht verständlichen API machen sie es dem Entwickler leicht, diese anzuwenden. Mit BlockingCollections ist ein Producer-Consumer-Pattern schnell und einfach implementiert. Wenn wir also das nächste Mal ein lock(object) um eine Collection schreiben, wäre die Überlegung angebracht, ob eine Concurrent Collection nicht vielleicht die bessere Wahl wäre.

Kommentare

Schreiben Sie einen Kommentar

Ihre E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Newsletter - aktuelle Angebote, exklusive Tipps und spannende Neuigkeiten

 Jetzt anmelden

Copyright © 2025 Noser Engineering AG – Alle Rechte vorbehalten.

NACH OBEN
Privacy Policy Cookie Policy
Zur Webcast Übersicht