This code is from the chapter 4 of the C++ Concurrency in Action book by Anthony Williams and can be downloaded directly from the book companion site. I made some minor changes and compiled it in Visual Studio 2013. Making the code available on MSDN may help many who learn multithreading in C++.
Code to chapter 3 of the book explaining basic thread synchronization can be downloaded here.
Solution has 14 projects showing how to communicate between threads while preventing data from corruption. Please find detailed explanations in the book. Here’s a brief overview..
First example shows how to make one thread wait for data to arrive on another thread using condition variable. To pass the data between the threads this example is using a queue. When the data is ready, the first thread (producer) acquires lock using lock_guard and pushes data onto the queue. It then notifies waiting thread by calling notify_one of the condition variable.
The waiting thread acquires its own lock using unique_lock and then calls wait passing in the lambda expression that describes what it is waiting for.
// prepares data for consumption void producer() { while (more_data_to_prepare()) { auto const data = prepare_data(); lock_guard<mutex> lk(m_mutex); m_queue.push(data); m_condition.notify_one(); // notify consumer if there is one } } // consumes data and does useful work void consumer() { while (true) { unique_lock<mutex> lk(m_mutex); m_condition.wait(lk, []{ return !m_queue.empty(); }); auto data = m_queue.front(); m_queue.pop(); lk.unlock(); process(data); } } // stop debugging manually because c1 will wait for-ever.. int main() { thread c1(consumer); thread t1(producer); thread t2(producer); t1.join(); t2.join(); c1.join(); }
template < typename T, typename Container = deque<T> > class queue { public: explicit queue(const Container&); explicit queue(Container&& = Container()); queue(queue&& q); template <class Alloc> explicit queue(const Alloc&); template <class Alloc> queue(const Container&, const Alloc&); template <class Alloc> queue(Container&&, const Alloc&); template <class Alloc> queue(queue&&, const Alloc&); queue& operator=(queue&& q); void swap(queue&& q); bool empty() const; size_t size() const; T& front(); const T& front() const; T& back(); const T& back() const; void push(const T& x); void push(T&& x); void pop(); };
template <typename T> class threadsafe_queue { public: threadsafe_queue(); threadsafe_queue(const threadsafe_queue&); threadsafe_queue& operator=(const threadsafe_queue&) = delete; void push(T value); bool try_pop(T& value); shared_ptr<T> try_pop(); void wait_and_pop(T& value); shared_ptr<T> wait_and_pop(); bool empty() const; };
template <typename T> class threadsafe_queue { public: void push(T value) { lock_guard<mutex> lk(m_mutex); m_queue.push(value); m_condition.notify_one(); } void wait_and_pop(T& value) { unique_lock<mutex> lk(m_mutex); m_condition.wait(lk, [this]{ return !m_queue.empty(); }); value = m_queue.front(); m_queue.pop(); } private: condition_variable m_condition; mutex m_mutex; queue<T> m_queue; }; struct data_chunk { }; data_chunk prepare_data (){ return data_chunk(); }; bool more_data_to_prepare (){ return false; }; void process (data_chunk){}; bool is_last_chunk (data_chunk){ return true; }; threadsafe_queue<data_chunk> data_queue; void producer() { while (more_data_to_prepare()) { data_chunk const data = prepare_data(); data_queue.push(data); } } void consumer() { while (true) { data_chunk data; data_queue.wait_and_pop(data); process(data); if (is_last_chunk(data)) { break; } } }
template <typename T> class threadsafe_queue { public: threadsafe_queue() { } threadsafe_queue(threadsafe_queue const& other) { lock_guard<mutex> lk(other.m_mutex); m_queue = other.m_queue; } void push(T value) { lock_guard<mutex> lk(m_mutex); m_queue.push(value); m_condition.notify_one(); } void wait_and_pop(T& value) { unique_lock<mutex> lk(m_mutex); m_condition.wait(lk, [this]{ return !m_queue.empty(); }); value = m_queue.front(); m_queue.pop(); } shared_ptr<T> wait_and_pop() { unique_lock<mutex> lk(m_mutex); m_condition.wait(lk, [this]{ return !m_queue.empty(); }); shared_ptr<T> value(make_shared<T>(m_queue.front())); return value; } bool try_pop(T& value) { lock_guard<mutex> lk(m_mutex); if (m_queue.empty()) { return false; } value = m_queue.front(); m_queue.pop(); } shared_ptr<T> try_pop() { lock_guard<mutex> lk(m_mutex); if (m_queue.empty()) { return shared_ptr<T>(); } shared_ptr<T> value(make_shared<T>(m_queue.front())); m_queue.pop(); return value; } bool empty() const { lock_guard<mutex> lk(m_mutex); return m_queue.empty(); } private: condition_variable m_condition; mutable mutex m_mutex; queue<T> m_queue; };
int find_answer_to_ltuae() { milliseconds delay(30); sleep_for(delay); return 42; } void do_other_stuff() { } int main() { auto answer = async(find_answer_to_ltuae); do_other_stuff(); wstring value = L" answer is: " + to_wstring(answer.get()) + L"\n"; }
struct X { void foo(int, string const&){} string bar(string const&){ return "some text"; } }; class move_only { public: move_only(){} move_only(move_only&&){} move_only(move_only const&){} move_only& operator=(move_only&&){} move_only& operator=(move_only const&) = delete; void operator()(){}; }; int main() { X x; auto f1 = async(&X::foo, &x, 42, "hello"); auto f2 = async(&X::bar, x, "goodbye"); struct Y { double operator()(double){ return 6.67; }; }; Y y; auto f3 = async(Y(), 3.141); auto f4 = async(ref(y), 2.718); auto f6 = async(move_only()); auto f7 = async(launch::async, Y(), 1.2); }
template <typename T> list<T> sequential_quick_sort(list<T> input) { if (input.empty()) { return input; } list<T> result; result.splice(result.begin(), input, input.begin()); T const& pivot = *result.begin(); auto midPoint = partition(input.begin(), input.end(), [&](T const& t){ return t<pivot>; }); list<T> lowerPart; lowerPart.splice(lowerPart.end(), input, input.begin(), midPoint); auto newLower (sequential_quick_sort(move(lowerPart))); auto newHigher(sequential_quick_sort(move(input))); result.splice(result.end(), newHigher); result.splice(result.begin(), newLower); return result; }
template <typename T> list<T> parallel_quick_sort(list<T> input) { if (input.empty()) { return input; } list<T> result; result.splice(result.begin(), input, input.begin()); T const& pivot = *result.begin(); auto midPoint = partition(input.begin(), input.end(), [&](T const& t){ return t<pivot>; }); list<T> lowerPart; lowerPart.splice(lowerPart.end(), input, input.begin(), midPoint); future< list<T> > newLower(async(¶llel_quick_sort<T>, move(lowerPart))); auto newHigher(parallel_quick_sort(move(input))); result.splice(result.end(), newHigher); result.splice(result.begin(), newLower.get()); return result; }
Code for this article can be downloaded here. Memory model is described here