C++ Events using Condition Variables

This code is from chapter 4 of the book C++ Concurrency in Action by Anthony Williams and can be downloaded directly from the book's companion site. I made some minor changes and compiled the code in Visual Studio 2013. Making the code available on MSDN may help the many people who are learning multithreading in C++.

Code to chapter 3 of the book explaining basic thread synchronization can be downloaded here.

The solution has 14 projects showing how to communicate between threads while preventing data corruption. Please find detailed explanations in the book. Here's a brief overview.

First example shows how to make one thread wait for data to arrive on another thread using condition variable. To pass the data between the threads this example is using a queue. When the data is ready, the first thread (producer) acquires lock using lock_guard and pushes data onto the queue. It then notifies waiting thread by calling notify_one of the condition variable.

The waiting thread acquires its own lock using unique_lock and then calls wait passing in the lambda expression that describes what it is waiting for.

// prepares data for consumption
void producer() 
{ 
    while (more_data_to_prepare()) 
    { 
        auto const data = prepare_data(); 

        lock_guard<mutex> lk(m_mutex); 

        m_queue.push(data); 
        m_condition.notify_one();                // notify consumer if there is one 
    } 
} 

// consumes data and does useful work 
void consumer() 
{ 
    while (true) 
    { 
        unique_lock<mutex> lk(m_mutex); 
        m_condition.wait(lk, []{ return !m_queue.empty(); }); 

        auto data = m_queue.front(); 
        m_queue.pop(); 
        lk.unlock(); 

        process(data); 
    } 
} 

// stop debugging manually because c1 will wait for-ever.. 
int main() 
{ 
    thread c1(consumer); 
    thread t1(producer); 
    thread t2(producer); 

    t1.join(); 
    t2.join(); 
    c1.join(); 
}
The second example shows the interface of the queue container adapter from the Standard Library. It is provided as a reference for building your own containers.
// Interface of the std::queue container adapter (C++11), shown as a model
// for writing your own containers.  Declarations only -- no definitions.
template < typename T, typename Container = deque<T> >
class queue
{
public:
    // construct from an existing container (copied or moved in);
    // the defaulted rvalue overload also serves as the default constructor
    explicit queue(const Container&);
    explicit queue(Container&& = Container());
    queue(queue&& q);

    // allocator-aware constructors
    template <class Alloc> explicit queue(const Alloc&);
    template <class Alloc> queue(const Container&, const Alloc&);
    template <class Alloc> queue(Container&&, const Alloc&);
    template <class Alloc> queue(queue&&, const Alloc&);

    queue& operator=(queue&& q);

    // FIX: std::queue::swap takes an lvalue reference, not an rvalue
    // reference -- swapping with a temporary makes no sense.
    void swap(queue& q);

    // observers
    bool empty() const;
    size_t size() const;

    // element access: front() is the next element popped
    T& front();
    const T& front() const;
    T& back();
    const T& back() const;

    // modifiers
    void push(const T& x);
    void push(T&& x);
    void pop();
};
The following threadsafe_queue interface is a simplified example of such a container.
// Interface of a simplified thread-safe queue (declarations only).
// The synchronization primitives live inside the implementation, so
// callers need no external locking.
template <typename T>
class threadsafe_queue 
{ 
public: 
    threadsafe_queue(); 
    // copy constructor -- the source is locked while being copied
    threadsafe_queue(const threadsafe_queue&); 

    // copy assignment is deleted for simplicity
    threadsafe_queue& operator=(const threadsafe_queue&) = delete; 

    // adds an element and notifies one waiting consumer
    void push(T value); 

    // non-blocking pops: return false / an empty shared_ptr when the queue is empty
    bool try_pop(T& value); 
    shared_ptr<T> try_pop(); 

    // blocking pops: wait until an element becomes available
    void wait_and_pop(T& value); 
    shared_ptr<T> wait_and_pop(); 

    bool empty() const; 
};
Expanding threadsafe_queue interface, we can wrap mutex and condition variable within the implementation to hide them from the callers.
// Minimal thread-safe queue: the mutex and condition variable are wrapped
// inside the class so they are hidden from callers, who need no external
// synchronization.
template <typename T>
class threadsafe_queue
{
public:
    // Adds an element and wakes one blocked consumer, if any.
    void push(T value)
    {
        lock_guard<mutex> lk(m_mutex);
        m_queue.push(move(value));   // 'value' is a sink parameter -- move, don't copy
        m_condition.notify_one();
    }

