rippled
JobQueue.cpp
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
19 
20 #include <ripple/basics/PerfLog.h>
21 #include <ripple/basics/contract.h>
22 #include <ripple/core/JobQueue.h>
23 #include <mutex>
24 
25 namespace ripple {
26 
28  int threadCount,
29  beast::insight::Collector::ptr const& collector,
30  beast::Journal journal,
31  Logs& logs,
32  perf::PerfLog& perfLog)
33  : m_journal(journal)
34  , m_lastJob(0)
35  , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
36  , m_processCount(0)
37  , m_workers(*this, &perfLog, "JobQueue", threadCount)
38  , perfLog_(perfLog)
39  , m_collector(collector)
40 {
41  JLOG(m_journal.info()) << "Using " << threadCount << " threads";
42 
43  hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
44  job_count = m_collector->make_gauge("job_count");
45 
46  {
48 
49  for (auto const& x : JobTypes::instance())
50  {
51  JobTypeInfo const& jt = x.second;
52 
53  // And create dynamic information for all jobs
54  auto const result(m_jobData.emplace(
55  std::piecewise_construct,
58  assert(result.second == true);
59  (void)result.second;
60  }
61  }
62 }
63 
65 {
66  // Must unhook before destroying
68 }
69 
70 void
72 {
74  job_count = m_jobSet.size();
75 }
76 
77 bool
79  JobType type,
80  std::string const& name,
81  JobFunction const& func)
82 {
83  assert(type != jtINVALID);
84 
85  auto iter(m_jobData.find(type));
86  assert(iter != m_jobData.end());
87  if (iter == m_jobData.end())
88  return false;
89 
90  JLOG(m_journal.debug())
91  << __func__ << " : Adding job : " << name << " : " << type;
92  JobTypeData& data(iter->second);
93 
94  // FIXME: Workaround incorrect client shutdown ordering
95  // do not add jobs to a queue with no threads
96  assert(
97  (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
99 
100  {
101  std::lock_guard lock(m_mutex);
102  auto result =
103  m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
104  auto const& job = *result.first;
105 
106  JobType const type(job.getType());
107  assert(type != jtINVALID);
108  assert(m_jobSet.find(job) != m_jobSet.end());
109  perfLog_.jobQueue(type);
110 
111  JobTypeData& data(getJobTypeData(type));
112 
113  if (data.waiting + data.running < getJobLimit(type))
114  {
115  m_workers.addTask();
116  }
117  else
118  {
119  // defer the task until we go below the limit
120  ++data.deferred;
121  }
122  ++data.waiting;
123  }
124  return true;
125 }
126 
127 int
129 {
130  std::lock_guard lock(m_mutex);
131 
132  JobDataMap::const_iterator c = m_jobData.find(t);
133 
134  return (c == m_jobData.end()) ? 0 : c->second.waiting;
135 }
136 
137 int
139 {
140  std::lock_guard lock(m_mutex);
141 
142  JobDataMap::const_iterator c = m_jobData.find(t);
143 
144  return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
145 }
146 
147 int
149 {
150  // return the number of jobs at this priority level or greater
151  int ret = 0;
152 
153  std::lock_guard lock(m_mutex);
154 
155  for (auto const& x : m_jobData)
156  {
157  if (x.first >= t)
158  ret += x.second.waiting;
159  }
160 
161  return ret;
162 }
163 
166 {
167  JobDataMap::iterator iter(m_jobData.find(t));
168  assert(iter != m_jobData.end());
169 
170  if (iter == m_jobData.end())
171  return {};
172 
173  return std::make_unique<LoadEvent>(iter->second.load(), name, true);
174 }
175 
176 void
178 {
179  if (isStopped())
180  LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
181 
182  JobDataMap::iterator iter(m_jobData.find(t));
183  assert(iter != m_jobData.end());
184  iter->second.load().addSamples(count, elapsed);
185 }
186 
187 bool
189 {
190  return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
191  return entry.second.load().isOver();
192  });
193 }
194 
197 {
198  using namespace std::chrono_literals;
200 
201  ret["threads"] = m_workers.getNumberOfThreads();
202 
203  Json::Value priorities = Json::arrayValue;
204 
205  std::lock_guard lock(m_mutex);
206 
207  for (auto& x : m_jobData)
208  {
209  assert(x.first != jtINVALID);
210 
211  if (x.first == jtGENERIC)
212  continue;
213 
214  JobTypeData& data(x.second);
215 
216  LoadMonitor::Stats stats(data.stats());
217 
218  int waiting(data.waiting);
219  int running(data.running);
220 
221  if ((stats.count != 0) || (waiting != 0) ||
222  (stats.latencyPeak != 0ms) || (running != 0))
223  {
224  Json::Value& pri = priorities.append(Json::objectValue);
225 
226  pri["job_type"] = data.name();
227 
228  if (stats.isOverloaded)
229  pri["over_target"] = true;
230 
231  if (waiting != 0)
232  pri["waiting"] = waiting;
233 
234  if (stats.count != 0)
235  pri["per_second"] = static_cast<int>(stats.count);
236 
237  if (stats.latencyPeak != 0ms)
238  pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
239 
240  if (stats.latencyAvg != 0ms)
241  pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
242 
243  if (running != 0)
244  pri["in_progress"] = running;
245  }
246  }
247 
248  ret["job_types"] = priorities;
249 
250  return ret;
251 }
252 
253 void
255 {
257  cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
258 }
259 
262 {
263  JobDataMap::iterator c(m_jobData.find(type));
264  assert(c != m_jobData.end());
265 
266  // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
267  // and use something sane.
268  if (c == m_jobData.end())
269  return m_invalidJobData;
270 
271  return c->second;
272 }
273 
274 void
276 {
277  stopping_ = true;
278  using namespace std::chrono_literals;
279  jobCounter_.join("JobQueue", 1s, m_journal);
280  {
281  // After the JobCounter is joined, all jobs have finished executing
282  // (i.e. returned from `Job::doJob`) and no more are being accepted,
283  // but there may still be some threads between the return of
284  // `Job::doJob` and the return of `JobQueue::processTask`. That is why
285  // we must wait on the condition variable to make these assertions.
287  cv_.wait(
288  lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
289  assert(m_processCount == 0);
290  assert(m_jobSet.empty());
291  assert(nSuspend_ == 0);
292  stopped_ = true;
293  }
294 }
295 
296 bool
298 {
299  return stopped_;
300 }
301 
302 void
304 {
305  assert(!m_jobSet.empty());
306 
308  for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
309  {
310  JobType const type = iter->getType();
311  assert(type != jtINVALID);
312 
313  JobTypeData& data(getJobTypeData(type));
314  assert(data.running <= getJobLimit(type));
315 
316  // Run this job if we're running below the limit.
317  if (data.running < getJobLimit(data.type()))
318  {
319  assert(data.waiting > 0);
320  --data.waiting;
321  ++data.running;
322  break;
323  }
324  }
325 
326  assert(iter != m_jobSet.end());
327  job = *iter;
328  m_jobSet.erase(iter);
329 }
330 
331 void
333 {
334  assert(type != jtINVALID);
335 
336  JobTypeData& data = getJobTypeData(type);
337 
338  // Queue a deferred task if possible
339  if (data.deferred > 0)
340  {
341  assert(data.running + data.waiting >= getJobLimit(type));
342 
343  --data.deferred;
344  m_workers.addTask();
345  }
346 
347  --data.running;
348 }
349 
350 void
352 {
353  JobType type;
354 
355  {
356  using namespace std::chrono;
357  Job::clock_type::time_point const start_time(Job::clock_type::now());
358  {
359  Job job;
360  {
361  std::lock_guard lock(m_mutex);
362  getNextJob(job);
363  ++m_processCount;
364  }
365  type = job.getType();
366  JobTypeData& data(getJobTypeData(type));
367  JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
368 
369  // The amount of time that the job was in the queue
370  auto const q_time =
371  ceil<microseconds>(start_time - job.queue_time());
372  perfLog_.jobStart(type, q_time, start_time, instance);
373 
374  job.doJob();
375 
376  // The amount of time it took to execute the job
377  auto const x_time =
378  ceil<microseconds>(Job::clock_type::now() - start_time);
379 
380  if (x_time >= 10ms || q_time >= 10ms)
381  {
382  getJobTypeData(type).dequeue.notify(q_time);
383  getJobTypeData(type).execute.notify(x_time);
384  }
385  perfLog_.jobFinish(type, x_time, instance);
386  }
387  }
388 
389  {
390  std::lock_guard lock(m_mutex);
391  // Job should be destroyed before stopping
392  // otherwise destructors with side effects can access
393  // parent objects that are already destroyed.
394  finishJob(type);
395  if (--m_processCount == 0 && m_jobSet.empty())
396  cv_.notify_all();
397  }
398 
399  // Note that when Job::~Job is called, the last reference
400  // to the associated LoadEvent object (in the Job) may be destroyed.
401 }
402 
403 int
405 {
406  JobTypeInfo const& j(JobTypes::instance().get(type));
407  assert(j.type() != jtINVALID);
408 
409  return j.limit();
410 }
411 
412 } // namespace ripple
ripple::JobQueue::finishJob
void finishJob(JobType type)
Definition: JobQueue.cpp:332
ripple::JobQueue::m_jobSet
std::set< Job > m_jobSet
Definition: JobQueue.h:248
ripple::JobQueue::nSuspend_
int nSuspend_
Definition: JobQueue.h:259
std::bind
T bind(T... args)
std::string
STL class.
std::shared_ptr< Collector >
ripple::JobQueue::JobQueue
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition: JobQueue.cpp:27
ripple::jtCLIENT
@ jtCLIENT
Definition: Job.h:45
ripple::Logs
Manages partitions for logging.
Definition: Log.h:48
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::JobTypes
Definition: JobTypes.h:32
Json::arrayValue
@ arrayValue
array value (ordered list)
Definition: json_value.h:42
ripple::JobTypeData::execute
beast::insight::Event execute
Definition: JobTypeData.h:52
std::map::find
T find(T... args)
ripple::Workers::getNumberOfThreads
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:53
ripple::JobQueue::getJobLimit
int getJobLimit(JobType type)
Definition: JobQueue.cpp:404
std::chrono::milliseconds
ripple::JobQueue::getJson
Json::Value getJson(int c=0)
Definition: JobQueue.cpp:196
ripple::LoadMonitor::Stats::count
std::uint64_t count
Definition: LoadMonitor.h:60
ripple::LoadMonitor::Stats::isOverloaded
bool isOverloaded
Definition: LoadMonitor.h:63
std::map::emplace
T emplace(T... args)
ripple::Job::queue_time
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition: Job.cpp:56
std::lock_guard
STL class.
ripple::perf::PerfLog
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition: PerfLog.h:48
ripple::JobQueue::addRefCountedJob
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition: JobQueue.cpp:78
std::function< void()>
std::any_of
T any_of(T... args)
ripple::JobQueue::m_journal
beast::Journal m_journal
Definition: JobQueue.h:245
ripple::JobTypeInfo::limit
int limit() const
Definition: JobTypeInfo.h:76
ripple::JobQueue::getJobCount
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition: JobQueue.cpp:128
ripple::JobQueue::jobCounter_
JobCounter jobCounter_
Definition: JobQueue.h:249
ripple::JobQueue::m_invalidJobData
JobTypeData m_invalidJobData
Definition: JobQueue.h:253
ripple::JobTypeData::dequeue
beast::insight::Event dequeue
Definition: JobTypeData.h:51
ripple::jtCLIENT_WEBSOCKET
@ jtCLIENT_WEBSOCKET
Definition: Job.h:52
ripple::JobQueue::getJobCountTotal
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition: JobQueue.cpp:138
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
Definition: json_value.cpp:882
ripple::LoadMonitor::Stats
Definition: LoadMonitor.h:56
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::JobQueue::~JobQueue
~JobQueue()
Definition: JobQueue.cpp:64
ripple::JobQueue::hook
beast::insight::Hook hook
Definition: JobQueue.h:267
ripple::JobQueue::m_mutex
std::mutex m_mutex
Definition: JobQueue.h:246
ripple::JobTypeInfo::type
JobType type() const
Definition: JobTypeInfo.h:64
ripple::JobQueue::m_collector
beast::insight::Collector::ptr m_collector
Definition: JobQueue.h:265
ripple::JobQueue::isOverloaded
bool isOverloaded()
Definition: JobQueue.cpp:188
std::unique_lock
STL class.
ripple::JobTypeInfo
Holds all the 'static' information about a job, which does not change.
Definition: JobTypeInfo.h:28
ripple::jtGENERIC
@ jtGENERIC
Definition: Job.h:88
ripple::jtINVALID
@ jtINVALID
Definition: Job.h:37
ripple::JobTypes::instance
static JobTypes const & instance()
Definition: JobTypes.h:125
ripple::JobQueue::processTask
void processTask(int instance) override
Perform a task.
Definition: JobQueue.cpp:351
ripple::JobQueue::m_jobData
JobDataMap m_jobData
Definition: JobQueue.h:252
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::JobTypeData
Definition: JobTypeData.h:29
ripple::Job
Definition: Job.h:96
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::condition_variable::wait
T wait(T... args)
ripple::JobQueue::getJobTypeData
JobTypeData & getJobTypeData(JobType type)
Definition: JobQueue.cpp:261
ripple::JobQueue::m_lastJob
std::uint64_t m_lastJob
Definition: JobQueue.h:247
ripple::perf::PerfLog::jobQueue
virtual void jobQueue(JobType const type)=0
Log queued job.
ripple::perf::PerfLog::jobStart
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
ripple::JobQueue::isStopped
bool isStopped() const
Definition: JobQueue.cpp:297
std::forward_as_tuple
T forward_as_tuple(T... args)
ripple::JobQueue::stopping_
std::atomic_bool stopping_
Definition: JobQueue.h:250
ripple::JobQueue::stopped_
std::atomic_bool stopped_
Definition: JobQueue.h:251
ripple::JobQueue::rendezvous
void rendezvous()
Block until no jobs running.
Definition: JobQueue.cpp:254
ripple::JobQueue::stop
void stop()
Definition: JobQueue.cpp:275
ripple::JobQueue::getJobCountGE
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition: JobQueue.cpp:148
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std::map::begin
T begin(T... args)
ripple::JobQueue::perfLog_
perf::PerfLog & perfLog_
Definition: JobQueue.h:264
ripple::LogicError
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
Definition: contract.cpp:48
std::chrono::milliseconds::count
T count(T... args)
ripple::Workers::addTask
void addTask()
Add a task to be performed.
Definition: Workers.cpp:126
ripple::ClosureCounter::join
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
Definition: ClosureCounter.h:166
mutex
ripple::LoadMonitor::Stats::latencyAvg
std::chrono::milliseconds latencyAvg
Definition: LoadMonitor.h:61
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::JobType
JobType
Definition: Job.h:35
ripple::JobQueue::job_count
beast::insight::Gauge job_count
Definition: JobQueue.h:266
std::map::end
T end(T... args)
ripple::Job::getType
JobType getType() const
Definition: Job.cpp:50
ripple::JobQueue::m_processCount
int m_processCount
Definition: JobQueue.h:256
ripple::JobQueue::cv_
std::condition_variable cv_
Definition: JobQueue.h:269
ripple::JobQueue::makeLoadEvent
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:165
ripple::JobQueue::m_workers
Workers m_workers
Definition: JobQueue.h:261
ripple::JobQueue::addLoadEvents
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition: JobQueue.cpp:177
ripple::Job::doJob
void doJob()
Definition: Job.cpp:62
std::unique_ptr
STL class.
ripple::LoadMonitor::Stats::latencyPeak
std::chrono::milliseconds latencyPeak
Definition: LoadMonitor.h:62
beast::insight::Hook
A reference to a handler for performing polled collection.
Definition: Hook.h:31
std::condition_variable::notify_all
T notify_all(T... args)
std::set
STL class.
ripple::JobQueue::getNextJob
void getNextJob(Job &job)
Definition: JobQueue.cpp:303
ripple::perf::PerfLog::jobFinish
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
Json::Value
Represents a JSON value.
Definition: json_value.h:145
beast::insight::Event::notify
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition: Event.h:64
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::JobQueue::collect
void collect()
Definition: JobQueue.cpp:71
std::chrono
std::chrono::steady_clock::now
T now(T... args)