rippled
Workers.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/basics/PerfLog.h>
21 #include <ripple/beast/core/CurrentThreadName.h>
22 #include <ripple/core/impl/Workers.h>
23 #include <cassert>
24 
25 namespace ripple {
26 
28  Callback& callback,
29  perf::PerfLog* perfLog,
30  std::string const& threadNames,
31  int numberOfThreads)
32  : m_callback(callback)
33  , perfLog_(perfLog)
34  , m_threadNames(threadNames)
35  , m_allPaused(true)
36  , m_semaphore(0)
37  , m_numberOfThreads(0)
38  , m_activeCount(0)
39  , m_pauseCount(0)
40  , m_runningTaskCount(0)
41 {
42  setNumberOfThreads(numberOfThreads);
43 }
44 
46 {
47  stop();
48 
50 }
51 
52 int
54 {
55  return m_numberOfThreads;
56 }
57 
58 // VFALCO NOTE if this function is called quickly to reduce then
59 // increase the number of threads, it could result in
60 // more paused threads being created than expected.
61 //
62 void
63 Workers::setNumberOfThreads(int numberOfThreads)
64 {
65  static int instance{0};
66  if (m_numberOfThreads == numberOfThreads)
67  return;
68 
69  if (perfLog_)
70  perfLog_->resizeJobs(numberOfThreads);
71 
72  if (numberOfThreads > m_numberOfThreads)
73  {
74  // Increasing the number of working threads
75  int const amount = numberOfThreads - m_numberOfThreads;
76 
77  for (int i = 0; i < amount; ++i)
78  {
79  // See if we can reuse a paused worker
80  Worker* worker = m_paused.pop_front();
81 
82  if (worker != nullptr)
83  {
84  // If we got here then the worker thread is at [1]
85  // This will unblock their call to wait()
86  //
87  worker->notify();
88  }
89  else
90  {
91  worker = new Worker(*this, m_threadNames, instance++);
92  m_everyone.push_front(worker);
93  }
94  }
95  }
96  else
97  {
98  // Decreasing the number of working threads
99  int const amount = m_numberOfThreads - numberOfThreads;
100 
101  for (int i = 0; i < amount; ++i)
102  {
103  ++m_pauseCount;
104 
105  // Pausing a thread counts as one "internal task"
107  }
108  }
109 
110  m_numberOfThreads = numberOfThreads;
111 }
112 
113 void
115 {
117 
119  m_cv.wait(lk, [this] { return m_allPaused; });
120  lk.unlock();
121 
122  assert(numberOfCurrentlyRunningTasks() == 0);
123 }
124 
125 void
127 {
129 }
130 
131 int
133 {
134  return m_runningTaskCount.load();
135 }
136 
137 void
139 {
140  for (;;)
141  {
142  Worker* const worker = stack.pop_front();
143 
144  if (worker != nullptr)
145  {
146  // This call blocks until the thread orderly exits
147  delete worker;
148  }
149  else
150  {
151  break;
152  }
153  }
154 }
155 
156 //------------------------------------------------------------------------------
157 
159  Workers& workers,
160  std::string const& threadName,
161  int const instance)
162  : m_workers{workers}
163  , threadName_{threadName}
164  , instance_{instance}
165  , wakeCount_{0}
166  , shouldExit_{false}
167 {
168  thread_ = std::thread{&Workers::Worker::run, this};
169 }
170 
172 {
173  {
174  std::lock_guard lock{mutex_};
175  ++wakeCount_;
176  shouldExit_ = true;
177  }
178 
179  wakeup_.notify_one();
180  thread_.join();
181 }
182 
183 void
185 {
186  std::lock_guard lock{mutex_};
187  ++wakeCount_;
188  wakeup_.notify_one();
189 }
190 
191 void
193 {
194  bool shouldExit = true;
195  do
196  {
197  // Increment the count of active workers, and if
198  // we are the first one then reset the "all paused" event
199  //
200  if (++m_workers.m_activeCount == 1)
201  {
202  std::lock_guard lk{m_workers.m_mut};
203  m_workers.m_allPaused = false;
204  }
205 
206  for (;;)
207  {
208  // Put the name back in case the callback changed it
209  beast::setCurrentThreadName(threadName_);
210 
211  // Acquire a task or "internal task."
212  //
213  m_workers.m_semaphore.wait();
214 
215  // See if there's a pause request. This
216  // counts as an "internal task."
217  //
218  int pauseCount = m_workers.m_pauseCount.load();
219 
220  if (pauseCount > 0)
221  {
222  // Try to decrement
223  pauseCount = --m_workers.m_pauseCount;
224 
225  if (pauseCount >= 0)
226  {
227  // We got paused
228  break;
229  }
230  else
231  {
232  // Undo our decrement
233  ++m_workers.m_pauseCount;
234  }
235  }
236 
237  // We couldn't pause so we must have gotten
238  // unblocked in order to process a task.
239  //
240  ++m_workers.m_runningTaskCount;
241  m_workers.m_callback.processTask(instance_);
242  --m_workers.m_runningTaskCount;
243  }
244 
245  // Any worker that goes into the paused list must
246  // guarantee that it will eventually block on its
247  // event object.
248  //
249  m_workers.m_paused.push_front(this);
250 
251  // Decrement the count of active workers, and if we
252  // are the last one then signal the "all paused" event.
253  //
254  if (--m_workers.m_activeCount == 0)
255  {
256  std::lock_guard lk{m_workers.m_mut};
257  m_workers.m_allPaused = true;
258  m_workers.m_cv.notify_all();
259  }
260 
261  // Set inactive thread name.
262  beast::setCurrentThreadName("(" + threadName_ + ")");
263 
264  // [1] We will be here when the paused list is popped
265  //
266  // We block on our condition_variable, wakeup_, a requirement of being
267  // put into the paused list.
268  //
269  // wakeup_ will get signaled by either Worker::notify() or ~Worker.
270  {
271  std::unique_lock<std::mutex> lock{mutex_};
272  wakeup_.wait(lock, [this] { return this->wakeCount_ > 0; });
273 
274  shouldExit = shouldExit_;
275  --wakeCount_;
276  }
277  } while (!shouldExit);
278 }
279 
280 } // namespace ripple
ripple::Workers::m_cv
std::condition_variable m_cv
Definition: Workers.h:221
ripple::Workers::perfLog_
perf::PerfLog * perfLog_
Definition: Workers.h:219
std::string
STL class.
ripple::Workers::stop
void stop()
Pause all threads and wait until they are paused.
Definition: Workers.cpp:114
ripple::Workers::deleteWorkers
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition: Workers.cpp:138
ripple::Workers::m_pauseCount
std::atomic< int > m_pauseCount
Definition: Workers.h:227
ripple::Workers::getNumberOfThreads
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:53
ripple::Workers::m_paused
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition: Workers.h:232
ripple::Workers::m_everyone
beast::LockFreeStack< Worker > m_everyone
Definition: Workers.h:230
ripple::Workers::setNumberOfThreads
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition: Workers.cpp:63
std::lock_guard
STL class.
ripple::Workers::Worker::~Worker
~Worker()
Definition: Workers.cpp:171
ripple::perf::PerfLog
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition: PerfLog.h:48
ripple::Workers::Worker::Worker
Worker(Workers &workers, std::string const &threadName, int const instance)
Definition: Workers.cpp:158
ripple::Workers::Callback
Called to perform tasks as needed.
Definition: Workers.h:83
ripple::Workers::m_threadNames
std::string m_threadNames
Definition: Workers.h:220
ripple::Workers::m_numberOfThreads
int m_numberOfThreads
Definition: Workers.h:225
ripple::Workers::~Workers
~Workers()
Definition: Workers.cpp:45
ripple::Workers::m_runningTaskCount
std::atomic< int > m_runningTaskCount
Definition: Workers.h:229
std::thread
STL class.
std::atomic::load
T load(T... args)
ripple::perf::PerfLog::resizeJobs
virtual void resizeJobs(int const resize)=0
Ensure enough room to store each currently executing job.
std::unique_lock
STL class.
ripple::Workers::m_semaphore
semaphore m_semaphore
Definition: Workers.h:224
std::condition_variable::wait
T wait(T... args)
ripple::Workers::Workers
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
Definition: Workers.cpp:27
ripple::basic_semaphore::notify
void notify()
Increment the count and unblock one waiting thread.
Definition: semaphore.h:48
ripple::Workers
Workers is effectively a thread pool.
Definition: Workers.h:79
ripple::Workers::Worker
Definition: Workers.h:183
ripple::Workers::Worker::notify
void notify()
Definition: Workers.cpp:184
beast::LockFreeStack::pop_front
Element * pop_front()
Pop an element off the stack.
Definition: LockFreeStack.h:239
ripple::Workers::Worker::run
void run()
Definition: Workers.cpp:192
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
cassert
ripple::Workers::addTask
void addTask()
Add a task to be performed.
Definition: Workers.cpp:126
ripple::Workers::m_mut
std::mutex m_mut
Definition: Workers.h:222
ripple::Workers::numberOfCurrentlyRunningTasks
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition: Workers.cpp:132
std::thread::join
T join(T... args)
ripple::Workers::m_allPaused
bool m_allPaused
Definition: Workers.h:223
beast::LockFreeStack
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Definition: LockFreeStack.h:146