After showing a simple thread pool with Boost.Asio in the last post, I’m going to have a look at doing the same thing with the threading facilities in C++11. The biggest difference is that we don’t have the Asio library, so we have to reproduce the relevant functionality ourselves.
The declarations remain mostly the same, except that the ThreadPool class no longer has the io_service members. Instead it has a deque of tasks and the synchronization primitives that protect it:
    #include <cstddef>
    #include <deque>
    #include <functional>
    #include <vector>
    #include <thread>
    #include <mutex>
    #include <condition_variable>

    class ThreadPool;

    // our worker thread objects
    class Worker {
    public:
        Worker(ThreadPool &s) : pool(s) { }
        void operator()();
    private:
        ThreadPool &pool;
    };

    // the actual thread pool
    class ThreadPool {
    public:
        ThreadPool(size_t);
        template<class F>
        void enqueue(F f);
        ~ThreadPool();
    private:
        friend class Worker;

        // need to keep track of threads so we can join them
        std::vector< std::thread > workers;

        // the task queue
        std::deque< std::function<void()> > tasks;

        // synchronization
        std::mutex queue_mutex;
        std::condition_variable condition;
        bool stop;
    };
Previously the Worker threads simply ran the io_service. Now they are where most of the magic happens. The most important part here is the condition_variable, which is used to make a thread “sleep” when there are no jobs and to wake it up when new jobs are added to the queue. When condition_variable::wait is called with a lock, the lock is released and the thread is suspended. When condition_variable::notify_one or condition_variable::notify_all is called, one or all waiting threads are woken up and reacquire the lock before wait returns. Because wait may also return spuriously, without any notification, the worker re-checks its condition in a loop.
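To make the wait/notify handshake concrete, here is a minimal sketch that is independent of the pool. The function name handoff_demo and its variables are made up for illustration; only the standard library calls are real.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: passes `value` from a producer thread to the caller.
int handoff_demo(int value) {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;  // the condition the consumer waits for
    int slot = 0;        // the data protected by the mutex

    std::thread producer([&] {
        {
            // change the shared state while holding the lock
            std::lock_guard<std::mutex> lock(m);
            slot = value;
            ready = true;
        }
        // wake up the waiting thread, if it is already waiting
        cv.notify_one();
    });

    int received = 0;
    {
        std::unique_lock<std::mutex> lock(m);
        // wait() releases the lock while suspended and reacquires it before
        // returning; the loop guards against spurious wakeups
        while (!ready)
            cv.wait(lock);
        assert(ready);  // we only leave the loop once the flag is set
        received = slot;
    }
    producer.join();
    return received;
}
```

Calling handoff_demo(42) should return 42 no matter which thread runs first, because the flag is checked under the lock before the consumer decides to sleep.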
    void Worker::operator()()
    {
        std::function<void()> task;
        while(true)
        {
            {   // acquire lock
                std::unique_lock<std::mutex> lock(pool.queue_mutex);

                // look for a work item
                while(!pool.stop && pool.tasks.empty())
                {   // if there are none wait for notification
                    pool.condition.wait(lock);
                }

                if(pool.stop) // exit if the pool is stopped
                    return;

                // get the task from the queue
                task = pool.tasks.front();
                pool.tasks.pop_front();
            }   // release lock

            // execute the task
            task();
        }
    }
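The constructor and destructor mostly remain the same. The destructor now sets the stop flag and uses notify_all to make sure any suspended threads see it. Note that the workers check the stop flag before they take a task, so any tasks still in the queue at that point are dropped rather than executed.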
    // the constructor just launches some amount of workers
    ThreadPool::ThreadPool(size_t threads)
        : stop(false)
    {
        for(size_t i = 0;i<threads;++i)
            workers.push_back(std::thread(Worker(*this)));
    }

    // the destructor joins all threads
    ThreadPool::~ThreadPool()
    {
        // stop all threads; set the flag under the lock so that a worker
        // cannot miss it between checking stop and going to sleep
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();

        // join them
        for(size_t i = 0;i<workers.size();++i)
            workers[i].join();
    }
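The shutdown step can be looked at in isolation. The following sketch is not part of the pool; shutdown_demo and its variables are hypothetical names. It parks a number of threads on a condition variable and then releases all of them the way the destructor does, with the flag changed while the mutex is held.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: starts `n_waiters` threads that sleep until a stop
// flag is set, then returns how many of them woke up and exited.
int shutdown_demo(int n_waiters) {
    std::mutex m;
    std::condition_variable cv;
    bool stop = false;
    int exited = 0;  // protected by m

    std::vector<std::thread> waiters;
    for (int i = 0; i < n_waiters; ++i) {
        waiters.emplace_back([&] {
            std::unique_lock<std::mutex> lock(m);
            while (!stop)
                cv.wait(lock);
            ++exited;  // the lock is held again here
        });
    }

    {
        // Setting the flag under the lock means no waiter can be between
        // "saw stop == false" and "went to sleep" when the flag changes.
        std::lock_guard<std::mutex> lock(m);
        stop = true;
    }
    cv.notify_all();  // notify_one would wake only a single waiter

    for (std::thread &t : waiters)
        t.join();
    assert(exited == n_waiters);  // every waiter passed the loop
    return exited;
}
```

If the flag were written without the lock, a thread could test it, be preempted, and then go to sleep after the notification had already been sent, in which case the join would never return.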
Finally, the enqueue function just locks the queue, adds a task to it, and wakes up one thread in case any thread was suspended.
    // add new work item to the pool
    template<class F>
    void ThreadPool::enqueue(F f)
    {
        {   // acquire lock
            std::unique_lock<std::mutex> lock(queue_mutex);

            // add the task
            tasks.push_back(std::function<void()>(f));
        }   // release lock

        // wake up one thread
        condition.notify_one();
    }
The interface of the ThreadPool is unchanged, so the usage example from the last blog post still works. This version of the ThreadPool is slightly longer than the version with Boost.Asio, but it is still relatively short for what it does. It also reduces the Boost dependencies, since we no longer have to link against any Boost libraries.
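For readers who do not have the previous post at hand, here is a rough, self-contained sketch of how such a pool might be used. MiniPool is a condensed stand-in for the ThreadPool above (the worker loop is a lambda instead of a Worker class), and run_tasks is a made-up helper; neither is the code from the earlier post.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Condensed stand-in for the ThreadPool in this post (assumption: same
// queue, mutex, condition variable and stop flag, folded into one class).
class MiniPool {
public:
    explicit MiniPool(std::size_t threads) : stop(false) {
        assert(threads > 0);  // a pool without workers never runs anything
        for (std::size_t i = 0; i < threads; ++i)
            workers.emplace_back([this] { run(); });
    }
    template <class F>
    void enqueue(F f) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            tasks.push_back(std::function<void()>(f));
        }
        condition.notify_one();
    }
    ~MiniPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &w : workers)
            w.join();
    }
private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                while (!stop && tasks.empty())
                    condition.wait(lock);
                if (stop)
                    return;
                task = tasks.front();
                tasks.pop_front();
            }
            task();
        }
    }
    std::vector<std::thread> workers;
    std::deque<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// Hypothetical usage: enqueue `n` tasks and wait until all of them have run.
int run_tasks(int n) {
    std::atomic<int> done(0);
    {
        MiniPool pool(4);
        for (int i = 0; i < n; ++i)
            pool.enqueue([&done] { ++done; });
        // Wait before the pool goes out of scope: workers return as soon
        // as stop is set, so tasks still queued then would never run.
        while (done.load() < n)
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return done.load();
}
```

The polling loop is only there to keep the sketch short. A real program would more likely signal completion through another condition variable or a future.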