rippled
Workers is effectively a thread pool.
Classes
struct | Callback | Called to perform tasks as needed.
struct | PausedTag
class | Worker
Public Member Functions
Workers (Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency())) | Create the object.
~Workers ()
int | getNumberOfThreads () const noexcept | Retrieve the desired number of threads.
void | setNumberOfThreads (int numberOfThreads) | Set the desired number of threads.
void | stop () | Pause all threads and wait until they are paused.
void | addTask () | Add a task to be performed.
int | numberOfCurrentlyRunningTasks () const noexcept | Get the number of currently executing calls of Callback::processTask.
Static Private Member Functions
static void | deleteWorkers (beast::LockFreeStack< Worker > &stack)
Private Attributes
Callback & | m_callback
perf::PerfLog * | perfLog_
std::string | m_threadNames
std::condition_variable | m_cv
std::mutex | m_mut
bool | m_allPaused
semaphore | m_semaphore
int | m_numberOfThreads
std::atomic< int > | m_activeCount
std::atomic< int > | m_pauseCount
std::atomic< int > | m_runningTaskCount
beast::LockFreeStack< Worker > | m_everyone
beast::LockFreeStack< Worker, PausedTag > | m_paused
Workers is effectively a thread pool. The constructor takes a "callback" that has a void processTask(int instance) method, and a number of workers. It creates that many Worker objects and then waits for calls to Workers::addTask(). It holds a semaphore that counts the number of pending "tasks", and a condition variable for the event when the last worker pauses itself.
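As an illustration of the callback contract, here is a minimal sketch of a client-side callback. The Callback struct below is a local stand-in that assumes only the processTask(int instance) method mentioned above; the real declaration may differ in detail. CountingCallback is a hypothetical name used only for this example.

```cpp
#include <atomic>

// Stand-in for the Callback interface described above (assumption: only the
// processTask(int instance) method is modeled here).
struct Callback
{
    virtual ~Callback() = default;
    virtual void processTask(int instance) = 0;
};

// Hypothetical client: counts how many tasks it was asked to process.
class CountingCallback : public Callback
{
public:
    void processTask(int /*instance*/) override
    {
        // May be called from several worker threads at once, so the counter
        // is atomic.
        calls_.fetch_add(1, std::memory_order_relaxed);
    }

    int calls() const noexcept
    {
        return calls_.load(std::memory_order_relaxed);
    }

private:
    std::atomic<int> calls_{0};
};
```

Because processTask can run concurrently on several workers, any state the callback touches needs its own synchronization; the atomic counter is the simplest case of that.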
A "task" is just a call to the callback's processTask method. "Adding a task" means calling that method now, or remembering to call it in the future. This is implemented with a semaphore. If there are any workers waiting when a task is added, then one will be woken to claim the task. If not, then the next worker to wait on the semaphore will claim the task.
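The "now or later" behaviour described above is what a counting semaphore provides. The following is a minimal sketch built on a mutex and a condition variable; TaskSemaphore and its member names are hypothetical and this is not the semaphore type that Workers actually holds.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical counting semaphore: the count is the number of pending tasks.
class TaskSemaphore
{
public:
    // Record one more pending task and wake one waiting worker, if any.
    void notify()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cv_.notify_one();
    }

    // Block until a task is pending, then claim it.
    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Claim a pending task without blocking; returns false if none is pending.
    bool try_wait()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0)
            return false;
        --count_;
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::size_t count_ = 0;
};
```

If notify() runs while no worker is waiting, the count simply stays above zero, so the next call to wait() returns immediately. That is the "remembering to call it in the future" case.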
Creating a Worker creates a thread that calls Worker::run(). When that thread enters Worker::run, it increments the count of active workers in the parent Workers object and then tries to claim a task, which blocks if there are none pending. It will be unblocked whenever the semaphore is notified (i.e. when the number of pending tasks is incremented). That only happens in two circumstances: (1) when Workers::addTask is called and (2) when Workers wants to pause some workers ("pause one worker" is considered one task), which happens when someone wants to stop the workers or shrink the thread pool. No worker threads are ever destroyed until Workers is destroyed; it merely pauses workers until then.
When a waiting worker is woken, it checks whether Workers is trying to pause workers. If so, it changes its status from active to paused and blocks on its own condition variable. If not, then it calls processTask on the "callback" held by Workers.
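One way a woken worker could decide between pausing and running a task is to try to consume a pause request from an atomic counter. The sketch below assumes such a counter exists; tryClaimPause is a hypothetical helper, and the actual code in Workers.cpp may make this check differently.

```cpp
#include <atomic>

// Hypothetical helper: try to consume one outstanding pause request.
// Returns true if the calling worker should pause, false if it should
// go on to run a task instead.
inline bool tryClaimPause(std::atomic<int>& pauseRequests)
{
    int current = pauseRequests.load();
    while (current > 0)
    {
        // On failure, compare_exchange_weak reloads `current`, and the loop
        // retries only while requests remain.
        if (pauseRequests.compare_exchange_weak(current, current - 1))
            return true;
    }
    return false;
}
```

The compare-and-swap loop ensures that two workers woken at the same time cannot both consume the same pause request, and that the counter never drops below zero.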
When a paused worker is woken, it checks whether it should exit. The signal to exit is only set in the destructor of Worker, which unblocks the paused thread and waits for it to exit. A Worker::run thread checks whether it needs to exit only when it is woken from a pause (not when it is woken from waiting). This is why the destructor for Workers pauses all the workers before destroying them.
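The pause and exit handshake can be sketched with a per-worker mutex, condition variable and exit flag. SleepingWorker and all of its members are hypothetical names for this illustration; it models only the paused state and omits the semaphore and the callback.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker that spends its life paused. It checks the exit flag
// only after being woken from a pause, as described above.
class SleepingWorker
{
public:
    SleepingWorker() : thread_([this] { run(); }) {}

    // Set the exit signal, unblock the paused thread, and wait for it to exit.
    ~SleepingWorker()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shouldExit_ = true;
            ++wakeCount_;
        }
        wakeup_.notify_one();
        thread_.join();
    }

    // Wake the worker without asking it to exit.
    void wake()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++wakeCount_;
        }
        wakeup_.notify_one();
    }

    int timesWoken() const noexcept { return woken_.load(); }

private:
    void run()
    {
        for (;;)
        {
            std::unique_lock<std::mutex> lock(mutex_);
            wakeup_.wait(lock, [this] { return wakeCount_ > 0; });
            --wakeCount_;
            if (shouldExit_)
                return;  // the exit check happens only here
            ++woken_;    // stand-in for returning to the active state
        }
    }

    std::mutex mutex_;
    std::condition_variable wakeup_;
    int wakeCount_ = 0;
    bool shouldExit_ = false;
    std::atomic<int> woken_{0};
    std::thread thread_;  // declared last so the other members exist first
};
```

Because the thread looks at the exit flag only after a wake-up from its own condition variable, a worker that is busy or blocked elsewhere would never notice the flag. Pausing every worker first guarantees that each one is sitting at the point where the check is made.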
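explicit ripple::Workers::Workers (Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
The initial number of threads may optionally be specified. The default is one thread per hardware thread, as reported by std::thread::hardware_concurrency().
Parameters
threadNames | The name given to each created worker thread.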
Definition at line 27 of file Workers.cpp.
ripple::Workers::~Workers ()
Definition at line 45 of file Workers.cpp.
int ripple::Workers::getNumberOfThreads () const noexcept
Retrieve the desired number of threads.
This just returns the number of active threads that were requested. If there was a recent call to setNumberOfThreads, the actual number of active threads may be temporarily different from what was last requested.
Definition at line 53 of file Workers.cpp.
void ripple::Workers::setNumberOfThreads (int numberOfThreads)
Set the desired number of threads.
Definition at line 63 of file Workers.cpp.
void ripple::Workers::stop ()
Pause all threads and wait until they are paused.
If a thread is processing a task it will pause as soon as the task completes. There may still be tasks signaled even after all threads have paused.
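Waiting for "all threads paused" is a standard flag-plus-condition-variable pattern. The sketch below shows that pattern in isolation; AllPausedEvent is a hypothetical name, and the real class may organize the same pieces (a mutex, a condition variable and a boolean) differently.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical event that is set when the last active worker pauses itself.
class AllPausedEvent
{
public:
    // Called by the last worker to pause: set the flag and wake all waiters.
    void set()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            allPaused_ = true;
        }
        cv_.notify_all();
    }

    // Called when workers are resumed, so that a later wait() blocks again.
    void reset()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        allPaused_ = false;
    }

    // Called by the stopping thread: block until the flag is set.
    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return allPaused_; });
    }

    bool isSet()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return allPaused_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool allPaused_ = false;
};
```

The predicate form of wait() guards against spurious wake-ups and also returns immediately if the flag was already set before the waiter arrived.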
Definition at line 114 of file Workers.cpp.
void ripple::Workers::addTask ()
Add a task to be performed.
Every call to addTask will eventually result in a call to Callback::processTask unless the Workers object is destroyed or the number of threads is never set above zero.
Definition at line 126 of file Workers.cpp.
int ripple::Workers::numberOfCurrentlyRunningTasks () const noexcept
Get the number of currently executing calls of Callback::processTask.
While this function is thread-safe, the value may not stay accurate for very long. It's mainly for diagnostic purposes.
Definition at line 132 of file Workers.cpp.
static void ripple::Workers::deleteWorkers (beast::LockFreeStack< Worker > &stack) [static, private]
Definition at line 138 of file Workers.cpp.