20 #include <ripple/app/ledger/Ledger.h>
21 #include <ripple/basics/chrono.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_value.h>
24 #include <ripple/nodestore/Database.h>
25 #include <ripple/protocol/HashPrefix.h>
26 #include <ripple/protocol/jss.h>
38 , scheduler_(scheduler)
39 , ledgersPerShard_(
get<
std::uint32_t>(
45 , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
46 , requestBundle_(
get<int>(config,
"rq_bundle", 4))
47 , readThreads_(
std::max(1, readThreads))
49 assert(readThreads != 0);
52 Throw<std::runtime_error>(
"Invalid ledgers_per_shard");
55 Throw<std::runtime_error>(
"Invalid earliest_seq");
57 if (requestBundle_ < 1 || requestBundle_ > 64)
58 Throw<std::runtime_error>(
"Invalid rq_bundle");
97 for (
auto it =
read.begin(); it !=
read.end(); ++it)
99 assert(!it->second.empty());
101 auto const& hash = it->first;
102 auto const& data = it->second;
103 auto const seqn = data[0].first;
113 for (
auto const& req : data)
116 (seqn == req.first) ||
isSameDB(req.first, seqn)
154 if (shardIndex > earliestShardIndex_)
155 return ledgersPerShard_;
157 if (shardIndex == earliestShardIndex_)
158 return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;
160 assert(!
"Invalid shard index");
172 JLOG(
j_.
debug()) <<
"Clearing read queue because of stop request";
178 JLOG(
j_.
debug()) <<
"Waiting for stop request to complete...";
182 auto const start = steady_clock::now();
186 assert(steady_clock::now() - start < 30s);
190 JLOG(
j_.
debug()) <<
"Stop request completed in "
191 << duration_cast<std::chrono::milliseconds>(
192 steady_clock::now() - start)
207 read_[hash].emplace_back(ledgerSeq, std::move(cb));
217 auto storeBatch = [&, fname = __func__]() {
224 JLOG(
j_.
error()) <<
"Exception caught in function " << fname
225 <<
". Error: " << e.
what();
230 for (
auto const& nodeObject : batch)
231 sz += nodeObject->getData().size();
261 auto const begin{steady_clock::now()};
263 auto nodeObject{
fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
264 auto dur = steady_clock::now() - begin;
269 fetchSz_ += nodeObject->getData().size();
273 fetchReport.
elapsed = duration_cast<milliseconds>(dur);
284 JLOG(
j_.
error()) <<
"Source ledger sequence " << srcLedger.
info().
seq
290 return fail(
"Invalid hash");
292 return fail(
"Invalid account hash");
296 return fail(
"Source and destination databases are the same");
300 auto storeBatch = [&, fname = __func__]() {
302 for (
auto const& nodeObject : batch)
303 sz += nodeObject->getData().size();
307 dstBackend->storeBatch(batch);
312 std::string(
"Exception caught in function ") + fname +
313 ". Error: " + e.
what());
336 if (
auto nodeObject = srcDB.fetchNodeObject(
337 node.getHash().as_uint256(), srcLedger.
info().
seq))
353 return fail(
"Invalid state map");
357 return fail(
"Failed to store state map");
364 return fail(
"Invalid transaction map");
368 return fail(
"Failed to store transaction map");
371 if (!batch.
empty() && !storeBatch())
372 return fail(
"Failed to store");
404 obj[jss::node_writes_duration_us] =
std::to_string(c->writeDurationUs);
Holds a collection of configuration values.
@ ledgerMaster
ledger master data for signing
static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD
The number of ledgers in a shard.
std::atomic< std::uint64_t > fetchDurationUs_
Persistency layer for NodeObject.
void read(nudb::detail::istream &is, std::size_t &u)
SHAMapHash getHash() const
Family const & family() const
const std::uint32_t ledgersPerShard_
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
std::atomic< std::uint64_t > fetchTotalCount_
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
std::atomic< std::uint32_t > fetchSz_
Contains information about a fetch operation.
std::atomic< bool > readStopping_
std::atomic< int > readThreads_
std::shared_ptr< SHAMap > snapShot(bool isMutable) const
std::atomic< std::uint32_t > fetchHitCount_
virtual NodeStore::Database & db()=0
std::atomic< std::uint64_t > storeCount_
@ batchWritePreallocationSize
std::atomic< std::uint64_t > storeSz_
LedgerInfo const & info() const override
Returns information about the ledger.
void importInternal(Backend &dstBackend, Database &srcDB)
SHAMap const & stateMap() const
std::condition_variable readCondVar_
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch. Allows the scheduler to monitor the node store's performance.
A generic endpoint for log messages.
Scheduling for asynchronous backend activity.
virtual ~Database()
Destroy the node store.
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database. This is usually called during import.
void storeStats(std::uint64_t count, std::uint64_t sz)
SHAMap const & txMap() const
virtual std::optional< Backend::Counters< std::uint64_t > > getCounters() const
Retrieve backend read and write stats.
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
T emplace_back(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
void getCountsJson(Json::Value &obj)
const std::uint32_t earliestLedgerSeq_
int add32(std::uint32_t i)
Information about the notional ledger backing the view.
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
std::atomic< int > runningThreads_
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
std::chrono::milliseconds elapsed
T & get(EitherAmount &amt)
A backend used for the NodeStore.