Pages

Search

Friday, July 20, 2012

Producer-Consumer Problem using TPL and Blocking Collection

.NET 4.0 frameworks, has introduced TPL (Task Parallel Library) that has several advantages in achieving concurrency. Also there are thread safe collections like “ConcurrentQueue”, “BlockingCollection”, “ConcurrentDictionary”, “ConcurrentBag”, etc….

Producer would broadcast the data maybe which should be thread safe, in case if there are multiple producers who post the data.
Consumer reads the data published may be via queue.
At a point of time consumers should know or signaled saying the posting of data is completed by the producers or source.
Below example is considering there are 3 producers (3 tasks) adding numbers from 1 to 100 to queue in parallel.
At the same time, there are 2 consumers (2 tasks) reading numbers from the queue.
So adding and reading happens parallel using 5 tasks (3 for adding, 2 for reading). However when adding is completed, reading tasks should be signaled and that helps consumers to quit from reading data.
For this we use “BlockingCollection”, which is a new thread safe collection class in .NET 4.0.
Enumerator from BlockingCollection, has a feature to pop the item from the collection. Which mean, when we iterate using enumerator of blocking collection, it not only reads the item but also removes the item which is read from the collection. Also it tries to pop the items until it is signaled (calling “CompleteAdding” method).
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
  var queue = newBlockingCollection<int>();
                var producers = Enumerable.Range(1, 3)
                    .Select(_ => Task.Factory.StartNew(
                        () =>
                        {
                            Enumerable.Range(1, 100)
                                .ToList()
                                .ForEach((i) =>
                            {
                                queue.Add(i);
                                Thread.Sleep(100);
                            });
                        }
                        ))
                        .ToArray();

                var consumers = Enumerable.Range(1, 2)
                    .Select(_ => Task.Factory.StartNew(
                        () =>
                        {
                            foreach (var item in queue.GetConsumingEnumerable())
                            {
                                Console.WriteLine(item);
                            }
                        }
                        ))
                      .ToArray();
                Task.WaitAll(producers);
                queue.CompleteAdding();
                Task.WaitAll(consumers);
 Console.WriteLine("Done!");
            }
            catch (Exceptionexp)
            {
                Console.WriteLine("Error : " + exp.Message);
            }
            Console.Read();