Documentation
The full documentation is also available as a pdf-file.
Implementation and Usage of a Thread Pool based on POSIX Threads
1 Introduction
Although distributed computing in the form of clusters is gaining popularity, shared memory machines, especially those with only a few processors, are still more often used as workstations or compute servers. In such a situation the wish often arises that a serial program should make use of all processors in the machine. Sometimes an easy way of doing this is to use threads, which represent parallel computation paths in a single process. A widely adopted interface to threads is POSIX Threads, or Pthreads. Most computer systems supply a software library which implements the functions of this interface. Unfortunately, the handling of threads is complicated and often distracts from the real problem to solve. Since most of the time only a few of the Pthread functions are used, an easier interface is wanted. Also, because the creation of threads comes at some cost, using several hundred of them might not be efficient. One way out of this situation is a thread pool. A thread pool consists of a fixed number of threads, usually equal to the number of processors, and delegates incoming jobs to these threads in an efficient and easy way. This paper describes one way to implement such a thread pool and how to use it. Section 2 shows the implementation in the programming language C++. Typical examples of its usage are given in section 3. A basic knowledge of C++ and POSIX threads is assumed. For a good reference on Pthreads see [1]. The complete source code for the described implementation is freely available from [4].
2 Implementation
A thread pool as implemented below is designed to allow at most p jobs to work in parallel, but it should also be able to handle more than p requests. With these requirements, the thread pool can be used in two different ways. In the first mode, at most p jobs are started simultaneously, work in parallel and are finally synchronised at the same time by the calling thread. This mode is useful if the load is already balanced between the processors and the thread pool is only used to simplify the starting of the threads. In the other mode, an arbitrary number of jobs is given to the thread pool at arbitrary times, and all of them are synchronised by the calling thread at the same time. This kind of programming is especially helpful if no load balancing is done, but the number of requests is much bigger than p and the work per request is more or less the same. The implementation is done in C++ and is inspired by the thread classes defined in the Java programming language (see e.g. [3]).
2.1 Thread Class
The class TThread is implemented as a wrapper around the Pthread
functions. The interface is very similar to the thread classes in Java.
The member variables include the Pthread ID and attribute
(_thread_id and _thread_attr), a boolean variable
indicating whether the thread is running or not (_running) and an
integer to store a given thread number (_thread_no). This
thread number can be used to identify the local processor in the parallel
algorithms.
The member functions of TThread include methods to access the local
data and to start or stop the thread. The special function run() is
abstract. It must be overridden in derived classes and is called when the thread
is running.
The whole interface of TThread is listed in the following source.
class TThread {
protected:
pthread_t _thread_id; // ID of the thread
pthread_attr_t _thread_attr; // thread attributes
bool _running; // true if thread is running
int _thread_no; // opt. number of thread
public:
// constructor and destructor
TThread ( int p = -1 );
virtual ~TThread ();
// access local data
int thread_no () const { return _thread_no; }
void set_thread_no ( int p ) { _thread_no = p; }
bool on_proc ( int p ) const;
// user interface
virtual void run () = 0;
virtual void start ( bool d = false,
bool s = false ) { create( d, s ); }
virtual void stop () { cancel(); }
// thread management
void create ( bool d = false, bool s = false );
void join ();
void cancel ();
};
The default thread number of -1 can be used if the algorithm needs no
identification of the local processor. If _thread_no is set to
this value, the method on_proc(int) always returns true. In
any other case on_proc(int) only returns true if the given integer
equals the local thread number or the argument is -1.
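The matching rule can be restated in a few lines. The free function below is only an illustration; its name matches_proc is hypothetical and not part of the thread pool sources.

```cpp
// Illustrative restatement of the on_proc() rule; matches_proc is a
// hypothetical helper, not part of the thread pool sources.
bool matches_proc ( int thread_no, int p )
{
    // -1 acts as a wildcard on either side
    return (p == -1) || (thread_no == -1) || (p == thread_no);
}
```

With this rule, a thread numbered 2 matches the arguments 2 and -1 but not 3, while a thread numbered -1 matches every argument.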
A real POSIX thread is created by a call to create(bool,bool) or,
indirectly, by start(bool,bool). The first boolean argument to both
functions determines whether the thread is started in a
joinable or detached state. If the thread is in the joinable
state, other threads can synchronise with the termination of the thread and gather
information about the termination status. If the thread is detached, all
resources of the thread are freed after the termination. It is therefore not
possible to join with such a thread. By default all threads start in the
joinable state.
The second argument to create(bool,bool) and start(bool,bool)
sets the contention scope of the thread. With the default value false, the
wrapper does not change the scope, which is assumed here to be process
contention scope. On most systems, such threads are cheaper to create,
because they compete for resources (e.g. processor time) only among
themselves. If the argument is true, the scope is set to
system contention scope, where the threads compete against all threads
in the system. Although more costly when creating a thread, system contention
scope usually results in a more predictable runtime behaviour and a
better utilisation of system resources.
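Which contention scopes are available depends on the platform; Linux, for example, documents support for system contention scope only. A minimal sketch of how the return value of pthread_attr_setscope can be inspected is given below. The helper request_scope is hypothetical and not part of the thread class.

```cpp
#include <pthread.h>
#include <cerrno>

// Hypothetical helper: request a contention scope and report what the
// system answered. Returns 0 on success or an error number such as
// ENOTSUP if the requested scope is not supported on this platform.
int request_scope ( bool system_scope )
{
    pthread_attr_t attr;
    pthread_attr_init( & attr );

    const int scope  = system_scope ? PTHREAD_SCOPE_SYSTEM
                                    : PTHREAD_SCOPE_PROCESS;
    const int status = pthread_attr_setscope( & attr, scope );

    pthread_attr_destroy( & attr );
    return status;
}
```

A caller could fall back to the default attributes whenever the returned status is not 0.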
A termination request to finish the execution can be sent to the thread by the
method cancel(). After the termination by cancel() the thread
is still joinable if it was created in this state.
There are several other functions defined in the Pthread interface, like
pthread_exit or pthread_kill. Usually their functionality
is not needed, especially not in the thread pool, and therefore they are not
included in the thread class.
The following listing shows the implementation of the constructor, the
destructor and the method on_proc(int).
TThread::TThread ( int p )
: _running(false), _thread_no(p)
{}
TThread::~TThread ()
{ if ( _running )
cancel();
}
bool TThread::on_proc ( int p ) const
{ if ((p == -1) || (_thread_no == -1) || (p == _thread_no))
return true;
else
return false;
}
The method create(bool,bool) first checks whether the thread is already
running. After this, the attributes of the thread are initialised and,
optionally, the detached state and system contention scope are set. Finally the
POSIX thread is created.
void
TThread::create ( bool detached, bool sscope )
{
if ( ! _running )
{
int status;
pthread_attr_init( & _thread_attr );
if ( detached )
pthread_attr_setdetachstate( & _thread_attr,
PTHREAD_CREATE_DETACHED );
if ( sscope )
pthread_attr_setscope( & _thread_attr,
PTHREAD_SCOPE_SYSTEM );
status = pthread_create( & _thread_id, & _thread_attr,
_run_thread, this );
if ( status == 0 )
_running = true;
else
printf( "ERROR : could not create thread\n" );
}
else
printf( "ERROR : thread is already running\n" );
}
Argument 3 of the function pthread_create is not the member function
run() but an external, non-virtual function. pthread_create expects a
plain function pointer of type void* (*)(void*), and a pointer to a
non-static (virtual) member function cannot be converted to this type.
Instead, a pointer to the function _run_thread(void*) is
supplied, which does nothing but call the member function run():
extern "C" void *
_run_thread ( void * arg )
{ if (arg != NULL)
((TThread*) arg)->run();
return NULL;
}
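The following sketch shows the same forwarding pattern in a self-contained form. TMiniThread, mini_run_thread and TSumThread are hypothetical, heavily reduced stand-ins for illustration and not the classes of the described library.

```cpp
#include <pthread.h>
#include <cstddef>

// plain C function handed to pthread_create (defined further below)
extern "C" void * mini_run_thread ( void * arg );

// Minimal stand-in for TThread (hypothetical): only start, join and
// the abstract run() are kept.
class TMiniThread
{
    pthread_t _thread_id;
    bool      _running;
public:
    TMiniThread () : _running(false) {}
    virtual ~TMiniThread () {}

    virtual void run () = 0;

    void start ()
    {
        if ( ! _running &&
             pthread_create( & _thread_id, NULL, mini_run_thread, this ) == 0 )
            _running = true;
    }

    void join ()
    {
        if ( _running )
        {
            pthread_join( _thread_id, NULL );
            _running = false;
        }
    }
};

// forwards to the virtual run() of the object passed as argument
extern "C" void * mini_run_thread ( void * arg )
{
    if ( arg != NULL )
        static_cast< TMiniThread * >( arg )->run();
    return NULL;
}

// derived class: the actual work is placed in run()
class TSumThread : public TMiniThread
{
    int  _n;
    long _sum;
public:
    TSumThread ( int n ) : _n(n), _sum(0) {}
    void run () { for ( int i = 1; i <= _n; i++ ) _sum += i; }
    long sum () const { return _sum; }
};

// start the thread, wait for it and return the computed sum
long demo_sum ( int n )
{
    TSumThread t( n );
    t.start();
    t.join();
    return t.sum();
}
```

Because the object pointer travels through the void* argument, the virtual call inside mini_run_thread reaches the run() of the derived class.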
The implementation of the methods join() and cancel() is
straightforward: after ensuring that a thread is running, the appropriate
Pthread function is called.
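Since the listing for these two methods is not shown, the following is only a sketch of how they might look, not the original source. The struct TThreadSketch and the functions wait_forever and demo_cancel are hypothetical, and join() returns the exit value here purely to make the effect of cancel() visible.

```cpp
#include <pthread.h>
#include <unistd.h>
#include <cstddef>

// Hypothetical, reduced thread record; only the members needed for
// join() and cancel() are kept.
struct TThreadSketch
{
    pthread_t _thread_id;
    bool      _running;

    TThreadSketch () : _running(false) {}

    // wait for the thread to terminate; returns its exit value
    void * join ()
    {
        void * result = NULL;
        if ( _running )
        {
            pthread_join( _thread_id, & result );
            _running = false;
        }
        return result;
    }

    // send a cancellation request; the thread stays joinable
    void cancel ()
    {
        if ( _running )
            pthread_cancel( _thread_id );
    }
};

// thread body that does nothing but reach cancellation points
extern "C" void * wait_forever ( void * )
{
    for (;;)
        sleep( 1 );   // sleep() is a cancellation point
    return NULL;
}

// start a thread, cancel it and join; true if it ended by cancellation
bool demo_cancel ()
{
    TThreadSketch t;
    if ( pthread_create( & t._thread_id, NULL, wait_forever, NULL ) != 0 )
        return false;
    t._running = true;
    t.cancel();
    return t.join() == PTHREAD_CANCELED;
}
```

The cancellation request only takes effect once the thread reaches a cancellation point, which is why the thread body calls sleep().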
2.2 Classes for Mutices and Condition Variables
Two other ingredients are very important when working with threads: mutices and condition variables. A mutex allows the synchronisation between different threads, especially important if several threads write to the same memory area. The following listing shows a simple wrapper around the Pthread functions for mutices.
class TMutex
{
friend class TCondition;
protected:
// the mutex itself and the mutex attribute
pthread_mutex_t _mutex;
pthread_mutexattr_t _mutex_attr;
public:
TMutex ()
{ pthread_mutexattr_init( & _mutex_attr );
pthread_mutex_init( & _mutex, & _mutex_attr );
}
~TMutex ()
{ pthread_mutex_destroy( & _mutex );
pthread_mutexattr_destroy( & _mutex_attr );
}
// lock and unlock mutex (return 0 on success)
int lock () { return pthread_mutex_lock( & _mutex ); }
int unlock () { return pthread_mutex_unlock( & _mutex ); }
// try a lock (return 0 if lock was successful)
int trylock () { return pthread_mutex_trylock( & _mutex ); }
};
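As an illustration of guarding shared data with such a mutex, the sketch below lets four threads increment one counter. The TMutex shown is a reduced copy so that the example compiles on its own; TScopedLock, TCounter, add_many and demo_counter are hypothetical helpers that are not part of the described sources.

```cpp
#include <pthread.h>
#include <cstddef>

// reduced copy of the TMutex wrapper so the sketch compiles on its own
class TMutex
{
    pthread_mutex_t _mutex;
public:
    TMutex  () { pthread_mutex_init( & _mutex, NULL ); }
    ~TMutex () { pthread_mutex_destroy( & _mutex ); }
    int lock   () { return pthread_mutex_lock( & _mutex ); }
    int unlock () { return pthread_mutex_unlock( & _mutex ); }
};

// hypothetical helper: locks in the constructor, unlocks in the destructor
class TScopedLock
{
    TMutex & _m;
    TScopedLock ( const TScopedLock & );               // not copyable
    TScopedLock & operator = ( const TScopedLock & );
public:
    explicit TScopedLock ( TMutex & m ) : _m(m) { _m.lock(); }
    ~TScopedLock () { _m.unlock(); }
};

// shared data together with the mutex guarding it
struct TCounter
{
    TMutex mutex;
    long   value;
};

// each thread adds 10000 to the shared counter, one step at a time
extern "C" void * add_many ( void * arg )
{
    TCounter * c = static_cast< TCounter * >( arg );
    for ( int i = 0; i < 10000; i++ )
    {
        TScopedLock lock( c->mutex );
        c->value++;
    }
    return NULL;
}

// run four threads on the same counter and return the final value
long demo_counter ()
{
    TCounter c;
    c.value = 0;
    pthread_t t[4];
    for ( int i = 0; i < 4; i++ )
        pthread_create( & t[i], NULL, add_many, & c );
    for ( int i = 0; i < 4; i++ )
        pthread_join( t[i], NULL );
    return c.value;
}
```

Without the lock around the increment, updates from different threads could be lost and the final value would be unpredictable.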
Condition variables are a very convenient way to have some kind of signalling
between threads. They use mutices, which is the reason for the
friend declaration inside the TMutex class.
class TCondition
{
protected:
// the Pthread condition variable
pthread_cond_t _cond;
public:
// constructor and destructor
TCondition () { pthread_cond_init( & _cond, NULL ); }
~TCondition () { pthread_cond_destroy( & _cond ); }
// condition variable related methods
void wait ( TMutex & m )
{ pthread_cond_wait( & _cond, & m._mutex ); }
void signal ()
{ pthread_cond_signal( & _cond ); }
void broadcast ()
{ pthread_cond_broadcast( & _cond ); }
};
Condition variables in the Pthread implementation can also be initialised with
attributes, but usually the standard attributes are sufficient and therefore they
are not included in the wrapper.
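A condition variable is normally used together with a predicate that is checked in a loop while the mutex is held. The sketch below shows this pattern with the raw Pthread calls to stay self-contained; the same structure applies to TCondition::wait(TMutex&). The names TMailbox, produce and demo_wait are hypothetical.

```cpp
#include <pthread.h>
#include <cstddef>

// shared state: the predicate (ready) is only accessed under the mutex
struct TMailbox
{
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    bool            ready;
    int             value;
};

// producer: set the predicate under the mutex, then signal
extern "C" void * produce ( void * arg )
{
    TMailbox * box = static_cast< TMailbox * >( arg );
    pthread_mutex_lock( & box->mutex );
    box->value = 42;
    box->ready = true;
    pthread_cond_signal( & box->cond );
    pthread_mutex_unlock( & box->mutex );
    return NULL;
}

// consumer: re-check the predicate after every wake-up
int demo_wait ()
{
    TMailbox box;
    pthread_mutex_init( & box.mutex, NULL );
    pthread_cond_init( & box.cond, NULL );
    box.ready = false;
    box.value = 0;

    pthread_t producer;
    pthread_create( & producer, NULL, produce, & box );

    pthread_mutex_lock( & box.mutex );
    while ( ! box.ready )      // loop guards against spurious wake-ups
        pthread_cond_wait( & box.cond, & box.mutex );
    const int result = box.value;
    pthread_mutex_unlock( & box.mutex );

    pthread_join( producer, NULL );
    pthread_cond_destroy( & box.cond );
    pthread_mutex_destroy( & box.mutex );
    return result;
}
```

The loop matters in both orderings: if the producer runs first, the consumer never waits, and if the consumer wakes up without the predicate being set, it simply waits again.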
2.3 Thread Pool Class
The thread pool works by starting p local threads during the initialisation of the pool. These threads are blocked until a job is given to the thread pool. This job is then assigned to one of these threads, which executes the job. After finishing, the thread is blocked again until the next job arrives. If no free thread is available for an incoming job, the calling thread blocks until a local thread of the pool has finished and is idle.
2.3.1 Jobs for the Thread Pool
The implementation starts with a class for a job which can be handled by the pool. This class is similar to the thread class above, but should not be a thread itself. Instead, it just has to hold some information for the thread pool.
class TJob
{
protected:
// number of processor this job was assigned to
int _job_no;
// associated thread in thread pool
TPoolThr * _pool_thr;
public:
// constructor
TJob ( int p ) : _job_no(p), _pool_thr(NULL) {}
// running method
virtual void run ( void * ptr ) = 0;
// access local data
int job_no () const { return _job_no; }
TPoolThr * pool_thr () { return _pool_thr; }
void set_pool_thr ( TPoolThr * t ) { _pool_thr = t; }
// compare if given processor is local
bool on_proc ( int p ) const;
};
The variable _job_no and the method on_proc(int) have the
same purpose as _thread_no and on_proc(int) in the
TThread class and the implementation is identical.
For synchronisation purposes, which become clear later, a pointer
(_pool_thr) to the associated thread in the thread pool is kept
within each job object.
As in the class for a thread, the function run(void*) has to be
overridden by a derived class and should contain the real computation to be
performed.
2.3.2 A Thread in a Thread Pool
The class TPoolThr is derived from TThread and contains
mutices and condition variables for the synchronisation with the
thread pool. Also a pointer to the actual pool it belongs to is saved.
class TPoolThr : public TThread
{
protected:
// thread pool we belong to
TThreadPool * _pool;
// job to run and data for the job
TJob * _job;
void * _data_ptr;
// condition and mutex for waiting for job
// and pool synchronisation
TCondition _work_cond, _sync_cond;
TMutex _work_mutex, _sync_mutex;
// indicates work-in-progress, end-of-thread
// and a mutex for the local variables
bool _work, _end;
TMutex _var_mutex;
// mutex for synchronisation with destructor
TMutex _del_mutex;
// should the job be deleted upon completion
bool _del_job;
public:
// constructor and destructor
TPoolThr ( int n, TThreadPool * p );
// running method
void run ();
// access local variables
void set_end ( bool f );
void set_work ( bool f );
void set_job ( TJob * j, void * p, bool del = false );
bool is_working () const { return _job != NULL; }
TJob * job () { return _job; }
TCondition & work_cond () { return _work_cond; }
TMutex & work_mutex () { return _work_mutex; }
TCondition & sync_cond () { return _sync_cond; }
TMutex & sync_mutex () { return _sync_mutex; }
TMutex & del_mutex () { return _del_mutex; }
};
All changes to the local variables, e.g. done by set_end(bool),
set_work(bool) and set_job(TJob*,void*,bool), are guarded by
_var_mutex.
The function run() is implemented as an infinite loop, waiting for a
job to arrive and executing it. To prevent the destruction of the object before
the thread has finished its execution, the mutex _del_mutex is used
for the synchronisation of the destruction of the thread object.
void
TThreadPool::TPoolThr::run ()
{
_del_mutex.lock();
while ( ! _end )
{
// wait for work
_work_mutex.lock();
while ((_job == NULL) && ! _end )
_work_cond.wait( _work_mutex );
_work_mutex.unlock();
// check again if job is set and execute it
if ( _job != NULL )
{
_job->run( _data_ptr );
// detach thread from job
_job->set_pool_thr( NULL );
if ( _del_job )
delete _job;
set_job( NULL, NULL );
_sync_mutex.unlock();
}
// append thread to idle list
_pool->append_idle( this );
}
_del_mutex.unlock();
}
The condition variable is guarded by the associated mutex and the predicate
(_job == NULL) is checked twice before assuming it valid. After the
execution of the job, the connection between the thread and the job is
reset. That way, any synchronisation with the job can be skipped. Finally the
thread is inserted into the list of idle threads of the thread pool.
2.3.3 The Thread Pool Class
In the constructor of the thread pool, p threads of type TPoolThr are
created and p is saved in a local variable (_max_parallel). Because
all threads in the pool are only created once, system contention scope (see
2.1) is chosen for a better system utilisation. The threads are
stored in an array and also, because they are immediately blocked, in a list
holding the idle threads (_idle_threads).
TArray and TSLL are classes representing dynamic arrays and
single-linked lists. The implementation of these classes is omitted in this
paper.
The complete definition of TThreadPool is shown in the next listing:
class TThreadPool
{
public:
class TJob { ... };
class TPoolThr { ... };
protected:
// maximum degree of parallelism
uint _max_parallel;
// array of threads, handled by pool
TArray< TPoolThr * > _threads;
// list of idle threads, mutices and condition for it
TSLL< TPoolThr * > _idle_threads;
TMutex _idle_mutex, _list_mutex;
TCondition _idle_cond;
public:
// constructor and destructor
TThreadPool ( uint max_p );
~TThreadPool ();
// access local variables
uint max_parallel () const { return _max_parallel; }
// run, stop and synchronise with job
void run ( TJob * job, void * ptr = NULL, bool del = false );
void sync ( TJob * job );
void sync_all ();
// return idle thread from pool
TPoolThr * get_idle ();
// insert idle thread into pool
void append_idle ( TPoolThr * t );
};
The first section of TThreadPool contains the definitions of the classes
TJob and TPoolThr described above, which are nested classes
of TThreadPool.
The implementation of the constructor is as described above. Optionally, a call
to pthread_setconcurrency might follow at the end of the constructor
to tell the Pthread implementation how many concurrent threads to expect. The
destructor starts by synchronising with all threads. After that, each thread is
terminated by setting the variable _end in the pool-thread and
signalling the termination. Finally, after ensuring that each thread has
finished by locking _del_mutex, all resources are freed.
TThreadPool::TThreadPool ( uint p )
{
_max_parallel = p;
_threads.set_size( p );
for ( uint i = 0; i < p; i++ )
{
_threads[i] = new TPoolThr( i, this );
_idle_threads.append( _threads[i] );
_threads[i]->start( true, true );
}
// pthread_setconcurrency( p + pthread_getconcurrency() );
}
TThreadPool::~TThreadPool ()
{
sync_all();
for ( uint i = 0; i < _max_parallel; i++ )
{
_threads[i]->sync_mutex().lock();
_threads[i]->set_end( true );
_threads[i]->set_job( NULL, NULL );
_threads[i]->work_mutex().lock();
_threads[i]->work_cond().signal();
_threads[i]->work_mutex().unlock();
_threads[i]->sync_mutex().unlock();
}
// cancel still pending threads and delete them all
for ( uint i = 0; i < _max_parallel; i++ )
{
_threads[i]->del_mutex().lock();
delete _threads[i];
}
}
The method run(TJob*,void*,bool) takes a given job with an optional
argument, looks for an idle thread, attaches the job to the thread and signals the
thread to begin the execution.
void
TThreadPool::run ( TThreadPool::TJob * job, void * ptr, bool del )
{
TPoolThr * t = get_idle();
// and start the job
t->sync_mutex().lock();
t->set_job( job, ptr, del );
// attach thread to job
job->set_pool_thr( t );
t->work_mutex().lock();
t->work_cond().signal();
t->work_mutex().unlock();
}
To synchronise with the termination of a job, sync(TJob*) is used. This
function uses the pointer to the pool-thread stored in the
TJob object. If this pointer exists, the synchronisation is done with
this thread. If the pointer is NULL, e.g. because the job finished
before sync(TJob*) was called, the method returns immediately. Finally the connection
between the job and the pool-thread is nullified.
void
TThreadPool::sync ( TJob * job )
{
if ( job == NULL )
return;
TPoolThr * t = job->pool_thr();
// check if job is already released
if ( t == NULL )
return;
// look if thread is working and wait for signal
t->sync_mutex().lock();
t->set_job( NULL, NULL );
t->sync_mutex().unlock();
// detach job and thread
job->set_pool_thr( NULL );
}
To synchronise with all running threads, sync_all() was
implemented. Instead of using the thread pointer in the job objects, it directly
uses the pointers to the threads stored in the thread pool. To avoid blocking when
checking whether a thread is running, the function first tries to lock the mutex
of the thread. If this succeeds, the thread is already idle; otherwise sync_all() waits
for the thread to unlock the mutex.
void
TThreadPool::sync_all ()
{
for ( uint i = 0; i < _max_parallel; i++ )
{
if ( _threads[i]->sync_mutex().trylock() )
_threads[i]->sync_mutex().lock();
_threads[i]->sync_mutex().unlock();
}
}
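The test in sync_all() relies on the return value of the try-lock operation: 0 means the lock was obtained, a non-zero value (EBUSY) means the mutex is currently held. The following sketch demonstrates both results with the raw Pthread call; the two demo functions are hypothetical and only serve as illustration.

```cpp
#include <pthread.h>
#include <cerrno>
#include <cstddef>

// try-lock on a free mutex: returns 0 and the caller now owns the lock
int demo_trylock_free ()
{
    pthread_mutex_t m;
    pthread_mutex_init( & m, NULL );      // default attributes
    const int status = pthread_mutex_trylock( & m );
    if ( status == 0 )
        pthread_mutex_unlock( & m );
    pthread_mutex_destroy( & m );
    return status;
}

// try-lock on a mutex that is already locked: returns EBUSY at once
int demo_trylock_busy ()
{
    pthread_mutex_t m;
    pthread_mutex_init( & m, NULL );      // default attributes
    pthread_mutex_lock( & m );
    const int status = pthread_mutex_trylock( & m );
    pthread_mutex_unlock( & m );
    pthread_mutex_destroy( & m );
    return status;
}
```

This is why the condition in sync_all() reads as "if the try-lock failed, wait for the lock".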
As mentioned above, the thread pool blocks if no idle thread is available to
execute the incoming job. This is done by the function get_idle(). It
blocks until the list of idle threads (_idle_threads) is non-empty and
returns the first thread in that list. All accesses to the list are guarded by a
mutex.
TThreadPool::TPoolThr *
TThreadPool::get_idle ()
{
while ( true )
{
// wait for an idle thread
_idle_mutex.lock();
while ( _idle_threads.size() == 0 )
_idle_cond.wait( _idle_mutex );
_idle_mutex.unlock();
// get first idle thread
_list_mutex.lock();
if ( _idle_threads.size() > 0 )
{
TPoolThr * t = _idle_threads.behead();
_list_mutex.unlock();
return t;
}
_list_mutex.unlock();
}
}
The condition variable used in get_idle() is signalled by the method
append_idle(TPoolThr*), which inserts an idle thread into the list of
the pool. Before this, the list is tested to see whether the given thread is already
inside or not. This is mainly for safety reasons. After the thread is inserted,
a signal is sent to the condition variable _idle_cond, which wakes up
any threads blocked waiting for it.
void
TThreadPool::append_idle ( TThreadPool::TPoolThr * t )
{
_list_mutex.lock();
// check if given thread is already in list
// and only append if not so
TSLL< TPoolThr * >::TIterator iter = _idle_threads.first();
while ( ! iter.eol() )
{
if ( iter() == t )
{
_list_mutex.unlock();
return;
}
++iter;
}
_idle_threads.append( t );
_list_mutex.unlock();
_idle_mutex.lock();
_idle_cond.broadcast();
_idle_mutex.unlock();
}
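For comparison, the idle list can also be written with a single mutex that guards both the list and the waiting condition. This is a simplified variant for illustration and not the implementation described above; the class TIdleList, the use of std::deque instead of TSLL and the function demo_idle_list are assumptions of this sketch.

```cpp
#include <pthread.h>
#include <deque>
#include <cstddef>

// Hypothetical simplification: one mutex guards both the list and the
// condition predicate, so the emptiness test and the removal of the
// first item happen in one critical section.
template < class T >
class TIdleList
{
    std::deque< T > _items;
    pthread_mutex_t _mutex;
    pthread_cond_t  _cond;
public:
    TIdleList ()
    {
        pthread_mutex_init( & _mutex, NULL );
        pthread_cond_init( & _cond, NULL );
    }
    ~TIdleList ()
    {
        pthread_cond_destroy( & _cond );
        pthread_mutex_destroy( & _mutex );
    }

    // insert an item unless it is already present, then wake waiters
    void append ( T t )
    {
        pthread_mutex_lock( & _mutex );
        bool found = false;
        for ( std::size_t i = 0; i < _items.size(); i++ )
            if ( _items[i] == t )
                found = true;
        if ( ! found )
        {
            _items.push_back( t );
            pthread_cond_broadcast( & _cond );
        }
        pthread_mutex_unlock( & _mutex );
    }

    // block until an item is available and remove the first one
    T get ()
    {
        pthread_mutex_lock( & _mutex );
        while ( _items.empty() )
            pthread_cond_wait( & _cond, & _mutex );
        T t = _items.front();
        _items.pop_front();
        pthread_mutex_unlock( & _mutex );
        return t;
    }
};

// single-threaded check: duplicates are ignored, order is preserved
int demo_idle_list ()
{
    TIdleList< int > idle;
    idle.append( 7 );
    idle.append( 7 );   // duplicate, ignored
    idle.append( 9 );
    const int first  = idle.get();
    const int second = idle.get();
    return first * 10 + second;
}
```

With one mutex, the outer retry loop of get_idle() is not needed, at the price of holding the lock during the duplicate search.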
3 Usage
As mentioned in section 2, the thread pool was implemented with two different kinds of usage in mind. In the first case, where the number of subproblems n is much larger than the number of processors p and the costs of the subproblems vary only a little, list scheduling (see [2]) can be used for load balancing. List scheduling works by assigning the next available job that has not yet been executed to the first free processor (or thread). This kind of scheduling is in general not optimal: its running time can exceed that of the best possible schedule by a factor of up to 2 - 2/(n-1). But this worst case only occurs if a small number of very costly jobs exist which are executed last. If the costs per job do not differ much, as assumed above, list scheduling works quite well. The second method is based on load balancing done by the user and only helps to simplify the parallel execution. Using this strategy, n should be of the order of p. In both cases, the parallel work has to be put into an object of type TJob. For this, a new class must be derived and the run method
has to be overridden.
class TMyJob : public TThreadPool::TJob
{
public:
TMyJob ( int p ) : TThreadPool::TJob(p) {}
void run ( void * arg )
{
// do something
}
};
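The list scheduling rule described at the beginning of this section can be simulated without any threads. The sketch below is only an illustration: the function list_schedule and the two small demo inputs are hypothetical, and p is assumed to be at least 1.

```cpp
#include <vector>
#include <algorithm>
#include <cstddef>

// Simulate list scheduling: each job, in the given order, goes to the
// processor that becomes free first. Returns the total running time.
// Assumes p >= 1.
double list_schedule ( const std::vector< double > & costs, std::size_t p )
{
    std::vector< double > busy_until( p, 0.0 );
    for ( std::size_t i = 0; i < costs.size(); i++ )
    {
        std::vector< double >::iterator first_free =
            std::min_element( busy_until.begin(), busy_until.end() );
        *first_free += costs[i];
    }
    return *std::max_element( busy_until.begin(), busy_until.end() );
}

// two processors, the costly job comes last
double demo_costly_last ()
{
    const double c[] = { 1.0, 1.0, 2.0 };
    return list_schedule( std::vector< double >( c, c + 3 ), 2 );
}

// two processors, the costly job comes first
double demo_costly_first ()
{
    const double c[] = { 2.0, 1.0, 1.0 };
    return list_schedule( std::vector< double >( c, c + 3 ), 2 );
}
```

On two processors the order 1, 1, 2 yields a running time of 3, whereas the order 2, 1, 1 yields 2, which is the best possible value for these jobs. This small case shows how a costly job executed last degrades the result.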
The overhead involved in using the thread pool on different hardware and
software systems is shown in the following table. In this benchmark 1,000,000
jobs are created and executed, while each job immediately returns from the
run() method. The same is repeated using threads with process (proc)
and system (sys) contention scope.
| System | thread pool | Thread (proc) | Thread (sys) |
|---|---|---|---|
| Linux | 20.7 s | 76.8 s | 77.0 s |
| Solaris 7 | 79.7 s | 142.3 s | 249.2 s |
| Solaris 9 | 49.1 s | 100.4 s | 100.3 s |
| HP-UX | 76.8 s | 192.8 s | 194.4 s |
| IBM AIX | 29.8 s | 56.4 s | 64.6 s |
3.1 Automatic Load Balancing
The example presented in this section uses a recursive function where the computations are done in the last level (i.e. the leaves) of the recursion. All computations are assumed to be independent.
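TThreadPool * thread_pool;
void f ( int l )
{
    if ( l == 0 )
        // no processor number needed; job is deleted by the pool
        thread_pool->run( new TMyJob( -1 ), NULL, true );
    else
    { f( l-1 ); f( l-1 ); }
}
int
main ()
{
    // p and max_depth are assumed to be defined elsewhere
    thread_pool = new TThreadPool( p );
    f( max_depth );
    thread_pool->sync_all();
    delete thread_pool;
    return 0;
}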
The function creates new jobs until no idle threads remain in the
thread pool. Any further call to run() then blocks until a previous job has finished. After all
jobs have been given to the thread pool, f returns and main() waits in sync_all() for the
termination of the running threads.
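The number of jobs created by this recursion follows directly from its structure: every call with l > 0 calls itself twice, so a call with depth l produces 2^l jobs. The helper count_jobs below is hypothetical and only counts the leaves without running anything.

```cpp
// Count the jobs created by f(l) without running them: one job per
// leaf of the binary recursion (count_jobs is a hypothetical helper).
unsigned long count_jobs ( int l )
{
    if ( l == 0 )
        return 1;
    return count_jobs( l-1 ) + count_jobs( l-1 );
}
```

A depth of 8, for instance, gives 256 jobs.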
In figure 3.1 the speedup for an example using the described
strategy is given. A total of 256 jobs is executed by an increasing number of
threads. The work in each job varies randomly, where the minimal and maximal
costs differ by a factor of 1.5. As can be seen, the speedup is almost
optimal, indicating the good efficiency of list scheduling in such a case.
3.2 User Defined Load Balancing
In the following, a load balancing is assumed which distributes the work over all p processors. Then p jobs have to be created and given to the thread pool.
TThreadPool thread_pool( p );
TMyJob ** jobs = new TMyJob* [p];
for ( int i = 0; i < p; i++ )
    jobs[i] = new TMyJob( i );
for ( int i = 0; i < p; i++ )
    thread_pool.run( jobs[i] );
for ( int i = 0; i < p; i++ )
    thread_pool.sync( jobs[i] );
for ( int i = 0; i < p; i++ )
    delete jobs[i];
delete[] jobs;
The number of the local processor is assigned to each job to allow the parallel routines to identify the job they are working on. If this is not necessary, it can be omitted.
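Inside run(), the number returned by job_no() can be used to select the part of the data a job works on. A minimal sketch of one possible distribution is given below, assuming n items are to be split as evenly as possible over p jobs. The names TRange and block_range are hypothetical and not part of the thread pool.

```cpp
#include <cstddef>

// half-open index range [begin, end)
struct TRange
{
    std::size_t begin, end;
};

// Hypothetical helper: range handled by job i (0 <= i < p) when n
// items are split as evenly as possible over p jobs.
TRange block_range ( std::size_t n, std::size_t p, std::size_t i )
{
    const std::size_t base = n / p;   // minimum number of items per job
    const std::size_t rest = n % p;   // the first rest jobs get one extra
    TRange r;
    r.begin = i * base + ( i < rest ? i : rest );
    r.end   = r.begin + base + ( i < rest ? 1 : 0 );
    return r;
}
```

For n = 10 and p = 4 this gives the ranges [0,3), [3,6), [6,8) and [8,10), so the job sizes differ by at most one item.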
References
[1] D.R. Butenhof: Programming with POSIX Threads, Addison-Wesley, 1997.
[2] R.L. Graham: Bounds on Multiprocessing Timing Anomalies, SIAM Journal on Applied Mathematics, Volume 17(2), 1969, pp. 416-429.
[3] P. Hyde: Java Thread Programming, SAMS, 2001.
[4] Max Planck Institute for Mathematics in the Sciences, Scientific Computing, Internet-address: http://www.mis.mpg.de/scicomp/