A Thread Pool with C++11

After showing a simple thread pool with Boost.Asio in the last post, I’m going to look at doing the same thing with the threading facilities in C++11. The biggest difference is that we don’t have the Asio library, so we have to reproduce the relevant functionality ourselves.
The declarations remain mostly the same, except that the ThreadPool class no longer has the io_service members. In their place it has a deque for the tasks and the synchronization primitives that protect it:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <functional>

class ThreadPool;
 
// our worker thread objects
class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};
 
// the actual thread pool
class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F>
    void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;

    // the task queue
    std::deque< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

Previously the Worker threads simply ran the io_service. Now they are where most of the magic happens. The most important part here is the condition_variable, which is used to make a thread “sleep” when there are no jobs and to wake it up when new jobs are added to the queue. Calling condition_variable::wait with a lock releases the lock and suspends the thread. When condition_variable::notify_one or condition_variable::notify_all is called, one or all waiting threads are woken up and reacquire the lock before wait returns. Because a wait can also return spuriously, the worker calls it inside a loop that re-checks the condition.

void Worker::operator()()
{
    std::function<void()> task;
    while(true)
    {
        {   // acquire lock
            std::unique_lock<std::mutex> 
                lock(pool.queue_mutex);
            
            // look for a work item
            while(!pool.stop && pool.tasks.empty())
            { // if there are none wait for notification
                pool.condition.wait(lock);
            }

            if(pool.stop) // exit if the pool is stopped
                return;

            // get the task from the queue
            task = pool.tasks.front();
            pool.tasks.pop_front();

        }   // release lock

        // execute the task
        task();
    }
}
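To see the wait/notify pattern on its own, here is a minimal sketch outside the pool. It assumes a single producer handing one value to a single consumer; the function name handoff_demo and its variables are made up for illustration and are not part of the ThreadPool. It uses the overload of wait that takes a predicate, which performs the same re-checking loop as the explicit while loop in the Worker.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper for illustration: hands one value from the calling
// thread to a waiting consumer thread and returns what the consumer saw.
int handoff_demo(int value)
{
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;   // the condition the consumer waits for
    int slot = 0;         // shared data, protected by m
    int received = 0;     // written by the consumer, read after join()

    std::thread consumer([&] {
        std::unique_lock<std::mutex> lock(m);
        // wait() releases the lock while the thread sleeps and reacquires
        // it before returning. The predicate is checked before sleeping and
        // after every wakeup, so neither a spurious wakeup nor a
        // notification sent before the wait began causes a problem.
        cv.wait(lock, [&] { return ready; });
        received = slot;
    });

    {
        std::lock_guard<std::mutex> lock(m);
        slot = value;
        ready = true;     // change the condition while holding the lock
    }
    cv.notify_one();      // wake the consumer if it is waiting

    consumer.join();
    return received;
}
```

Calling handoff_demo(42) from main should return 42 regardless of which thread reaches the mutex first, because the consumer only trusts the ready flag, never the notification itself.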

Constructor and destructor mostly remain the same. The destructor now uses notify_all to make sure any suspended threads see that the stop flag is set.

// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.push_back(std::thread(Worker(*this)));
}
  
// the destructor joins all threads
ThreadPool::~ThreadPool()
{
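    {   // acquire lock
        // stop must be set while holding the lock; otherwise a worker
        // could check the flag, miss the notification and wait forever
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }   // release lock

    // wake up all threads so they see the stop flag
    condition.notify_all();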
    
    // join them
    for(size_t i = 0;i<workers.size();++i)
        workers[i].join();
}

Finally the enqueue function just locks the queue, adds a task to it and wakes up one thread in case any thread was suspended.

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
    { // acquire lock
        std::unique_lock<std::mutex> lock(queue_mutex);
        
        // add the task
        tasks.push_back(std::function<void()>(f));
    } // release lock
    
    // wake up one thread
    condition.notify_one();
}

The interface of the ThreadPool is unchanged, so the usage example from the last blog post still works. This version is slightly longer than the one with Boost.Asio but still relatively short for what it does, and it reduces the Boost dependencies, since we no longer have to link against any Boost libraries.
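Since the usage example is not repeated here, the following is a rough, self-contained sketch of how such a pool might be used. It is a condensed variant, not the exact code above: MiniPool and sum_of_squares are hypothetical names, the workers are lambdas instead of a separate Worker class, stop is set under the lock, and a worker only exits once the queue is empty. That last point is an assumption about the desired behaviour; the Worker shown earlier returns as soon as stop is set, so tasks still queued at destruction may never run.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Condensed, hypothetical variant of the pool from this post
class MiniPool {
public:
    explicit MiniPool(std::size_t threads) : stop(false)
    {
        for (std::size_t i = 0; i < threads; ++i)
            workers.emplace_back([this] { run(); });
    }

    // defined inside the class so the template body is visible to callers
    template<class F>
    void enqueue(F f)
    {
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            tasks.emplace_back(std::move(f));
        }
        condition.notify_one();
    }

    ~MiniPool()
    {
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &w : workers)
            w.join();
    }

private:
    void run()
    {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [this] { return stop || !tasks.empty(); });
                if (tasks.empty())
                    return;   // reached only when stop is set and the queue is drained
                task = std::move(tasks.front());
                tasks.pop_front();
            }
            task();           // run the task without holding the lock
        }
    }

    std::vector<std::thread> workers;
    std::deque<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// Hypothetical usage: add up 1*1 + 2*2 + ... + n*n on four worker threads
int sum_of_squares(int n)
{
    std::atomic<int> sum(0);
    {
        MiniPool pool(4);
        for (int i = 1; i <= n; ++i)
            pool.enqueue([i, &sum] { sum += i * i; });
    }   // the destructor returns only after every queued task has run
    return sum.load();
}
```

Defining enqueue inside the class keeps the template body visible in every translation unit that calls it, which a function template needs in order to be instantiated. Letting the pool go out of scope is used here as the simplest way to wait for all tasks; a real program might prefer an explicit wait or futures.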

7 thoughts on “A Thread Pool with C++11”

  1. Hi,

    thanks for posting this. Just wanted to tell you that there is a small error in your code though. You need to lock the queue_mutex before setting stop to true. Otherwise, the worker threads might end up missing the notification to terminate.

    Best,
    Frank

      • Hey,

        consider this:
        you have one worker thread. It is sitting right past the line
        “while(!pool.stop && pool.tasks.empty())”
        so it is inside the while loop. Now the thread that owns the ThreadPool wants to destroy the ThreadPool object and calls the destructor. So now it calls:
        “stop = true;
        condition.notify_all();”
        Now the worker thread continues. It will now do
        “pool.condition.wait(lock);”
        Since this wait happens after the notify, the thread misses the notify_all! But there won’t be another one thus the worker thread will never terminate and the other thread joins forever in the destructor!

        Hope that clarifies it.

  2. Hi. Thanks for this code, but I’m getting an error in the class definition using gcc-4.6.3 in Ubuntu 12.04:
    g++ -c -Wall -std=c++0x -lpthread myclass.cpp -o myClass.o
    threadPool.h:22:25: error: ‘void ThreadPool::enqueue(F) [with F = main(int, char**)::]’, declared using local type ‘main(int, char**)::’, is used but never defined [-fpermissive]

    Any suggestions would be much appreciated.

  3. Hi Jakob,
    How can I keep the main thread from exiting before the worker threads start in your implementation? For example:

    void f1(){}
    void f2(){}

    int main(int argc, char* argv[])
    {
    ThreadPool tp(2);
    tp.enqueue(&f1);
    tp.enqueue(&f2);
    }

    the main thread will exit before f1 or f2 get any chance to be executed.
