rippled
Workers.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_CORE_WORKERS_H_INCLUDED
21 #define RIPPLE_CORE_WORKERS_H_INCLUDED
22 
23 #include <ripple/beast/core/LockFreeStack.h>
24 #include <ripple/core/impl/semaphore.h>
25 #include <atomic>
26 #include <condition_variable>
27 #include <mutex>
28 #include <string>
29 #include <thread>
30 
31 namespace ripple {
32 
// Forward declaration: this header only uses PerfLog through a pointer
// (the perfLog constructor parameter), so the full definition is not needed.
33 namespace perf {
34 class PerfLog;
35 }
36 
79 class Workers
80 {
81 public:
83  struct Callback
84  {
85  virtual ~Callback() = default;
86  Callback() = default;
87  Callback(Callback const&) = delete;
88  Callback&
89  operator=(Callback const&) = delete;
90 
101  virtual void
102  processTask(int instance) = 0;
103  };
104 
112  explicit Workers(
113  Callback& callback,
114  perf::PerfLog* perfLog,
115  std::string const& threadNames = "Worker",
116  int numberOfThreads =
117  static_cast<int>(std::thread::hardware_concurrency()));
118 
119  ~Workers();
120 
129  int
130  getNumberOfThreads() const noexcept;
131 
135  void
136  setNumberOfThreads(int numberOfThreads);
137 
146  void
147  stop();
148 
157  void
158  addTask();
159 
164  int
165  numberOfCurrentlyRunningTasks() const noexcept;
166 
167  //--------------------------------------------------------------------------
168 
169 private:
170  struct PausedTag
171  {
172  explicit PausedTag() = default;
173  };
174 
175  /* A Worker executes tasks on its provided thread.
176 
177  These are the states:
178 
179  Active: Running the task processing loop.
180  Idle: Active, but blocked on waiting for a task.
181  Paused: Blocked waiting to exit or become active.
182  */
183  class Worker : public beast::LockFreeStack<Worker>::Node,
184  public beast::LockFreeStack<Worker, PausedTag>::Node
185  {
186  public:
187  Worker(
188  Workers& workers,
189  std::string const& threadName,
190  int const instance);
191 
192  ~Worker();
193 
194  void
195  notify();
196 
197  private:
198  void
199  run();
200 
201  private:
204  int const instance_;
205 
209  int wakeCount_; // how many times to un-pause
211  };
212 
213 private:
214  static void
216 
217 private:
220  std::string m_threadNames; // The name to give each thread
221  std::condition_variable m_cv; // signaled when all threads paused
224  semaphore m_semaphore; // each pending task is 1 resource
225  int m_numberOfThreads; // how many we want active now
226  std::atomic<int> m_activeCount; // to know when all are paused
227  std::atomic<int> m_pauseCount; // how many threads need to pause now
229  m_runningTaskCount; // how many calls to processTask() active
230  beast::LockFreeStack<Worker> m_everyone; // holds all created workers
232  m_paused; // holds just paused workers
233 };
234 
235 } // namespace ripple
236 
237 #endif
ripple::Workers::m_cv
std::condition_variable m_cv
Definition: Workers.h:221
ripple::Workers::perfLog_
perf::PerfLog * perfLog_
Definition: Workers.h:219
ripple::Workers::Callback::processTask
virtual void processTask(int instance)=0
Perform a task.
std::string
STL class.
ripple::Workers::stop
void stop()
Pause all threads and wait until they are paused.
Definition: Workers.cpp:114
ripple::Workers::deleteWorkers
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition: Workers.cpp:138
ripple::Workers::m_pauseCount
std::atomic< int > m_pauseCount
Definition: Workers.h:227
ripple::Workers::getNumberOfThreads
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:53
ripple::Workers::Worker::instance_
const int instance_
Definition: Workers.h:204
ripple::Workers::Worker::mutex_
std::mutex mutex_
Definition: Workers.h:207
ripple::Workers::m_paused
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition: Workers.h:232
ripple::Workers::m_everyone
beast::LockFreeStack< Worker > m_everyone
Definition: Workers.h:230
ripple::Workers::setNumberOfThreads
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition: Workers.cpp:63
ripple::Workers::Worker::~Worker
~Worker()
Definition: Workers.cpp:171
ripple::perf::PerfLog
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
Definition: PerfLog.h:48
ripple::Workers::Worker::Worker
Worker(Workers &workers, std::string const &threadName, int const instance)
Definition: Workers.cpp:158
ripple::Workers::PausedTag
Definition: Workers.h:170
ripple::Workers::Worker::wakeup_
std::condition_variable wakeup_
Definition: Workers.h:208
ripple::Workers::Worker::wakeCount_
int wakeCount_
Definition: Workers.h:209
ripple::Workers::Callback
Called to perform tasks as needed.
Definition: Workers.h:83
ripple::Workers::m_threadNames
std::string m_threadNames
Definition: Workers.h:220
ripple::Workers::m_numberOfThreads
int m_numberOfThreads
Definition: Workers.h:225
ripple::Workers::~Workers
~Workers()
Definition: Workers.cpp:45
ripple::Workers::m_activeCount
std::atomic< int > m_activeCount
Definition: Workers.h:226
ripple::Workers::m_runningTaskCount
std::atomic< int > m_runningTaskCount
Definition: Workers.h:229
thread
ripple::basic_semaphore< std::mutex, std::condition_variable >
ripple::Workers::Worker::m_workers
Workers & m_workers
Definition: Workers.h:202
std::thread::hardware_concurrency
T hardware_concurrency(T... args)
ripple::Workers::Worker::threadName_
const std::string threadName_
Definition: Workers.h:203
ripple::Workers::m_semaphore
semaphore m_semaphore
Definition: Workers.h:224
atomic
ripple::Workers::Workers
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
Definition: Workers.cpp:27
ripple::Workers
Workers is effectively a thread pool.
Definition: Workers.h:79
ripple::Workers::Worker
Definition: Workers.h:183
ripple::Workers::Worker::notify
void notify()
Definition: Workers.cpp:184
ripple::Workers::Worker::run
void run()
Definition: Workers.cpp:192
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Workers::Callback::operator=
Callback & operator=(Callback const &)=delete
condition_variable
ripple::Workers::addTask
void addTask()
Add a task to be performed.
Definition: Workers.cpp:126
ripple::Workers::Callback::Callback
Callback()=default
ripple::Workers::Callback::~Callback
virtual ~Callback()=default
ripple::Workers::Worker::thread_
std::thread thread_
Definition: Workers.h:206
mutex
ripple::Workers::m_mut
std::mutex m_mut
Definition: Workers.h:222
ripple::Workers::m_callback
Callback & m_callback
Definition: Workers.h:218
ripple::Workers::numberOfCurrentlyRunningTasks
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition: Workers.cpp:132
ripple::Workers::Worker::shouldExit_
bool shouldExit_
Definition: Workers.h:210
ripple::Workers::m_allPaused
bool m_allPaused
Definition: Workers.h:223
beast::LockFreeStack
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Definition: LockFreeStack.h:146
string