Producer Consumer inter-thread communication using Monitor in C#

In this example we show inter-thread communication between producer and consumer threads using the Monitor.Wait and Monitor.Pulse methods. The two threads ping-pong, passing control back and forth to each other.

The code can be modified so that the producer reads and queues chunks of data from some source while the consumer thread processes data from the queue. For example, we could be reading data from a network in chunks and want to process each chunk as soon as it arrives. In that scenario ping-ponging is not required, because the consumer should process chunks independently of the producer queuing them.

First the producer thread acquires the lock, because Monitor.Wait() and Monitor.Pulse() can only be called from a synchronized block of code that holds the lock. Next it calls Monitor.Wait(), which releases the lock and puts the thread into the waiting state until another thread pulses it. Once woken, the producer adds the data chunk it received from the data source to the queue and signals the consumer that it can wake up and start processing the data. It does that by calling Monitor.Pulse(), which moves the waiting thread to the ready queue. Once the producer releases the lock (here, by calling Monitor.Wait() again), the consumer reacquires the lock and can process the data. Then the consumer pulses back to the producer.
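
The Wait/Pulse exchange described above can be sketched in isolation. This is a minimal illustration rather than part of the listing further down: the class HandshakeSketch, its fields, and the value 42 are all hypothetical. It also adds one thing the ping-pong example does not have, a boolean condition that is re-checked in a loop, so that a pulse sent before the waiter starts waiting is not lost.

```csharp
using System;
using System.Threading;

// Hypothetical illustration class; the names are not from the original listing.
public static class HandshakeSketch
{
    private static readonly object Gate = new object();
    private static bool _ready;      // condition protected by Gate
    private static int _payload;     // data handed from signaller to waiter

    // Runs one signal/wait exchange and returns the value the waiter observed.
    public static int Run()
    {
        _ready = false;
        int observed = -1;

        var waiter = new Thread(() =>
        {
            lock (Gate)
            {
                // Re-check the condition in a loop. If the signaller already ran,
                // _ready is true and the wait is skipped entirely.
                while (!_ready)
                    Monitor.Wait(Gate);   // releases Gate, blocks, reacquires Gate before returning
                observed = _payload;
            }
        });

        var signaller = new Thread(() =>
        {
            lock (Gate)
            {
                _payload = 42;
                _ready = true;
                Monitor.Pulse(Gate);      // moves one waiting thread, if any, to the ready queue
            }                             // the waiter can resume only after Gate is released here
        });

        waiter.Start();
        signaller.Start();
        waiter.Join();
        signaller.Join();
        return observed;
    }

    public static void Main()
    {
        Console.WriteLine(Run());         // prints 42
    }
}
```

Because the waiter tests _ready before it waits, the result is the same whichever thread takes the lock first.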

In production we would want the consumer to process data independently of the producer: the producer accumulates data in the queue while the consumer works through it.
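
One way to get that independence while still using only Monitor is to have the consumer wait only when the queue is empty. The sketch below is an assumption about how the modification might look, not the code from this post: ChunkQueue, QueueDemo, and the Add, Complete and TryTake methods are hypothetical names, and the integers stand in for real data chunks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch; class and member names are illustrative only.
public sealed class ChunkQueue
{
    private readonly object _gate = new object();
    private readonly Queue<int> _queue = new Queue<int>();
    private bool _done;                   // set once the producer has no more data

    public void Add(int chunk)
    {
        lock (_gate)
        {
            _queue.Enqueue(chunk);
            Monitor.Pulse(_gate);         // wake a consumer if one is waiting
        }
    }

    public void Complete()
    {
        lock (_gate)
        {
            _done = true;
            Monitor.PulseAll(_gate);      // wake every waiter so it can observe _done
        }
    }

    // Blocks while the queue is empty; returns false once it is empty and completed.
    public bool TryTake(out int chunk)
    {
        lock (_gate)
        {
            while (_queue.Count == 0 && !_done)
                Monitor.Wait(_gate);
            if (_queue.Count == 0) { chunk = 0; return false; }
            chunk = _queue.Dequeue();
            return true;
        }
    }
}

public static class QueueDemo
{
    // Produces 0..count-1 on one thread and sums them on another.
    public static int Run(int count)
    {
        var queue = new ChunkQueue();
        int sum = 0;

        var consumer = new Thread(() =>
        {
            int chunk;
            while (queue.TryTake(out chunk))
                sum += chunk;             // processing happens outside the lock
        });
        var producer = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
                queue.Add(i);             // the producer never waits for the consumer
            queue.Complete();
        });

        consumer.Start();
        producer.Start();
        producer.Join();
        consumer.Join();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(Run(30));       // prints 435 (0 + 1 + ... + 29)
    }
}
```

Here the producer can run ahead and let chunks accumulate, and the explicit completion flag replaces the one-second time-out as the way the consumer learns that no more data is coming.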

/* 
// Desc: Demonstrates inter-thread communication using Monitor. 
//       Producer and consumer threads will ping-pong between each other. 
//       Code can be modified in such a way that producer will read and queue  
//         chunks of data from some source, network stream for example, while 
//         consumer thread will process data from the queue. In that scenario, 
//         ping-ponging will not be required as consumer should be disposing of data 
//         independently of producer queuing it. 
*/ 

using System; 
using System.Collections.Generic; 
using System.Threading; 

namespace Bisque 
{ 
    public sealed class ProducerConsumer 
    { 
        const int MagicNumber = 30;                                 // Indicates how many times to bounce between ping and pong threads 
        private readonly object m_lock = new object();              // Guards m_queue and is the object both threads Wait/Pulse on 
        private Queue<int> m_queue = new Queue<int>(); 

        // Ctor 
        public ProducerConsumer() 
        { 
        } 

        // Ping 
        public void Producer() 
        { 
            int counter = 0; 

            lock (m_lock)                                           // Allows only one thread at a time inside m_lock 
            { 
                while (counter < MagicNumber) 
                { 
                    Thread.Sleep(500);                              // Simulates fetching a data chunk from some source (note: sleeps while holding the lock) 
                    Monitor.Wait(m_lock);                           // Release the lock and block until the consumer pulses; the lock is reacquired before Wait returns 
                    Console.WriteLine("producer {0}", counter); 
                    m_queue.Enqueue(counter); 
                    Monitor.Pulse(m_lock);                          // Releases consumer thread 

                    counter++; 
                } 
            } 
        } 

        public void Consumer() 
        { 
            lock (m_lock)                                           // Allows only one thread at a time inside m_lock 
            { 
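                Monitor.Pulse(m_lock);                              // Wake the producer. If the producer is not waiting yet, this pulse is lost and the producer will block forever 

                while (Monitor.Wait(m_lock, 1000))                  // Wait up to 1 second for the producer's pulse. Wait returns false on time-out, which ends the loop once the producer has finished; without the time-out the app would hang 
                { 
                    int data = m_queue.Dequeue(); 
                    Console.WriteLine("consumer {0}", data); 
                    Monitor.Pulse(m_lock);                          // Wake the producer 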
                } 
            } 
        } 
    } 

    class Program 
    { 
        static void Main(string[] args) 
        { 
            ProducerConsumer app = new ProducerConsumer(); 

            // Create 2 threads 
            Thread t_producer = new Thread(new ThreadStart(app.Producer)); 
            Thread t_consumer = new Thread(new ThreadStart(app.Consumer)); 

            // Start threads 
            t_producer.Start(); 
            t_consumer.Start(); 

            // Wait for the threads to complete 
            t_producer.Join(); 
            t_consumer.Join(); 

            Console.WriteLine("\nPress any key to complete the program.\n"); 
            Console.ReadKey(false); 
        } 
    } 
} 

/* 
 * Will print: 

producer 0 
consumer 0 
producer 1 
consumer 1 
... 
producer 29 
consumer 29 

Press any key to complete the program. 
*/
You can download the project from here.