    // Blocks until an element is available, then pops it into 'value'.
    void wait_and_pop(T& value)
    {
        unique_lock<mutex> lk(m_mutex);
        // wait() releases the lock while blocked and re-checks the
        // predicate on wakeup, so spurious wakeups are harmless.
        m_condition.wait(lk, [this]{ return !m_queue.empty(); });
        value = m_queue.front();
        m_queue.pop();
    }

private:
    condition_variable    m_condition;
    mutex                m_mutex;
    queue<T>            m_queue;
};

// Trivial payload type passed between producer and consumer.
struct data_chunk
{
};

// Stub helpers: the producer never has data to prepare, and every chunk
// is treated as the last one, so the consumer stops after one item.
data_chunk prepare_data() { return data_chunk(); }
bool more_data_to_prepare() { return false; }
void process(data_chunk) {}
bool is_last_chunk(data_chunk) { return true; }

// Shared queue instance used by producer() and consumer() below.
threadsafe_queue<data_chunk> data_queue; 

void producer() 
{ 
    while (more_data_to_prepare()) 
    { 
        data_chunk const data = prepare_data(); 
        data_queue.push(data); 
    } 
} 

void consumer() 
{ 
    while (true) 
    { 
        data_chunk data; 
        data_queue.wait_and_pop(data); 
        process(data); 

        if (is_last_chunk(data)) 
        { 
            break; 
        } 
    } 
}
The fifth example shows the final implementation of the threadsafe_queue. Note that the mutex is marked as mutable. This is because locking a mutex is a mutating operation, yet we need to acquire locks on it in const contexts: the copy constructor (whose source is a const reference) and the const empty() function.
// Final thread-safe queue.  The mutex is mutable because locking is a
// mutating operation and we must lock in const contexts: the copy
// constructor's const source, and the const empty() member.
template <typename T>
class threadsafe_queue
{
public:
    threadsafe_queue()
    {
    }

    // Copy constructor: lock the source while copying its contents.
    threadsafe_queue(threadsafe_queue const& other)
    {
        lock_guard<mutex> lk(other.m_mutex);
        m_queue = other.m_queue;
    }

    // Adds an element and wakes one blocked consumer, if any.
    void push(T value)
    {
        lock_guard<mutex> lk(m_mutex);
        m_queue.push(move(value));   // sink parameter -- move into the queue
        m_condition.notify_one();
    }

    // Blocks until an element is available, then pops it into 'value'.
    void wait_and_pop(T& value)
    {
        unique_lock<mutex> lk(m_mutex);
        m_condition.wait(lk, [this]{ return !m_queue.empty(); });
        value = m_queue.front();
        m_queue.pop();
    }

    // Blocks until an element is available; returns it via shared_ptr.
    shared_ptr<T> wait_and_pop()
    {
        unique_lock<mutex> lk(m_mutex);
        m_condition.wait(lk, [this]{ return !m_queue.empty(); });
        shared_ptr<T> value(make_shared<T>(m_queue.front()));
        m_queue.pop();               // BUG FIX: the element was never removed
        return value;
    }

    // Non-blocking pop: returns false (leaving 'value' untouched) when empty.
    bool try_pop(T& value)
    {
        lock_guard<mutex> lk(m_mutex);

        if (m_queue.empty())
        {
            return false;
        }

        value = m_queue.front();
        m_queue.pop();
        return true;                 // BUG FIX: missing return fell off a non-void function (UB)
    }

    // Non-blocking pop: returns an empty shared_ptr when the queue is empty.
    shared_ptr<T> try_pop()
    {
        lock_guard<mutex> lk(m_mutex);

        if (m_queue.empty())
        {
            return shared_ptr<T>();
        }

        shared_ptr<T> value(make_shared<T>(m_queue.front()));
        m_queue.pop();
        return value;
    }

    bool empty() const
    {
        lock_guard<mutex> lk(m_mutex);
        return m_queue.empty();
    }

private:
    condition_variable    m_condition;
    mutable mutex        m_mutex;
    queue<T>            m_queue;
};
In example 6 we use std::async to start an asynchronous task, which immediately returns a std::future object, allowing the main thread to continue execution. When we need the result of the asynchronous calculation we call get() on the future, which blocks the thread until the asynchronous operation completes.
// Simulates a long-running computation of the answer to life, the
// universe and everything by sleeping for 30 ms before returning.
int find_answer_to_ltuae()
{
    sleep_for(milliseconds(30));
    return 42;
}

