A Thread Pool with C++11

After showing a simple thread pool with Boost.Asio in the last post i’m going to have a look at doing the same thing with the threading facilities in C++11. The biggest difference is that we don’t have the Asio library so we have to reproduce the relevant functionality ourselves.
The declarations remain mostly the same except that the ThreadPool class doesn’t have the io_service members anymore but instead has a deque and synchronization primitives that we will use instead:

#include <thread>
#include <mutex>
#include <condition_variable>

class ThreadPool;
 
// our worker thread objects
class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};
 
// the actual thread pool
class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F>
    void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;

    // the task queue
    std::deque< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

Previously the Worker threads simply ran the io_service. Now they are where most of the magic happens. The most important part here is the condition_variable which is used to make the thread “sleep” when there are no jobs and wake it up when there are new jobs added to the queue. When calling condition_variable::wait with a lock the lock is released and the thread is suspended. When condition_variable::notify_one or condition_variable::notify_all is called one or all waiting threads are woken up and reacquire the lock.

void Worker::operator()()
{
    std::function<void()> task;
    while(true)
    {
        {   // acquire lock
            std::unique_lock<std::mutex> 
                lock(pool.queue_mutex);
            
            // look for a work item
            while(!pool.stop && pool.tasks.empty())
            { // if there are none wait for notification
                pool.condition.wait(lock);
            }

            if(pool.stop) // exit if the pool is stopped
                return;

            // get the task from the queue
            task = pool.tasks.front();
            pool.tasks.pop_front();

        }   // release lock

        // execute the task
        task();
    }
}

Constructor and destructor mostly remain the same. The destructor now uses notify_all to make sure any suspended threads see that the stop flag is set.

// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.push_back(std::thread(Worker(*this)));
}
  
// the destructor joins all threads
ThreadPool::~ThreadPool()
{
    // stop all threads
    stop = true;
    condition.notify_all();
    
    // join them
    for(size_t i = 0;i<workers.size();++i)
        workers[i].join();
}

Finally the enqueue function just locks the queue, adds a task to it and wakes up one thread in case any thread was suspended.

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
    { // acquire lock
        std::unique_lock<std::mutex> lock(queue_mutex);
        
        // add the task
        tasks.push_back(std::function<void()>(f));
    } // release lock
    
    // wake up one thread
    condition.notify_one();
}

The interface of the ThreadPool is unchanged, so the usage example from the last blog post still works. This version of the ThreadPool is slightly longer than the version with Boost.Asio but actually still relatively short for what it does and reduces the boost dependencies since we now don’t have to link boost libraries anymore.

16 thoughts on “A Thread Pool with C++11

  1. Hi,

    thanks for posting this. Just wanted to tell you that there is a small error in your code though. You need to lock the queue_mutex before setting stop to true. Otherwise, the worker threads might end up missing the notification to terminate.

    Best,
    Frank

      • Hey,

        consider this:
        you have one worker thread. It is sitting right past the line
        “while(!pool.stop && pool.tasks.empty())”
        so it is inside the for loop. Now the thread that owns the ThreadPool wants to destroy the ThreadPool object and calls the destructor. So now it calls:
        “stop = true;
        condition.notify_all();”
        Now the worker thread continues. It will now do
        “pool.condition.wait(lock);”
        Since this wait happens after the notify, the thread misses the notify_all! But there won’t be another one thus the worker thread will never terminate and the other thread joins forever in the destructor!

        Hope that clarifies it.

        • Frank is correct. But that’s not the only issue.

          Due to memory pipelining, there is no guarantee that ‘true’ will be written to the physical memory of the variable ‘false’ at the time the notification is sent out. It’s possible that it won’t be written until several cycles later.

          In which case, even if the worker thread ISN’T inside the for loop as Frank described, it would still be possible for the worker thread to miss the notification. (or rather, it would get the notification, but when it polled the ‘stop’ variable it would see an out of date value because ‘true’ hasn’t actually been written to memory yet).

          The only way to prevent that is to put stop’s memory access behind a memory barrier (typically a mutex lock or use of an std::atomic).

        • To fix this problem, one can use the version of wait that includes a predicate:

          workerCondition.wait( lock, [&]() {
          return !( workQueue.size() == 0 && !isShutdown );
          } );

      • The workers could read a stale (e.g., cached ) version of stop after it is been updated by the main thread. I think you need to protect it with std::atomic.

  2. Hi. Thanks for this code, but I’m getting an error in the class definition using gcc-4.6.3 in Ubuntu 12.04:
    g++ -c -Wall -std=c++0x -lpthread myclass.cpp -o myClass.o
    threadPool.h:22:25: error: ‘void ThreadPool::enqueue(F) [with F = main(int, char**)::]’, declared using local type ‘main(int, char**)::’, is used but never defined [-fpermissive]

    Any suggestions would be much appreciated.

  3. Hi Jakob,
    How to avoid the main thread exit before the working threads start in your implementation. For example.

    void f1(){}
    void f2(){}

    int main(int argc, char* argv[])
    {
    ThreadPool tp(2);
    tp.enqueue(&fl);
    tp.enqueue(&f2);
    }

    the main thread will exit before f1 or f2 get any chance to be executed.

  4. I don’t quite get what this line of code is supposed to do:

    workers.push_back(std::thread(Worker(*this)));

    I know it creates a new thread and loads it in the thread pool, but what are the parameters inside the std::thread? Does this also create an instance of a worker? Also, what is the Operator()() for? I’ve never seen anyone use anything like that before.

    • operator()() overloads the call operator. The first “()” indicates the operator to overload, the second is the argument list of the overload. So a object of type Worker can be called like it was a function (syntactically speaking). So we can then pass a Worker object to a thread constructor which the thread will start executing. In the updated version on github the Worker class is gone btw. and replaced by a lambda.

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>