In this example we show inter-thread communication between producer and consumer threads using condition_variable. In the end two threads will ping-pong passing control to each other.
Code can be modified in such a way that producer will read and queue chunks of data from some source while consumer thread will process data from the queue. For example, we could be reading some data from a network stream in chunks adding them to a queue while another thread will be reading and removing data one chunk at a time from the queue as long as the queue is not empty, otherwise it will block until new data is queued. In that scenario, ping-ponging will not be required as consumer should be processing data chunks independently of producer queuing it in.
The Standard C++ 11 Library provides two implementations of a condition variable: condition_variable and condition_variable_any, – both declared in <condition_variable> header file. condition_variable_any is a more general implementation of two and could incur some overhead compared to its brethren, it should be avoided unless additional flexibility is required. condition_variable must be used in conjunction with a mutex in order to provide synchronization while condition_variable_any does not have such restriction (it still requires some ‘mutex-like’ synchronization primitive).
First, let’s create a producer thread which is responsible for reading data from some data source, a network stream for example, and queuing it for another thread to process it.
thread producer([&m_mutex, &m_queue, &m_alarm, &m_isNotified, &m_haveData]() { //. . . lock_guard<mutex> lock(m_mutex); //m_queue.push(i); m_isNotified = true; m_alarm.notify_one(); //. . . });
We can now create a consumer thread which first has to lock the mutex. Unlike in producer, we have to use unique_lock and not lock_guard because the waiting consumer thread must unlock the mutex when it receives notification and then lock it again. lock_guard does not allow that.
thread consumer([&m_mutex, &m_queue, &m_alarm, &m_isNotified, &m_haveData]() { while (m_haveData) { unique_lock<mutex> lock(m_mutex); while (!m_isNotified) { m_alarm.wait(lock); } //. . . } });
while (!m_isNotified) { m_alarm.wait(lock); }
Here’s full source code listing:
//----------------------------------------------------------------------------- // File: Program.h // // Desc: Demonstrates inter-thread communication using condition variables. // Producer and consumer threads will ping-pong between each other. // Code can be modified in such a way that producer will read and queue // chunks of data from some source, network stream for example, while // consumer thread will process data from the queue. In that scenario, // ping-ponging will not be required as consumer should be disposing of data // independently of producer queuing it. // //----------------------------------------------------------------------------- #include <chrono> #include <condition_variable> #include <iostream> #include <mutex> #include <queue> #include <thread> using std::cout; using std::endl; using std::condition_variable; using std::lock_guard; using std::unique_lock; using std::mutex; using std::queue; using std::thread; using std::chrono::milliseconds; using std::this_thread::sleep_for; static const int MagicNumber = 30; // Magic number used for the sample, remove it for production code int main() { condition_variable m_alarm; // Notifies threads that more work is available mutex m_mutex; // Synchronizes access to shared variables queue<int> m_queue; // Accumulates data chunks bool m_isNotified = false; // This is a guard to prevent accidental spurious wakeups bool m_haveData = true; // Only used for this sample to end consumer thread, not required in production code thread producer([&m_mutex, &m_queue, &m_alarm, &m_isNotified, &m_haveData]() { for (int i = 0; i < MagicNumber; ++i) { sleep_for(milliseconds(500)); // Executing some long operation lock_guard<mutex> lock(m_mutex); // Enter critical section cout << "producer " << i << endl; m_queue.push(i); // Add data chunk to the queue m_isNotified = true; // Consumer can be woken up and it is not a fluke (see spurious wakeups) m_alarm.notify_one(); // Notify consumer } lock_guard<mutex> lock(m_mutex); // Work is done, app can exit m_isNotified = true; m_haveData = false; m_alarm.notify_one(); }); thread consumer([&m_mutex, &m_queue, &m_alarm, &m_isNotified, &m_haveData]() { while (m_haveData) // In production, this check will be done on whether there is more data in the queue { unique_lock<mutex> lock(m_mutex); // Must aquire unique_lock with condition variables while (!m_isNotified) // Prevents from spurious wakeup { m_alarm.wait(lock); // Wait for a signal from producer thread } while (!m_queue.empty()) // Process data and remove it from the queue { cout << "consumer " << m_queue.front() << endl; m_queue.pop(); } m_isNotified = false; // Protect from spurious wakeup } }); // Join threads and finish app producer.join(); consumer.join(); return 0; }
- Project is available here.