rippled
JobQueue.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_CORE_JOBQUEUE_H_INCLUDED
21 #define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
22 
23 #include <ripple/basics/LocalValue.h>
24 #include <ripple/core/ClosureCounter.h>
25 #include <ripple/core/JobTypeData.h>
26 #include <ripple/core/JobTypes.h>
27 #include <ripple/core/impl/Workers.h>
28 #include <ripple/json/json_value.h>
29 #include <boost/coroutine/all.hpp>
30 #include <boost/range/begin.hpp> // workaround for boost 1.72 bug
31 #include <boost/range/end.hpp> // workaround for boost 1.72 bug
32 
33 namespace ripple {
34 
35 namespace perf {
36 class PerfLog;
37 }
38 
39 class Logs;
/** Tag type passed as the first argument to the JobQueue::Coro constructor.

    JobQueue::postCoro() passes Coro_create_t{} when it constructs a Coro.
    The explicit default constructor means the tag has to be named
    deliberately at the call site.
*/
struct Coro_create_t
{
    explicit Coro_create_t() = default;
};
44 
55 class JobQueue : private Workers::Callback
56 {
57 public:
59  class Coro : public std::enable_shared_from_this<Coro>
60  {
61  private:
66  bool running_;
70  boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
71  boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
72 #ifndef NDEBUG
73  bool finished_ = false;
74 #endif
75 
76  public:
77  // Private: Used in the implementation
78  template <class F>
80 
81  // Not copy-constructible or assignable
82  Coro(Coro const&) = delete;
83  Coro&
84  operator=(Coro const&) = delete;
85 
86  ~Coro();
87 
97  void
98  yield() const;
99 
113  bool
114  post();
115 
125  void
126  resume();
127 
129  bool
130  runnable() const;
131 
133  void
134  expectEarlyExit();
135 
137  void
138  join();
139  };
140 
141  using JobFunction = std::function<void()>;
142 
143  JobQueue(
144  int threadCount,
145  beast::insight::Collector::ptr const& collector,
146  beast::Journal journal,
147  Logs& logs,
148  perf::PerfLog& perfLog);
149  ~JobQueue();
150 
160  template <
161  typename JobHandler,
162  typename = std::enable_if_t<std::is_same<
163  decltype(std::declval<JobHandler&&>()()),
164  void>::value>>
165  bool
166  addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
167  {
168  if (auto optionalCountedJob =
169  jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
170  {
171  return addRefCountedJob(type, name, std::move(*optionalCountedJob));
172  }
173  return false;
174  }
175 
185  template <class F>
187  postCoro(JobType t, std::string const& name, F&& f);
188 
191  int
192  getJobCount(JobType t) const;
193 
196  int
197  getJobCountTotal(JobType t) const;
198 
201  int
202  getJobCountGE(JobType t) const;
203 
207  makeLoadEvent(JobType t, std::string const& name);
208 
211  void
212  addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed);
213 
214  // Cannot be const because LoadMonitor has no const methods.
215  bool
216  isOverloaded();
217 
218  // Cannot be const because LoadMonitor has no const methods.
220  getJson(int c = 0);
221 
223  void
224  rendezvous();
225 
226  void
227  stop();
228 
229  bool
230  isStopping() const
231  {
232  return stopping_;
233  }
234 
235  // We may be able to move away from this, but we can keep it during the
236  // transition.
237  bool
238  isStopped() const;
239 
240 private:
241  friend class Coro;
242 
244 
254 
255  // The number of jobs currently in processTask()
257 
258  // The number of suspended coroutines
259  int nSuspend_ = 0;
260 
262 
263  // Statistics tracking
268 
270 
271  void
272  collect();
273  JobTypeData&
274  getJobTypeData(JobType type);
275 
276  // Adds a reference counted job to the JobQueue.
277  //
278  // param type The type of job.
279  // param name Name of the job.
280  // param func std::function with signature void (Job&). Called when the
281  // job is executed.
282  //
283  // return true if func added to queue.
284  bool
286  JobType type,
287  std::string const& name,
288  JobFunction const& func);
289 
290  // Returns the next Job we should run now.
291  //
292  // RunnableJob:
293  // A Job in the JobSet whose slots count for its type is greater than zero.
294  //
295  // Pre-conditions:
296  // mJobSet must not be empty.
297  // mJobSet holds at least one RunnableJob
298  //
299  // Post-conditions:
300  // job is a valid Job object.
301  // job is removed from mJobQueue.
302  // Waiting job count of its type is decremented
303  // Running job count of its type is incremented
304  //
305  // Invariants:
306  // The calling thread owns the JobLock
307  void
308  getNextJob(Job& job);
309 
310  // Indicates that a running Job has completed its task.
311  //
312  // Pre-conditions:
313  // Job must not exist in mJobSet.
314  // The JobType must not be invalid.
315  //
316  // Post-conditions:
317  // The running count of that JobType is decremented
318  // A new task is signaled if there are more waiting Jobs than the limit, if
319  // any.
320  //
321  // Invariants:
322  // <none>
323  void
324  finishJob(JobType type);
325 
326  // Runs the next appropriate waiting Job.
327  //
328  // Pre-conditions:
329  // A RunnableJob must exist in the JobSet
330  //
331  // Post-conditions:
332  // The chosen RunnableJob will have Job::doJob() called.
333  //
334  // Invariants:
335  // <none>
336  void
337  processTask(int instance) override;
338 
339  // Returns the limit of running jobs for the given job type.
340  // For jobs with no limit, we return the largest int. Hopefully that
341  // will be enough.
342  int
343  getJobLimit(JobType type);
344 };
345 
346 /*
347  An RPC command is received and is handled via ServerHandler(HTTP) or
348  Handler(websocket), depending on the connection type. The handler then calls
349  the JobQueue::postCoro() method to create a coroutine and run it at a later
350  point. This frees up the handler thread and allows it to continue handling
351  other requests while the RPC command completes its work asynchronously.
352 
353  postCoro() creates a Coro object. When the Coro ctor is called, and its
354  coro_ member is initialized (a boost::coroutines::pull_type), execution
355  automatically passes to the coroutine, which we don't want at this point,
356  since we are still in the handler thread context. It's important to note
357  here that construction of a boost pull_type automatically passes execution to
358  the coroutine. A pull_type object automatically generates a push_type that is
359  passed as a parameter (do_yield) in the signature of the function the
360  pull_type was created with. This function is immediately called during coro_
361  construction and within it, Coro::yield_ is assigned the push_type
362  parameter (do_yield) address and called (yield()) so we can return execution
363  back to the caller's stack.
364 
    postCoro() then calls Coro::post(), which schedules a job on the job
    queue to continue execution of the coroutine in a JobQueue worker thread at
    some later time. When the job runs, we lock on the Coro::mutex_ and call
    coro_, which continues where we left off. Since the last thing we did
    in coro_ was call yield(), the next thing we do is call the function
    parameter f that was passed into the Coro ctor. It is within this
    function body that the caller specifies what they would like to do while
    running in the coroutine, and where they can suspend and resume execution.
    A task that relies on other events to complete, such as path finding, calls
    Coro::yield() to suspend its execution while waiting on those events to
    complete, and continues when signaled via the Coro::post() method.
376 
    There is a potential race condition here: once f has been called, post()
    can be called before f calls yield(). Technically the problem only occurs
    if the job that post() scheduled is executed before yield() is called;
    in that case, undefined behavior would occur. The lock ensures that coro_
    is not called again until we exit the coroutine, at which point a scheduled
    resume() job waiting on the lock would gain entry, harmlessly call coro_,
    and immediately return because the coroutine has already completed.
385 
386  The race condition occurs as follows:
387 
    1- The coroutine is running.
    2- The coroutine is about to suspend, but before it can do so, it must
       arrange for some event to wake it up.
    3- The coroutine arranges for some event to wake it up.
    4- Before the coroutine can suspend, that event occurs and the
       resumption of the coroutine is scheduled on the job queue.
    5- Again, before the coroutine can suspend, the resumption of the
       coroutine is dispatched.
    6- Again, before the coroutine can suspend, the resumption code runs the
       coroutine.
    The coroutine is now running in two threads.
398 
399  The lock prevents this from happening as step 6 will block until the
400  lock is released which only happens after the coroutine completes.
401 */
402 
403 } // namespace ripple
404 
405 #include <ripple/core/Coro.ipp>
406 
407 namespace ripple {
408 
409 template <class F>
411 JobQueue::postCoro(JobType t, std::string const& name, F&& f)
412 {
413  /* First param is a detail type to make construction private.
414  Last param is the function the coroutine runs. Signature of
415  void(std::shared_ptr<Coro>).
416  */
417  auto coro = std::make_shared<Coro>(
418  Coro_create_t{}, *this, t, name, std::forward<F>(f));
419  if (!coro->post())
420  {
421  // The Coro was not successfully posted. Disable it so it's destructor
422  // can run with no negative side effects. Then destroy it.
423  coro->expectEarlyExit();
424  coro.reset();
425  }
426  return coro;
427 }
428 
429 } // namespace ripple
430 
431 #endif
ripple::Coro_create_t::Coro_create_t
Coro_create_t()=default
ripple::JobQueue::finishJob
void finishJob(JobType type)
Definition: JobQueue.cpp:332
std::is_same
ripple::JobQueue::m_jobSet
std::set< Job > m_jobSet
Definition: JobQueue.h:248
ripple::JobQueue::nSuspend_
int nSuspend_
Definition: JobQueue.h:259
std::string
STL class.
std::shared_ptr< Collector >
ripple::JobQueue::JobQueue
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition: JobQueue.cpp:27
ripple::JobQueue::postCoro
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition: JobQueue.h:411
ripple::Logs
Manages partitions for logging.
Definition: Log.h:48
ripple::JobQueue::Coro::post
bool post()
Schedule coroutine execution.
ripple::JobQueue::getJobLimit
int getJobLimit(JobType type)
Definition: JobQueue.cpp:404
std::chrono::milliseconds
ripple::JobQueue::Coro::coro_
boost::coroutines::asymmetric_coroutine< void >::pull_type coro_
Definition: JobQueue.h:70
ripple::JobQueue::Coro::expectEarlyExit
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
ripple::JobQueue::getJson
Json::Value getJson(int c=0)
Definition: JobQueue.cpp:196
ripple::perf::PerfLog
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition: PerfLog.h:48
ripple::JobQueue::addRefCountedJob
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition: JobQueue.cpp:78
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
std::function< void()>
ripple::JobQueue::Coro::jq_
JobQueue & jq_
Definition: JobQueue.h:63
ripple::Workers::Callback
Called to perform tasks as needed.
Definition: Workers.h:83
ripple::JobQueue::m_journal
beast::Journal m_journal
Definition: JobQueue.h:245
ripple::JobQueue::Coro
Coroutines must run to completion.
Definition: JobQueue.h:59
ripple::JobQueue::Coro::~Coro
~Coro()
ripple::JobQueue::getJobCount
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition: JobQueue.cpp:128
ripple::JobQueue::jobCounter_
JobCounter jobCounter_
Definition: JobQueue.h:249
ripple::JobQueue::m_invalidJobData
JobTypeData m_invalidJobData
Definition: JobQueue.h:253
ripple::JobQueue::getJobCountTotal
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition: JobQueue.cpp:138
ripple::JobQueue::Coro::resume
void resume()
Resume coroutine execution.
ripple::JobQueue::Coro::lvs_
detail::LocalValues lvs_
Definition: JobQueue.h:62
ripple::JobQueue::Coro::Coro
Coro(Coro_create_t, JobQueue &, JobType, std::string const &, F &&)
ripple::JobQueue::~JobQueue
~JobQueue()
Definition: JobQueue.cpp:64
ripple::JobQueue::hook
beast::insight::Hook hook
Definition: JobQueue.h:267
std::enable_if_t
ripple::JobQueue::Coro::cv_
std::condition_variable cv_
Definition: JobQueue.h:69
ripple::JobQueue::m_mutex
std::mutex m_mutex
Definition: JobQueue.h:246
ripple::JobQueue::m_collector
beast::insight::Collector::ptr m_collector
Definition: JobQueue.h:265
ripple::JobQueue::isOverloaded
bool isOverloaded()
Definition: JobQueue.cpp:188
ripple::JobQueue::isStopping
bool isStopping() const
Definition: JobQueue.h:230
ripple::JobQueue::processTask
void processTask(int instance) override
Perform a task.
Definition: JobQueue.cpp:351
std::enable_shared_from_this
ripple::JobQueue::m_jobData
JobDataMap m_jobData
Definition: JobQueue.h:252
ripple::JobTypeData
Definition: JobTypeData.h:29
ripple::Job
Definition: Job.h:96
ripple::ClosureCounter< void >
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint64_t
ripple::JobQueue::getJobTypeData
JobTypeData & getJobTypeData(JobType type)
Definition: JobQueue.cpp:261
ripple::JobQueue::m_lastJob
std::uint64_t m_lastJob
Definition: JobQueue.h:247
ripple::JobQueue::isStopped
bool isStopped() const
Definition: JobQueue.cpp:297
std::atomic_bool
ripple::JobQueue::Coro::yield
void yield() const
Suspend coroutine execution.
std::map< JobType, JobTypeData >
ripple::JobQueue::stopping_
std::atomic_bool stopping_
Definition: JobQueue.h:250
ripple::JobQueue::stopped_
std::atomic_bool stopped_
Definition: JobQueue.h:251
ripple::JobQueue::rendezvous
void rendezvous()
Block until no jobs running.
Definition: JobQueue.cpp:254
ripple::JobQueue::Coro::join
void join()
Waits until coroutine returns from the user function.
ripple::JobQueue::Coro::type_
JobType type_
Definition: JobQueue.h:64
ripple::Workers
Workers is effectively a thread pool.
Definition: Workers.h:79
beast::insight::Gauge
A metric for measuring an integral value.
Definition: Gauge.h:39
ripple::JobQueue
A pool of threads to perform work.
Definition: JobQueue.h:55
ripple::JobQueue::stop
void stop()
Definition: JobQueue.cpp:275
ripple::JobQueue::getJobCountGE
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition: JobQueue.cpp:148
ripple::JobQueue::Coro::name_
std::string name_
Definition: JobQueue.h:65
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::JobQueue::Coro::yield_
boost::coroutines::asymmetric_coroutine< void >::push_type * yield_
Definition: JobQueue.h:71
ripple::JobQueue::perfLog_
perf::PerfLog & perfLog_
Definition: JobQueue.h:264
ripple::JobQueue::Coro::finished_
bool finished_
Definition: JobQueue.h:73
std::condition_variable
ripple::Coro_create_t
Definition: JobQueue.h:40
std::mutex
STL class.
ripple::JobType
JobType
Definition: Job.h:35
ripple::JobQueue::job_count
beast::insight::Gauge job_count
Definition: JobQueue.h:266
ripple::JobQueue::Coro::mutex_run_
std::mutex mutex_run_
Definition: JobQueue.h:68
ripple::JobQueue::Coro::mutex_
std::mutex mutex_
Definition: JobQueue.h:67
ripple::JobQueue::m_processCount
int m_processCount
Definition: JobQueue.h:256
ripple::JobQueue::cv_
std::condition_variable cv_
Definition: JobQueue.h:269
ripple::JobQueue::makeLoadEvent
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:165
ripple::JobQueue::m_workers
Workers m_workers
Definition: JobQueue.h:261
ripple::JobQueue::addLoadEvents
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition: JobQueue.cpp:177
ripple::detail::LocalValues
Definition: LocalValue.h:31
std::unique_ptr
STL class.
beast::insight::Hook
A reference to a handler for performing polled collection.
Definition: Hook.h:31
std::set
STL class.
ripple::JobQueue::getNextJob
void getNextJob(Job &job)
Definition: JobQueue.cpp:303
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::JobQueue::Coro::operator=
Coro & operator=(Coro const &)=delete
ripple::JobQueue::Coro::running_
bool running_
Definition: JobQueue.h:66
ripple::JobQueue::collect
void collect()
Definition: JobQueue.cpp:71
ripple::JobQueue::Coro::runnable
bool runnable() const
Returns true if the Coro is still runnable (has not returned).
ripple::ClosureCounter::wrap
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
Definition: ClosureCounter.h:192