Producer Consumer inter-thread communication using condition variables in C++

This example shows inter-thread communication between a producer thread and a consumer thread using condition_variable. In the finished sample, the two threads ping-pong, passing control to each other.

The code can be modified so that the producer reads chunks of data from some source and queues them while the consumer thread processes data from the queue. For example, one thread could read data from a network stream in chunks and add them to a queue, while another thread removes and processes one chunk at a time for as long as the queue is not empty, and otherwise blocks until new data is queued. In that scenario ping-ponging is not required, because the consumer processes chunks independently of the producer queuing them.
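As a rough illustration of that queue-based variant, here is a minimal sketch. The ChunkQueue class, its push/close/pop members, and the runPipeline function are hypothetical names introduced for this illustration and are not part of the sample project. It assumes a single producer that calls close() when it has no more data.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper type for illustration; not part of the original sample.
class ChunkQueue
{
public:
    // Producer side: add one chunk and wake a waiting consumer.
    void push(int chunk)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(chunk);
        }
        alarm_.notify_one();
    }

    // Producer side: signal that no more chunks will arrive.
    void close()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        alarm_.notify_all();
    }

    // Consumer side: block until a chunk is available or the queue is closed.
    // Returns false only when the queue is closed and fully drained.
    bool pop(int& chunk)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        alarm_.wait(lock, [this] { return !queue_.empty() || closed_; });
        if (queue_.empty())
        {
            assert(closed_);        // The only other reason wait() can return
            return false;
        }
        chunk = queue_.front();
        queue_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable alarm_;
    std::queue<int> queue_;
    bool closed_ = false;
};

// Runs one producer and one consumer; returns the chunks in the order consumed.
std::vector<int> runPipeline(int chunkCount)
{
    ChunkQueue chunks;
    std::vector<int> consumed;

    std::thread producer([&chunks, chunkCount] {
        for (int i = 0; i < chunkCount; ++i)
            chunks.push(i);
        chunks.close();
    });

    std::thread consumer([&chunks, &consumed] {
        int chunk = 0;
        while (chunks.pop(chunk))
            consumed.push_back(chunk);
    });

    producer.join();
    consumer.join();
    return consumed;
}
```

Here the consumer waits on the state of the queue itself rather than on a separate notification flag, so the producer can queue several chunks before the consumer gets a chance to run and nothing is lost.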

The C++11 Standard Library provides two implementations of a condition variable, condition_variable and condition_variable_any, both declared in the <condition_variable> header. condition_variable_any is the more general of the two and may incur some overhead compared to condition_variable, so it should be avoided unless the additional flexibility is required. condition_variable works only with unique_lock<mutex>, while condition_variable_any has no such restriction: it accepts any ‘mutex-like’ lock type that provides lock() and unlock().
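To make the difference concrete, the sketch below waits on a flag guarded by a std::timed_mutex, a mutex type that condition_variable cannot be used with. The function name waitWithTimedMutex and the local variable names are made up for this illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical demo function: returns true once the waiting thread has
// observed the flag set by the notifier thread.
bool waitWithTimedMutex()
{
    std::timed_mutex mutex;                 // Not a std::mutex
    std::condition_variable_any alarm;      // Accepts any lockable type
    bool ready = false;

    std::thread notifier([&] {
        {
            std::lock_guard<std::timed_mutex> lock(mutex);
            ready = true;
        }
        alarm.notify_one();
    });

    bool observed = false;
    {
        std::unique_lock<std::timed_mutex> lock(mutex);
        alarm.wait(lock, [&] { return ready; });
        assert(lock.owns_lock());           // wait() re-acquired the mutex
        observed = ready;
    }

    notifier.join();
    return observed;
}
```

Swapping condition_variable_any for condition_variable in this sketch would not compile, because condition_variable::wait only accepts unique_lock<mutex>.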

First, let’s create a producer thread which is responsible for reading data from some data source, a network stream for example, and queuing it for another thread to process.

thread producer([&m_mutex, &m_queue, &m_alarm, &m_isNotified, &m_haveData]() { 
        //. . . 
        lock_guard<mutex> lock(m_mutex); 
        //m_queue.push(i);                 
        m_isNotified = true;             
        m_alarm.notify_one();             
        //. . . 
});

We can now create a consumer thread, which first has to lock the mutex. Unlike in the producer, we have to use unique_lock rather than lock_guard, because wait() must unlock the mutex while the consumer thread is blocked and lock it again before returning once a notification arrives. lock_guard does not allow that.

thread consumer([&m_mutex, &m_queue, &m_alarm, &m_isNotified, &m_haveData]() { 
    unique_lock<mutex> lock(m_mutex);          // Held whenever shared state is read or written
    while (m_haveData)                         
    { 
        while (!m_isNotified)                 
        { 
            m_alarm.wait(lock);                // Releases the mutex while blocked
        } 

        //. . . 
    } 
});

Note that the consumer thread guards against spurious wakeups by re-checking the flag in a loop, so it goes back to waiting if it wakes up without the producer having set m_isNotified:
while (!m_isNotified)     
{ 
    m_alarm.wait(lock);     
}
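The same guard can also be written with the overload of wait() that takes a predicate, which runs the loop internally. The sketch below reuses the variable names from this post; the waitWithPredicate function and the payload variable are hypothetical and exist only to make the example self-contained.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical demo function: returns the value handed over by the producer.
int waitWithPredicate()
{
    std::mutex m_mutex;
    std::condition_variable m_alarm;
    bool m_isNotified = false;
    int payload = 0;                        // Stand-in for queued data

    std::thread producer([&] {
        std::lock_guard<std::mutex> lock(m_mutex);
        payload = 42;
        m_isNotified = true;
        m_alarm.notify_one();
    });

    int received = 0;
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // Equivalent to: while (!m_isNotified) { m_alarm.wait(lock); }
        m_alarm.wait(lock, [&] { return m_isNotified; });
        assert(lock.owns_lock());           // Mutex is held again after wait()
        received = payload;
    }

    producer.join();
    return received;
}
```

Because the predicate is checked before blocking, the consumer does not wait at all if the producer has already set the flag.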

Here’s full source code listing:

//----------------------------------------------------------------------------- 
// File: Program.h 
// 
// Desc: Demonstrates inter-thread communication using condition variables. 
//       Producer and consumer threads will ping-pong between each other. 
//       Code can be modified in such a way that producer will read and queue  
//         chunks of data from some source, network stream for example, while 
//         consumer thread will process data from the queue. In that scenario, 
//         ping-ponging will not be required as consumer should be disposing of data 
//         independently of producer queuing it. 
// 
//----------------------------------------------------------------------------- 

#include <chrono> 
#include <condition_variable> 
#include <iostream> 
#include <mutex> 
#include <queue> 
#include <thread> 

using std::cout; 
using std::endl; 
using std::condition_variable; 
using std::lock_guard; 
using std::unique_lock; 
using std::mutex; 
using std::queue; 
using std::thread; 
using std::chrono::milliseconds; 
using std::this_thread::sleep_for; 

static const int MagicNumber = 30;                            // Magic number used for the sample, remove it for production code 

int main() 
{ 
    condition_variable    m_alarm;                            // Notifies threads that more work is available 
    mutex                m_mutex;                            // Synchronizes access to shared variables 
    queue<int>            m_queue;                            // Accumulates data chunks 
    bool                m_isNotified = false;                // This is a guard to prevent accidental spurious wakeups 
    bool                m_haveData   = true;                // Only used for this sample to end consumer thread, not required in production code 

    thread producer([&m_mutex, &m_queue, &m_alarm, &m_isNotified, &m_haveData]() { 
        for (int i = 0; i < MagicNumber; ++i) 
        { 
            sleep_for(milliseconds(500));                    // Executing some long operation 
            lock_guard<mutex> lock(m_mutex);                // Enter critical section 
            cout << "producer " << i << endl; 
            m_queue.push(i);                                // Add data chunk to the queue 
            m_isNotified = true;                            // Consumer can be woken up and it is not a fluke (see spurious wakeups) 
            m_alarm.notify_one();                            // Notify consumer 
        } 

        lock_guard<mutex> lock(m_mutex);                    // Work is done, app can exit 
        m_isNotified = true; 
        m_haveData = false; 
        m_alarm.notify_one(); 
    }); 

    thread consumer([&m_mutex, &m_queue, &m_alarm, &m_isNotified, &m_haveData]() { 
        unique_lock<mutex> lock(m_mutex);                    // Must acquire unique_lock with condition variables; held whenever shared state is touched 

        while (m_haveData)                                    // In production, this check will be done on whether there is more data in the queue 
        { 
            while (!m_isNotified)                            // Guards against spurious wakeups 
            { 
                m_alarm.wait(lock);                            // Releases the mutex while waiting for a signal from the producer thread 
            } 

            while (!m_queue.empty())                        // Process data and remove it from the queue 
            { 
                cout << "consumer " << m_queue.front() << endl; 
                m_queue.pop(); 
            } 

            m_isNotified = false;                            // Reset the flag so the next wait blocks until the producer notifies again 
        } 
    }); 

    // Join threads and finish app 
    producer.join(); 
    consumer.join(); 

    return 0; 
}
Project is available here.
