20 #include <ripple/basics/PerfLog.h> 
   21 #include <ripple/basics/contract.h> 
   22 #include <ripple/core/JobQueue.h> 
   35     , m_invalidJobData(
JobTypes::instance().getInvalid(), collector, logs)
 
   37     , m_workers(*this, &perfLog, 
"JobQueue", threadCount)
 
   39     , m_collector(collector)
 
   41     JLOG(
m_journal.
info()) << 
"Using " << threadCount << 
"  threads";
 
   55                 std::piecewise_construct,
 
   58             assert(result.second == 
true);
 
   91         << __func__ << 
" : Adding job : " << name << 
" : " << type;
 
  104         auto const& job = *result.first;
 
  106         JobType const type(job.getType());
 
  113         if (data.waiting + data.running < 
getJobLimit(type))
 
  144     return (c == 
m_jobData.
end()) ? 0 : (c->second.waiting + c->second.running);
 
  158             ret += x.second.waiting;
 
  173     return std::make_unique<LoadEvent>(iter->second.load(), name, 
true);
 
  180         LogicError(
"JobQueue::addLoadEvents() called after JobQueue stopped");
 
  184     iter->second.load().addSamples(count, elapsed);
 
  191         return entry.second.load().isOver();
 
  198     using namespace std::chrono_literals;
 
  218         int waiting(data.waiting);
 
  219         int running(data.running);
 
  221         if ((stats.
count != 0) || (waiting != 0) ||
 
  226             pri[
"job_type"] = data.name();
 
  229                 pri[
"over_target"] = 
true;
 
  232                 pri[
"waiting"] = waiting;
 
  234             if (stats.
count != 0)
 
  235                 pri[
"per_second"] = 
static_cast<int>(stats.
count);
 
  244                 pri[
"in_progress"] = running;
 
  248     ret[
"job_types"] = priorities;
 
  278     using namespace std::chrono_literals;
 
  310         JobType const type = iter->getType();
 
  319             assert(data.waiting > 0);
 
  339     if (data.deferred > 0)
 
  341         assert(data.running + data.waiting >= 
getJobLimit(type));
 
  371                 ceil<microseconds>(start_time - job.
queue_time());
 
  380             if (x_time >= 10ms || q_time >= 10ms)
 
  
void finishJob(JobType type)
 
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
 
Manages partitions for logging.
 
Stream trace() const
Severity stream access functions.
 
@ arrayValue
array value (ordered list)
 
beast::insight::Event execute
 
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
 
int getJobLimit(JobType type)
 
Json::Value getJson(int c=0)
 
clock_type::time_point const  & queue_time() const
Returns the time when the job was queued.
 
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log file.
 
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
 
int getJobCount(JobType t) const
Jobs waiting at this priority.
 
JobTypeData m_invalidJobData
 
beast::insight::Event dequeue
 
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
 
Value & append(const Value &value)
Append value to array at the end.
 
@ objectValue
object value (collection of name/value pairs).
 
beast::insight::Hook hook
 
beast::insight::Collector::ptr m_collector
 
Holds all the 'static' information about a job, which does not change.
 
static JobTypes const  & instance()
 
void processTask(int instance) override
Perform a task.
 
A generic endpoint for log messages.
 
JobTypeData & getJobTypeData(JobType type)
 
virtual void jobQueue(JobType const type)=0
Log queued job.
 
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
 
T forward_as_tuple(T... args)
 
std::atomic_bool stopping_
 
std::atomic_bool stopped_
 
void rendezvous()
Block until no jobs running.
 
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
 
void addTask()
Add a task to be performed.
 
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
 
std::chrono::milliseconds latencyAvg
 
beast::insight::Gauge job_count
 
std::condition_variable cv_
 
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
 
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
 
std::chrono::milliseconds latencyPeak
 
A reference to a handler for performing polled collection.
 
void getNextJob(Job &job)
 
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
 
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
 
T & get(EitherAmount &amt)