Threading – thread-safe size capped queue

Yan Cui

I help clients go faster for less using serverless technologies.

This article is brought to you by

Don’t reinvent the patterns. Catalyst gives you consistent APIs for messaging, data, and workflow with key microservice patterns like circuit-breakers and retries for free.

Try the Catalyst beta

Here is a queue class based on the implementation Marc Gravell provided in this StackOverflow question:

/// <summary>
/// A thread-safe fixed sized queue implementation
/// See: http://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
/// </summary>
public sealed class SizeQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly int _maxSize;
    private readonly object _syncRoot = new object();
    public SizeQueue(int maxSize)
    {
        _maxSize = maxSize;
    }
    public int Count
    {
        get
        {
            lock (_syncRoot)
            {
                return _queue.Count;
            }
        }
    }
    public object SyncRoot
    {
        get
        {
            return _syncRoot;
        }
    }
    /// <summary>
    /// Puts an item onto the queue
    /// </summary>
    public void Enqueue(T item)
    {
        lock (_syncRoot)
        {
            // don't enqueue new item if the max size has been met
            while (_queue.Count >= _maxSize)
            {
                Monitor.Wait(_syncRoot);
            }
            _queue.Enqueue(item);
            // wake up any blocked dequeue
            Monitor.PulseAll(_syncRoot);
        }
    }
    /// <summary>
    /// Returns the first item from the queue
    /// </summary>
    public T Dequeue()
    {
        return Dequeue(1).FirstOrDefault();
    }
    /// <summary>
    /// Returns the requested number of items from the head of the queue
    /// </summary>
    public IEnumerable<T> Dequeue(int count)
    {
        lock (_syncRoot)
        {
            // wait until there're items on the queue
            while (_queue.Count == 0)
            {
                Monitor.Wait(_syncRoot);
            }
            
            // read as many items off the queue as required (and possible)
            var items = new List<T>();
            while (count > 0 && _queue.Count > 0)
            {
                items.Add(_queue.Dequeue());
                count--;
            }
           return items;
        }
    }
}

Whenever you’re ready, here are 3 ways I can help you:

  1. Production-Ready Serverless: Join 20+ AWS Heroes & Community Builders and 1000+ other students in levelling up your serverless game. This is your one-stop shop for quickly levelling up your serverless skills.
  2. I help clients launch product ideas, improve their development processes and upskill their teams. If you’d like to work together, then let’s get in touch.
  3. Join my community on Discord, ask questions, and join the discussion on all things AWS and Serverless.

1 thought on “Threading – thread-safe size capped queue”

  1. You need to pulse on the Dequeue as well, or else it won’t enqueue anymore if the queue fills up.

Leave a Comment

Your email address will not be published. Required fields are marked *