// Placeholder for work the main thread performs while the answer is
// being computed asynchronously.
void do_other_stuff() 
{ 
} 

int main() 
{ 
    auto answer = async(find_answer_to_ltuae); 
    do_other_stuff(); 

    wstring value = L"   answer is: " + to_wstring(answer.get()) + L"\n"; 
}
Example 7 shows how to pass arguments to a function using std::async.
// Sample class whose member functions are invoked through std::async.
struct X
{
    void foo(int, string const&) {}
    string bar(string const&) { return "some text"; }
};

// Movable callable type used to show that std::async accepts move-only
// arguments.  NOTE: the copy constructor is defined while copy assignment
// is deleted -- kept as-is to preserve the original interface.
class move_only
{
public:
    move_only() {}
    move_only(move_only&&) {}
    move_only(move_only const&) {}

    // BUG FIX: the original had no return statement, so it fell off the
    // end of a non-void function (undefined behavior).
    move_only& operator=(move_only&&) { return *this; }
    move_only& operator=(move_only const&) = delete;

    void operator()() {}
};

int main() 
{ 
    X x; 
    auto f1 = async(&X::foo, &x, 42, "hello"); 
    auto f2 = async(&X::bar, x, "goodbye"); 

    struct Y 
    { 
        double operator()(double){ return 6.67; }; 
    }; 

    Y y; 
    auto f3 = async(Y(), 3.141); 
    auto f4 = async(ref(y), 2.718); 
    auto f6 = async(move_only()); 
    auto f7 = async(launch::async, Y(), 1.2); 
}
I will skip a few examples here (explanations for which you can find in the book) and go to #12: quick sort. The function starts by taking a reference to the first element of the list, which will be used as the pivot for the comparisons. It then uses std::partition to divide the remaining elements into those less than the pivot and those not less than it. The two halves are then sorted independently using recursion and finally spliced back together.
// Sequential quick sort over a std::list.
// Splices the first element out of 'input' to use as the pivot, partitions
// the rest around it, recursively sorts both halves, and splices the
// results back together (elements equal to the pivot end up in the upper half).
template <typename T>
list<T> sequential_quick_sort(list<T> input)
{
    if (input.empty())
    {
        return input;
    }

    // Move the first element out of 'input'; it becomes the pivot.
    list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();

    // BUG FIX: the original predicate read "t<pivot>" (mangled markup),
    // which does not compile; it must compare the element to the pivot.
    auto midPoint = partition(input.begin(), input.end(),
                              [&](T const& t){ return t < pivot; });

    // Elements below the pivot move into their own list.
    list<T> lowerPart;
    lowerPart.splice(lowerPart.end(), input, input.begin(), midPoint);

    auto newLower (sequential_quick_sort(move(lowerPart)));
    auto newHigher(sequential_quick_sort(move(input)));

    // Assemble: lower ++ pivot ++ higher.
    result.splice(result.end(), newHigher);
    result.splice(result.begin(), newLower);

    return result;
}
Example 13 is a parallel version of quick sort. It is almost identical to 12 except that the lower half of values is sorted on another thread using std::async.
// Parallel quick sort: identical to the sequential version except that the
// lower half is sorted on another thread via std::async; newLower.get()
// blocks until that half is ready.
template <typename T>
list<T> parallel_quick_sort(list<T> input)
{
    if (input.empty())
    {
        return input;
    }

    // Move the first element out of 'input'; it becomes the pivot.
    list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();

    // BUG FIX: the original predicate read "t<pivot>" (mangled markup),
    // which does not compile; it must compare the element to the pivot.
    auto midPoint = partition(input.begin(), input.end(),
                              [&](T const& t){ return t < pivot; });

    list<T> lowerPart;
    lowerPart.splice(lowerPart.end(), input, input.begin(), midPoint);

    // Sort the lower half asynchronously while this thread sorts the upper half.
    future< list<T> > newLower(async(&parallel_quick_sort<T>, move(lowerPart)));

    auto newHigher(parallel_quick_sort(move(input)));

    // Assemble: lower ++ pivot ++ higher.
    result.splice(result.end(), newHigher);
    result.splice(result.begin(), newLower.get());

    return result;
}

The code for this article can be downloaded here. The C++ memory model is described here.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: