20 #ifndef RIPPLE_CORE_JOBQUEUE_H_INCLUDED
21 #define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
23 #include <ripple/basics/LocalValue.h>
24 #include <ripple/core/ClosureCounter.h>
25 #include <ripple/core/JobTypeData.h>
26 #include <ripple/core/JobTypes.h>
27 #include <ripple/core/impl/Workers.h>
28 #include <ripple/json/json_value.h>
29 #include <boost/coroutine/all.hpp>
30 #include <boost/range/begin.hpp>
31 #include <boost/range/end.hpp>
70 boost::coroutines::asymmetric_coroutine<void>::pull_type
coro_;
71 boost::coroutines::asymmetric_coroutine<void>::push_type*
yield_;
163 decltype(std::declval<JobHandler&&>()()),
168 if (
auto optionalCountedJob =
405 #include <ripple/core/Coro.ipp>
417 auto coro = std::make_shared<Coro>(
423 coro->expectEarlyExit();
void finishJob(JobType type)
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Manages partitions for logging.
bool post()
Schedule coroutine execution.
int getJobLimit(JobType type)
boost::coroutines::asymmetric_coroutine< void >::pull_type coro_
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
Json::Value getJson(int c=0)
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Called to perform tasks as needed.
Coroutines must run to completion.
int getJobCount(JobType t) const
Jobs waiting at this priority.
JobTypeData m_invalidJobData
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
void resume()
Resume coroutine execution.
Coro(Coro_create_t, JobQueue &, JobType, std::string const &, F &&)
beast::insight::Hook hook
std::condition_variable cv_
beast::insight::Collector::ptr m_collector
void processTask(int instance) override
Perform a task.
A generic endpoint for log messages.
JobTypeData & getJobTypeData(JobType type)
void yield() const
Suspend coroutine execution.
std::atomic_bool stopping_
std::atomic_bool stopped_
void rendezvous()
Block until no jobs running.
void join()
Waits until coroutine returns from the user function.
Workers is effectively a thread pool.
A metric for measuring an integral value.
A pool of threads to perform work.
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
boost::coroutines::asymmetric_coroutine< void >::push_type * yield_
beast::insight::Gauge job_count
std::condition_variable cv_
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
A reference to a handler for performing polled collection.
void getNextJob(Job &job)
Coro & operator=(Coro const &)=delete
bool runnable() const
Returns true if the Coro is still runnable (has not returned).
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.