A Small C++11 ThreadPool Update

Over on GitHub I uploaded a more advanced version of the ThreadPool class from the Thread Pool with C++11 entry. It makes it possible to get std::futures as return values from the enqueue method.

Example usage:

    ThreadPool pool(4);
    
    std::future<std::string> str = 
        pool.enqueue<std::string>(
        []()
        {
            return "hello world";
        }
    );
    
    std::future<int> x = 
        pool.enqueue<int>(
        []()
        {
            return 42;
        }
    );
    
    std::cout << str.get() << ' ' 
              << x.get() << std::endl;
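
Under the hood this can be done by wrapping the callable in a std::packaged_task, which hands out the matching std::future. A minimal sketch of such an enqueue, assuming the queue of std::function<void()> tasks and the condition variable from the C++11 pool described below (the actual implementation on GitHub differs in its details, and this additionally needs the <future> and <memory> headers):

    template<class T, class F>
    std::future<T> ThreadPool::enqueue(F f)
    {
        // wrap the callable so its result is delivered through a future
        auto task = std::make_shared< std::packaged_task<T()> >(f);
        std::future<T> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            // the queue keeps storing plain std::function<void()> objects
            tasks.push_back([task]{ (*task)(); });
        }
        condition.notify_one();
        return res;
    }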

A Thread Pool with C++11

After showing a simple thread pool with Boost.Asio in the last post, I’m going to have a look at doing the same thing with the threading facilities of C++11. The biggest difference is that we don’t have the Asio library, so we have to reproduce the relevant functionality ourselves.
The declarations remain mostly the same, except that the ThreadPool class no longer has the io_service members; in their place it gets a deque of tasks and the synchronization primitives we use to guard it:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
#include <functional>

class ThreadPool;
 
// our worker thread objects
class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};
 
// the actual thread pool
class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F>
    void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;

    // the task queue
    std::deque< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

Previously the Worker threads simply ran the io_service. Now they are where most of the magic happens. The most important part here is the condition_variable, which is used to make a thread “sleep” when there are no jobs and wake it up when new jobs are added to the queue. When condition_variable::wait is called with a lock, the lock is released and the thread is suspended. When condition_variable::notify_one or condition_variable::notify_all is called, one or all of the waiting threads wake up and reacquire the lock.

void Worker::operator()()
{
    std::function<void()> task;
    while(true)
    {
        {   // acquire lock
            std::unique_lock<std::mutex> 
                lock(pool.queue_mutex);
            
            // look for a work item
            while(!pool.stop && pool.tasks.empty())
            { // if there are none wait for notification
                pool.condition.wait(lock);
            }

            if(pool.stop) // exit if the pool is stopped
                return;

            // get the task from the queue
            task = pool.tasks.front();
            pool.tasks.pop_front();

        }   // release lock

        // execute the task
        task();
    }
}

Constructor and destructor mostly remain the same. The destructor now sets the stop flag while holding the lock, so that workers blocked in wait are guaranteed to see the change, and uses notify_all to wake up any suspended threads.

// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.push_back(std::thread(Worker(*this)));
}
  
// the destructor joins all threads
ThreadPool::~ThreadPool()
{
    // stop all threads; setting the flag under the lock guarantees
    // that workers waiting on the condition variable see the change
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    
    // join them
    for(size_t i = 0;i<workers.size();++i)
        workers[i].join();
}

Finally, the enqueue function just locks the queue, adds a task to it, and wakes up one thread in case any are suspended.

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
    { // acquire lock
        std::unique_lock<std::mutex> lock(queue_mutex);
        
        // add the task
        tasks.push_back(std::function<void()>(f));
    } // release lock
    
    // wake up one thread
    condition.notify_one();
}

The interface of the ThreadPool is unchanged, so the usage example from the last blog post still works. This version of the ThreadPool is slightly longer than the one based on Boost.Asio, but still fairly short for what it does, and it removes the Boost dependency entirely since we no longer have to link against any Boost libraries.
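
For reference, here is that usage example translated to plain C++11; the only change needed is swapping the Boost sleep for std::this_thread::sleep_for (which requires the <chrono> header in addition to <thread> and <iostream>):

// create a thread pool of 4 worker threads
ThreadPool pool(4);

// queue a bunch of "work items"
for(int i = 0;i<8;++i)
{
    pool.enqueue([i]
    {
        std::cout << "hello " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "world " << i << std::endl;
    });
}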

A Thread Pool with Boost.Threads and Boost.Asio

After spending some time being frustrated with the C++11 async/future stuff (not sure if I’m too dumb or the implementations just aren’t that great yet), I dug up some old code and found this nice way of doing a thread pool with Boost.Threads (obviously) and Boost.Asio. Since the code is actually pretty short for what it does, I’ll just dump it here:

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <memory>
#include <vector>

class ThreadPool;

// our worker thread objects
class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool; 
};

// the actual thread pool
class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F>
    void enqueue(F f);
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::unique_ptr<boost::thread> > workers;
    
    // the io_service we are wrapping
    boost::asio::io_service service;
    boost::asio::io_service::work working;
    friend class Worker;
};

// all the workers do is execute the io_service
void Worker::operator()() { pool.service.run(); }

// the constructor just launches some amount of workers
ThreadPool::ThreadPool(size_t threads) : working(service)
{
    for(size_t i = 0;i<threads;++i)
        workers.push_back(
            std::unique_ptr<boost::thread>(
                new boost::thread(Worker(*this))
            )
        );
}

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f)
{
    service.post(f);
}

// the destructor joins all threads
ThreadPool::~ThreadPool()
{
    service.stop();
    for(size_t i = 0;i<workers.size();++i)
        workers[i]->join();
}

It’s essentially a wrapper around an io_service. The usage then looks something like this:

// create a thread pool of 4 worker threads
ThreadPool pool(4);

// queue a bunch of "work items"
for(int i = 0;i<8;++i)
{
    pool.enqueue([i]
    {
        std::cout << "hello " << i << std::endl;
        boost::this_thread::sleep(
            boost::posix_time::milliseconds(1000)
        );
        std::cout << "world " << i << std::endl;
    });
}

which produces a funny mixture of garbled output, clearly showing that the lambdas are executed in parallel.

Boost’s MPI and Serialization

Since my project is going to involve MPI parallelization and there is a Boost.MPI library, I’ll of course have a look at it and most likely also use it (I really like Boost). One of the main advantages of Boost.MPI over MPI’s C interface is that it is type-safe, more expressive, and generally nicer to use. Instead of packaging data into buffers by hand before sending it, we can have Boost.Serialization handle that task for us.
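
As a small illustration of the difference, here is what sending a string looks like in both styles (a sketch, assuming a std::string s, a boost::mpi::communicator world as in the example further down, and arbitrarily chosen message tags):

// C interface: length and characters have to be shipped by hand
int len = static_cast<int>(s.size());
MPI_Send(&len, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
MPI_Send(const_cast<char*>(s.data()), len, MPI_CHAR, 1, 1, MPI_COMM_WORLD);

// Boost.MPI: any serializable type can be sent directly
world.send(1, 0, s);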

Let’s say we have the following structs that represent one- and multi-dimensional ranges:

template<class T>
struct Range {
   T begin, end;
};
template<class T, int Dim>
struct NRange {
   Range<T>& operator[](int i)
   { return ranges[i]; }
   const Range<T>& operator[](int i) const
   { return ranges[i]; }
private:
   Range<T> ranges[Dim];
};

To make them serializable we could put a serialize function, as required by Boost.Serialization, into their definitions. But since they already live in their own headers that don’t depend on Boost.Serialization, we might want to make them serializable in a non-intrusive way, which is luckily possible:

#include "Range.h"

namespace boost {
namespace serialization {

template<class Archive, class T>
void serialize(Archive & ar, Range<T> & r, const unsigned int version)
{
   ar & r.begin;
   ar & r.end;
}

template<class Archive, class T, int Dim>
void serialize(Archive & ar, NRange<T,Dim> & r, const unsigned int version)
{
   for(int i = 0;i<Dim;++i)
      ar & r[i];
}

} // namespace serialization
} // namespace boost

//...

Notice that we don’t have to include any Boost headers here, since serialize only takes template parameters; we don’t actually introduce a Boost dependency at all. Now that the structs are serializable, we can just send them off with MPI:

//...

#include <iostream>
#include <boost/mpi.hpp>

namespace mpi = boost::mpi;

int main(int argc, char* argv[])
{
   mpi::environment env(argc, argv);
   mpi::communicator world;

   if (world.rank() == 0)
   {
      NRange<int,3> range;
      range[0].begin = 23;
      range[0].end = 42;
      world.send(1, 0, range);
   }
   else if (world.rank() == 1) // only rank 1 expects the message
   {
      NRange<int,3> range;
      world.recv(0, 0, range);
      std::cout << range[0].begin << ' ' << range[0].end << std::endl;
   }
   return 0;
}
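
Note that this example has to be linked against the boost_mpi and boost_serialization libraries and started with at least two processes, e.g. via mpirun -np 2.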

One of the core features of the framework I’m working on is distributed grids. So naturally we want a way to send chunks of a grid to other nodes via MPI. A very simple implementation of a chunk class could look like this:

#include <vector>
#include <boost/serialization/vector.hpp>

//...

template<class T>
class Chunk {
public:
    Chunk() { }
    
    // b.volume() is assumed to return the number of elements in the
    // bounds; see the sketch after the class
    Chunk(const NRange<size_t, 3> &b)
    : bounds_(b), data_(b.volume())
    {
    }

    // subscript operators etc...

private:
    friend class boost::serialization::access;

    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        ar & bounds_;
        ar & data_;
    }

    NRange<size_t, 3> bounds_;
    std::vector<T> data_;
};
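
The constructor relies on a volume() member that the NRange shown earlier doesn’t actually have; it has to return the number of elements the bounds cover. A possible sketch of such a member (my guess, not code from the original NRange):

// hypothetical member of NRange: the number of elements in the range,
// i.e. the product of the per-dimension extents
size_t volume() const
{
    size_t v = 1;
    for(int i = 0;i<Dim;++i)
        v *= ranges[i].end - ranges[i].begin;
    return v;
}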

Here we see the intrusive version, with the serialize function placed right in the class itself. The additional header we included along with vector provides serialization for std::vector, so we don’t have to write that ourselves. There are a few additional traits we can define, which for example allow bitwise copying in some cases or turn off the versioning done by the serialization library, but essentially we can now send Chunks around without worrying about how they are packaged.
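
For a concrete instantiation such trait declarations look roughly like this (a sketch; the macros apply to concrete types, so a template like Range needs them per instantiation or hand-written trait specializations):

#include <boost/serialization/is_bitwise_serializable.hpp>
#include <boost/serialization/level.hpp>

// tell Boost.Serialization (and thus Boost.MPI) that Range<int>
// can be copied as a raw block of bits
BOOST_IS_BITWISE_SERIALIZABLE(Range<int>)

// turn off versioning (and class information) for Range<int>
BOOST_CLASS_IMPLEMENTATION(Range<int>, boost::serialization::object_serializable)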