rippled
ripple::Workers Class Reference

Workers is effectively a thread pool.

Classes

struct  Callback
 Called to perform tasks as needed.
 
struct  PausedTag
 
class  Worker
 

Public Member Functions

 Workers (Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
 Create the object.
 
 ~Workers ()
 
int getNumberOfThreads () const noexcept
 Retrieve the desired number of threads.
 
void setNumberOfThreads (int numberOfThreads)
 Set the desired number of threads.
 
void stop ()
 Pause all threads and wait until they are paused.
 
void addTask ()
 Add a task to be performed.
 
int numberOfCurrentlyRunningTasks () const noexcept
 Get the number of currently executing calls of Callback::processTask.
 

Static Private Member Functions

static void deleteWorkers (beast::LockFreeStack< Worker > &stack)
 

Private Attributes

Callback & m_callback
 
perf::PerfLog * perfLog_
 
std::string m_threadNames
 
std::condition_variable m_cv
 
std::mutex m_mut
 
bool m_allPaused
 
semaphore m_semaphore
 
int m_numberOfThreads
 
std::atomic< int > m_activeCount
 
std::atomic< int > m_pauseCount
 
std::atomic< int > m_runningTaskCount
 
beast::LockFreeStack< Worker > m_everyone
 
beast::LockFreeStack< Worker, PausedTag > m_paused
 

Detailed Description

Workers is effectively a thread pool.

The constructor takes a "callback" that has a void processTask(int instance) method, and a number of workers. It creates that many Worker objects and then waits for calls to Workers::addTask(). It holds a semaphore that counts the number of pending "tasks", and a condition variable for the event when the last worker pauses itself.

A "task" is just a call to the callback's processTask method. "Adding a task" means calling that method now, or remembering to call it in the future. This is implemented with a semaphore. If there are any workers waiting when a task is added, then one will be woken to claim the task. If not, then the next worker to wait on the semaphore will claim the task.
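The "pending task" counting described above can be illustrated with a small counting semaphore. This is a minimal sketch built on std::mutex and std::condition_variable, not the semaphore type used by Workers; the names CountingSemaphore, notify, wait, tryWait and claimAllPending are assumptions made for illustration only.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical counting semaphore; not the rippled implementation.
class CountingSemaphore {
public:
    explicit CountingSemaphore(int initial = 0) : count_(initial) {
        assert(initial >= 0);
    }

    // Record one pending task and wake one waiter, if there is one.
    void notify() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++count_;
        cv_.notify_one();
    }

    // Block until a task is pending, then claim it.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Claim a task only if one is already pending; never blocks.
    bool tryWait() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0)
            return false;
        --count_;
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    int count_;
};

// Adds `tasks` tasks while nobody is waiting, then counts how many
// can be claimed afterwards. Tasks added early are remembered.
int claimAllPending(int tasks) {
    CountingSemaphore sem;
    for (int i = 0; i < tasks; ++i)
        sem.notify();
    int claimed = 0;
    while (sem.tryWait())
        ++claimed;
    return claimed;
}
```

Because the count is kept inside the semaphore, a task added while every worker is busy is not lost: the next call to wait() returns immediately and claims it.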

Creating a Worker creates a thread that calls Worker::run(). When that thread enters Worker::run(), it increments the count of active workers in the parent Workers object and then tries to claim a task, which blocks if there are none pending. It will be unblocked whenever the semaphore is notified (i.e. when the number of pending tasks is incremented). That only happens in two circumstances: (1) when Workers::addTask is called and (2) when Workers wants to pause some workers ("pause one worker" is considered one task), which happens when someone wants to stop the workers or shrink the thread pool. No worker threads are ever destroyed until Workers is destroyed; it merely pauses workers until then.

When a waiting worker is woken, it checks whether Workers is trying to pause workers. If so, it changes its status from active to paused and blocks on its own condition variable. If not, then it calls processTask on the "callback" held by Workers.

When a paused worker is woken, it checks whether it should exit. The signal to exit is only set in the destructor of Worker, which unblocks the paused thread and waits for it to exit. A Worker::run thread checks whether it needs to exit only when it is woken from a pause (not when it is woken from waiting). This is why the destructor for Workers pauses all the workers before destroying them.
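The waiting, active and paused states described above can be sketched as a small self-contained pool. This is a simplified illustration under stated assumptions, not the code in Workers.cpp: it uses one mutex and three condition variables instead of a semaphore and lock-free stacks, workers pause only once (just before destruction), and the names MiniWorkers, CountingCallback and runTasks are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the callback interface described above.
struct Callback {
    virtual ~Callback() = default;
    virtual void processTask(int instance) = 0;
};

// Simplified pool: workers pause exactly once, just before destruction.
class MiniWorkers {
public:
    MiniWorkers(Callback& callback, int numberOfThreads) : callback_(callback) {
        assert(numberOfThreads >= 0);
        for (int i = 0; i < numberOfThreads; ++i)
            threads_.emplace_back([this, i] { run(i); });
    }

    ~MiniWorkers() {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            int const total = static_cast<int>(threads_.size());
            // "Pause one worker" is signaled like a task, once per worker.
            pauseCount_ = total;
            pending_ += total;
            wake_.notify_all();
            // Wait for the last worker to pause itself.
            allPaused_.wait(lock, [&] { return paused_ == total; });
            // Only paused workers look at the exit flag.
            shouldExit_ = true;
            resume_.notify_all();
        }
        for (auto& t : threads_)
            t.join();
    }

    // Remember one more call to processTask and wake a waiting worker.
    void addTask() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++pending_;
        wake_.notify_one();
    }

private:
    void run(int instance) {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            // Waiting: block until a task or a pause request is pending.
            wake_.wait(lock, [this] { return pending_ > 0; });
            --pending_;
            if (pauseCount_ > 0) {
                // Paused: claim the request, report it, sleep until exit.
                --pauseCount_;
                ++paused_;
                allPaused_.notify_one();
                resume_.wait(lock, [this] { return shouldExit_; });
                return;
            }
            // Active: run one task without holding the lock.
            lock.unlock();
            callback_.processTask(instance);
            lock.lock();
        }
    }

    Callback& callback_;
    std::mutex mutex_;
    std::condition_variable wake_, allPaused_, resume_;
    int pending_ = 0, pauseCount_ = 0, paused_ = 0;
    bool shouldExit_ = false;
    std::vector<std::thread> threads_;
};

// Callback that only counts how often it was invoked.
struct CountingCallback : Callback {
    std::atomic<int> calls{0};
    void processTask(int) override { ++calls; }
};

// Runs `tasks` tasks on `threads` workers (threads must be > 0 if tasks > 0).
int runTasks(int threads, int tasks) {
    CountingCallback cb;
    {
        MiniWorkers pool(cb, threads);
        for (int i = 0; i < tasks; ++i)
            pool.addTask();
        // Let every task finish before the destructor pauses the workers.
        while (cb.calls.load() < tasks)
            std::this_thread::yield();
    }
    return cb.calls.load();
}
```

As in the description, the exit flag is checked only by a worker that is already paused, which is why the sketch's destructor pauses every worker before setting the flag and joining the threads.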

Definition at line 79 of file Workers.h.

Constructor & Destructor Documentation

◆ Workers()

ripple::Workers::Workers ( Callback &  callback,
perf::PerfLog *  perfLog,
std::string const &  threadNames = "Worker",
int  numberOfThreads = static_cast<int>(std::thread::hardware_concurrency()) 
)
explicit

Create the object.

A number of initial threads may be optionally specified. The default is to create one thread per CPU.

Parameters
threadNames  The name given to each created worker thread.

Definition at line 27 of file Workers.cpp.

◆ ~Workers()

ripple::Workers::~Workers ( )

Definition at line 45 of file Workers.cpp.

Member Function Documentation

◆ getNumberOfThreads()

int ripple::Workers::getNumberOfThreads ( ) const
noexcept

Retrieve the desired number of threads.

This just returns the number of active threads that were requested. If there was a recent call to setNumberOfThreads, the actual number of active threads may be temporarily different from what was last requested.

Note
This function is not thread-safe.

Definition at line 53 of file Workers.cpp.

◆ setNumberOfThreads()

void ripple::Workers::setNumberOfThreads ( int  numberOfThreads)

Set the desired number of threads.

Note
This function is not thread-safe.

Definition at line 63 of file Workers.cpp.

◆ stop()

void ripple::Workers::stop ( )

Pause all threads and wait until they are paused.

If a thread is processing a task it will pause as soon as the task completes. There may still be tasks signaled even after all threads have paused.

Note
This function is not thread-safe.

Definition at line 114 of file Workers.cpp.

◆ addTask()

void ripple::Workers::addTask ( )

Add a task to be performed.

Every call to addTask will eventually result in a call to Callback::processTask unless the Workers object is destroyed or the number of threads is never set above zero.

Note
This function is thread-safe.

Definition at line 126 of file Workers.cpp.

◆ numberOfCurrentlyRunningTasks()

int ripple::Workers::numberOfCurrentlyRunningTasks ( ) const
noexcept

Get the number of currently executing calls of Callback::processTask.

While this function is thread-safe, the value may not stay accurate for very long. It's mainly for diagnostic purposes.
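One common way to maintain such a diagnostic counter is an atomic integer that is incremented for the duration of each task. The sketch below assumes an RAII guard around the call; whether Workers does it this way is not stated here, and the names RunningTaskGuard and observedWhileRunning are hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical RAII helper: counts a task as running for the guard's lifetime.
class RunningTaskGuard {
public:
    explicit RunningTaskGuard(std::atomic<int>& counter) : counter_(counter) {
        ++counter_;
    }
    ~RunningTaskGuard() {
        --counter_;
    }
    RunningTaskGuard(RunningTaskGuard const&) = delete;
    RunningTaskGuard& operator=(RunningTaskGuard const&) = delete;

private:
    std::atomic<int>& counter_;
};

// Returns the counter value observed while one task is "running".
int observedWhileRunning(std::atomic<int>& counter) {
    RunningTaskGuard guard(counter);
    // A real task would run here; other threads may read the counter
    // concurrently, so any value they see can be stale immediately.
    return counter.load();
}
```

Reading the atomic is safe from any thread, but the value is only a snapshot, which matches the note that it is mainly useful for diagnostics.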

Definition at line 132 of file Workers.cpp.

◆ deleteWorkers()

void ripple::Workers::deleteWorkers ( beast::LockFreeStack< Worker > &  stack)
static private

Definition at line 138 of file Workers.cpp.

Member Data Documentation

◆ m_callback

Callback& ripple::Workers::m_callback
private

Definition at line 218 of file Workers.h.

◆ perfLog_

perf::PerfLog* ripple::Workers::perfLog_
private

Definition at line 219 of file Workers.h.

◆ m_threadNames

std::string ripple::Workers::m_threadNames
private

Definition at line 220 of file Workers.h.

◆ m_cv

std::condition_variable ripple::Workers::m_cv
private

Definition at line 221 of file Workers.h.

◆ m_mut

std::mutex ripple::Workers::m_mut
private

Definition at line 222 of file Workers.h.

◆ m_allPaused

bool ripple::Workers::m_allPaused
private

Definition at line 223 of file Workers.h.

◆ m_semaphore

semaphore ripple::Workers::m_semaphore
private

Definition at line 224 of file Workers.h.

◆ m_numberOfThreads

int ripple::Workers::m_numberOfThreads
private

Definition at line 225 of file Workers.h.

◆ m_activeCount

std::atomic<int> ripple::Workers::m_activeCount
private

Definition at line 226 of file Workers.h.

◆ m_pauseCount

std::atomic<int> ripple::Workers::m_pauseCount
private

Definition at line 227 of file Workers.h.

◆ m_runningTaskCount

std::atomic<int> ripple::Workers::m_runningTaskCount
private

Definition at line 229 of file Workers.h.

◆ m_everyone

beast::LockFreeStack<Worker> ripple::Workers::m_everyone
private

Definition at line 230 of file Workers.h.

◆ m_paused

beast::LockFreeStack<Worker, PausedTag> ripple::Workers::m_paused
private

Definition at line 232 of file Workers.h.