20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/app/rdb/backend/SQLiteDatabase.h>
24 #include <ripple/basics/ByteUtilities.h>
25 #include <ripple/basics/RangeSet.h>
26 #include <ripple/basics/chrono.h>
27 #include <ripple/basics/random.h>
28 #include <ripple/core/ConfigSections.h>
29 #include <ripple/nodestore/DummyScheduler.h>
30 #include <ripple/nodestore/impl/DatabaseShardImp.h>
31 #include <ripple/overlay/Overlay.h>
32 #include <ripple/overlay/predicates.h>
33 #include <ripple/protocol/HashPrefix.h>
34 #include <ripple/protocol/digest.h>
36 #include <boost/algorithm/string/predicate.hpp>
39 #include <sys/statvfs.h>
57 , avgShardFileSz_(ledgersPerShard_ *
kilobytes(192ull))
63 Throw<std::runtime_error>(
64 "Attempted to create DatabaseShardImp in reporting mode. Reporting "
65 "does not support shards. Remove shards info from config");
76 JLOG(
j_.
error()) <<
"already initialized";
82 JLOG(
j_.
error()) <<
"invalid configuration file settings";
88 using namespace boost::filesystem;
95 for (
auto const& path : paths)
99 if (!is_directory(path))
101 JLOG(
j_.
error()) << path <<
" must be a directory";
105 else if (!create_directories(path))
108 <<
"failed to create path: " + path.string();
120 ctx_ = std::make_unique<nudb::context>();
125 for (
auto const& path : paths)
127 for (
auto const& it : directory_iterator(path))
130 if (!is_directory(it))
134 auto const shardDir{it.path()};
135 auto dirName{shardDir.stem().string()};
137 dirName.begin(), dirName.end(), [](
auto c) {
138 return ::isdigit(static_cast<unsigned char>(c));
149 <<
"shard " << shardIndex
150 <<
" ignored, comes before earliest shard index "
159 <<
"shard " << shardIndex
160 <<
" previously failed database import, removing";
161 remove_all(shardDir);
165 auto shard{std::make_shared<Shard>(
166 app_, *
this, shardIndex, shardDir.parent_path(),
j_)};
170 shard->removeOnDestroy();
172 <<
"shard " << shardIndex <<
" removed, "
173 << (shard->isLegacy() ?
"legacy" :
"corrupted")
178 switch (shard->getState())
183 shards_.emplace(shardIndex, std::move(shard));
188 shards_.emplace(shardIndex, std::move(shard))
198 <<
"more than one shard being acquired";
202 shards_.emplace(shardIndex, std::move(shard));
208 <<
"shard " << shardIndex <<
" invalid state";
216 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
217 <<
". Error: " << e.
what();
240 return it->second->prepare();
255 JLOG(
j_.
debug()) <<
"no new shards to add";
263 auto const pathDesignation = [
this, shardIndex = *shardIndex]() {
268 if (!pathDesignation)
271 auto const needsHistoricalPath =
274 auto shard = [
this, shardIndex, needsHistoricalPath] {
276 return std::make_unique<Shard>(
287 auto const ledgerSeq{shard->prepare()};
290 shards_.emplace(*shardIndex, std::move(shard));
301 auto fail = [j =
j_, &shardIndexes](
304 auto multipleIndexPrequel = [&shardIndexes] {
307 shardIndexes.
begin(),
309 indexesAsString.
begin(),
310 [](uint32_t
const index) { return std::to_string(index); });
313 (shardIndexes.
size() > 1 ?
"s " :
" ") +
314 boost::algorithm::join(indexesAsString,
", ");
317 JLOG(j.error()) << (shardIndex ?
"shard " +
std::to_string(*shardIndex)
318 : multipleIndexPrequel())
323 if (shardIndexes.
empty())
324 return fail(
"invalid shard indexes");
330 return fail(
"cannot be stored at this time");
332 auto historicalShardsToPrepare = 0;
334 for (
auto const shardIndex : shardIndexes)
339 "comes before earliest shard index " +
350 return fail(
"invalid index", shardIndex);
357 return fail(
"invalid index", shardIndex);
361 return fail(
"is already stored", shardIndex);
365 "is already queued for import from the shard archive handler",
372 if (shard->index() == shardIndex)
374 "is being imported from the nodestore", shardIndex);
381 ++historicalShardsToPrepare;
388 return fail(
"maximum number of historical shards reached");
390 if (historicalShardsToPrepare)
395 return fail(
"insufficient storage space available");
398 if (
auto const recentShardsToPrepare =
399 shardIndexes.size() - historicalShardsToPrepare;
400 recentShardsToPrepare)
405 return fail(
"insufficient storage space available");
408 for (
auto const shardIndex : shardIndexes)
434 rs.insert(shardIndex);
446 boost::filesystem::path
const& srcDir)
450 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
458 using namespace boost::filesystem;
461 if (!is_directory(srcDir) || is_empty(srcDir))
464 "invalid source directory " + srcDir.string(),
471 std::string(
". Exception caught in function ") + __func__ +
472 ". Error: " + e.
what(),
485 return fail(
"already exists", lock);
489 return fail(
"was not prepared for import", lock);
491 auto const pathDesignation{
493 if (!pathDesignation)
494 return fail(
"failed to import", lock);
503 auto renameDir = [&, fname = __func__](path
const& src, path
const& dst) {
511 std::string(
". Exception caught in function ") + fname +
512 ". Error: " + e.
what(),
519 if (!renameDir(srcDir, dstDir))
523 auto shard{std::make_unique<Shard>(
524 app_, *
this, shardIndex, dstDir.parent_path(),
j_)};
530 renameDir(dstDir, srcDir);
534 auto const [it, inserted] = [&]() {
537 return shards_.emplace(shardIndex, std::move(shard));
543 renameDir(dstDir, srcDir);
561 auto const it{
shards_.find(shardIndex)};
568 switch (shard->getState())
573 if (shard->containsLedger(ledgerSeq))
586 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
590 auto ledger{std::make_shared<Ledger>(
595 if (ledger->info().seq != ledgerSeq)
598 "encountered invalid ledger sequence " +
std::to_string(ledgerSeq));
600 if (ledger->info().hash != hash)
603 "encountered invalid ledger hash " +
to_string(hash) +
608 if (!ledger->stateMap().fetchRoot(
609 SHAMapHash{ledger->info().accountHash},
nullptr))
612 "is missing root STATE node on hash " +
to_string(hash) +
616 if (ledger->info().txHash.isNonZero())
618 if (!ledger->txMap().fetchRoot(
622 "is missing root TXN node on hash " +
to_string(hash) +
632 auto const ledgerSeq{ledger->info().seq};
633 if (ledger->info().hash.isZero())
635 JLOG(
j_.
error()) <<
"zero ledger hash for ledger sequence "
639 if (ledger->info().accountHash.isZero())
641 JLOG(
j_.
error()) <<
"zero account hash for ledger sequence "
645 if (ledger->stateMap().getHash().isNonZero() &&
646 !ledger->stateMap().isValid())
648 JLOG(
j_.
error()) <<
"invalid state map for ledger sequence "
652 if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
654 JLOG(
j_.
error()) <<
"invalid transaction map for ledger sequence "
668 <<
"shard " << shardIndex <<
" is not being acquired";
672 auto const it{
shards_.find(shardIndex)};
676 <<
"shard " << shardIndex <<
" is not being acquired";
682 if (shard->containsLedger(ledgerSeq))
684 JLOG(
j_.
trace()) <<
"shard " << shardIndex <<
" ledger already stored";
707 for (
auto const& [_, shard] :
shards_)
717 for (
auto const& wptr : shards)
719 if (
auto const shard{wptr.lock()})
721 JLOG(
j_.
warn()) <<
" shard " << shard->index() <<
" unexpired";
776 JLOG(
j_.
error()) <<
"database import already in progress";
786 auto shouldHalt = [
this] {
787 bool expected =
true;
813 ledgerSeq = info->seq;
815 if (!ledger || ledgerSeq == 0)
817 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
818 " the SQLite database to import";
829 auto const earliestIndex = [&] {
836 return earliestIndex;
840 auto const latestLedgerSeq = loadLedger(
"desc");
841 if (!latestLedgerSeq)
844 auto const latestIndex = [&] {
854 if (latestIndex < earliestIndex)
856 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
857 " the SQLite database to import";
861 JLOG(
j_.
debug()) <<
"Importing ledgers for shards " << earliestIndex
862 <<
" through " << latestIndex;
869 earliestIndex, latestIndex, 0);
873 for (
std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex;
879 auto const pathDesignation = [
this, shardIndex] {
883 auto const pathDesignation =
886 return pathDesignation;
889 if (!pathDesignation)
899 <<
"shard " << shardIndex <<
" already being acquired";
907 <<
"shard " << shardIndex <<
" already being imported";
914 JLOG(
j_.
debug()) <<
"shard " << shardIndex <<
" already stored";
925 auto const ledgerHashes{
928 if (ledgerHashes.size() !=
maxLedgers(shardIndex))
936 if (!source.fetchNodeObject(ledgerHashes.at(n).ledgerHash, n))
938 JLOG(
j_.
warn()) <<
"SQLite ledger sequence " << n
939 <<
" mismatches node store";
951 bool const needsHistoricalPath =
954 auto const path = needsHistoricalPath
959 auto shard{std::make_shared<Shard>(
app_, *
this, shardIndex, path,
j_)};
982 JLOG(
j_.
error()) <<
"shard " << shardIndex
983 <<
" failed to create temp marker file";
984 shard->removeOnDestroy();
993 while (
auto const ledgerSeq = shard->prepare())
1000 if (!ledger || ledger->info().seq != ledgerSeq)
1003 auto const result{shard->storeLedger(ledger, recentStored)};
1008 if (!shard->setLedgerStored(ledger))
1011 if (!lastLedgerHash && ledgerSeq == lastSeq)
1012 lastLedgerHash = ledger->info().hash;
1014 recentStored = std::move(ledger);
1020 using namespace boost::filesystem;
1021 bool success{
false};
1033 if (shard->storeNodeObject(nodeObject))
1041 remove_all(markerFile);
1043 JLOG(
j_.
debug()) <<
"shard " << shardIndex
1044 <<
" was successfully imported"
1045 " from the NodeStore";
1047 shards_.emplace(shardIndex, std::move(shard))
1061 JLOG(
j_.
fatal()) <<
"shard index " << shardIndex
1062 <<
". Exception caught in function "
1063 << __func__ <<
". Error: " << e.
what();
1070 JLOG(
j_.
error()) <<
"shard " << shardIndex
1071 <<
" failed to import from the NodeStore";
1074 shard->removeOnDestroy();
1098 return shard->getWriteLoad();
1115 <<
"shard " << shardIndex <<
" is not being acquired";
1119 auto const it{
shards_.find(shardIndex)};
1123 <<
"shard " << shardIndex <<
" is not being acquired";
1129 auto const nodeObject{
1131 if (shard->storeNodeObject(nodeObject))
1138 auto const ledgerSeq{srcLedger->info().seq};
1148 <<
"shard " << shardIndex <<
" is not being acquired";
1152 auto const it{
shards_.find(shardIndex)};
1156 <<
"shard " << shardIndex <<
" is not being acquired";
1162 auto const result{shard->storeLedger(srcLedger,
nullptr)};
1164 if (result.error || result.count == 0 || result.size == 0)
1186 for (
auto const& weak : shards)
1188 if (
auto const shard{weak.lock()}; shard && shard->isOpen())
1197 JLOG(
j_.
trace()) <<
"Open shards exceed configured limit of "
1208 return lhsShard->getLastUse() < rhsShard->getLastUse();
1211 for (
auto it{openFinals.
cbegin()};
1214 if ((*it)->tryClose())
1215 it = openFinals.
erase(it);
1238 currentShard[jss::storedSeqs] = shard->getStoredSeqs();
1240 ret[jss::currentShard] = currentShard;
1243 ret[jss::message] =
"Database import halt initiated...";
1261 rpcINTERNAL,
"Database import already in progress");
1269 result[jss::message] =
"Database import initiated...";
1291 result[jss::message] =
"Database import halt initiated...";
1320 get_if_exists<std::uint32_t>(section, name, shardDBValue);
1323 get_if_exists<std::uint32_t>(
1326 return shardDBValue == nodeDBValue;
1335 "ledgers_per_shard" +
"' values");
1341 "earliest_seq" +
"' values");
1344 using namespace boost::filesystem;
1345 if (!get_if_exists<path>(section,
"path",
dir_))
1346 return fail(
"'path' missing");
1351 Section const& historicalShardPaths =
1352 config.section(SECTION_HISTORICAL_SHARD_PATHS);
1354 auto values = historicalShardPaths.
values();
1356 std::sort(values.begin(), values.end());
1357 values.erase(
std::unique(values.begin(), values.end()), values.end());
1359 for (
auto const& s : values)
1361 auto const dir = path(s);
1365 "the 'path' cannot also be in the "
1366 "'historical_shard_path' section");
1376 return fail(
"'type' value unsupported");
1392 auto const it{
shards_.find(shardIndex)};
1398 return shard->fetchNodeObject(hash, fetchReport);
1407 return std::nullopt;
1409 auto const maxShardIndex{[
this, validLedgerSeq]() {
1418 if (
shards_.size() >= maxNumShards)
1419 return std::nullopt;
1421 if (maxShardIndex < 1024 ||
1422 static_cast<float>(
shards_.size()) / maxNumShards > 0.5f)
1440 return std::nullopt;
1452 for (
int i = 0; i < 40; ++i)
1463 return std::nullopt;
1469 bool const writeSQLite,
1479 auto shard{wptr.lock()};
1482 JLOG(
j_.
debug()) <<
"Shard removed before being finalized";
1486 if (!shard->finalize(writeSQLite, expectedHash))
1503 if (shard->index() < boundaryIndex)
1507 shard->getDir().parent_path() ==
dir_)
1510 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1511 <<
" is not stored at a historical path";
1517 assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
1521 if (shard->index() == boundaryIndex)
1526 if (shard->getDir().parent_path() !=
dir_)
1528 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1529 <<
" is not stored at the path";
1557 for (
auto const& weak : shards)
1559 if (
auto const shard{weak.lock()}; shard)
1561 auto const [sz, fd] = shard->getFileInfo();
1584 JLOG(
j_.
warn()) <<
"maximum number of historical shards reached";
1595 <<
"maximum shard store size exceeds available storage space";
1619 auto const availableSpace =
1620 boost::filesystem::space(path).available;
1640 if (numShards <= shardCap)
1643 numShards -= shardCap;
1648 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
1649 <<
". Error: " << e.
what();
1661 if (!shard->setLedgerStored(ledger))
1671 if (
auto const it{
shards_.find(shard->index())}; it !=
shards_.end())
1681 <<
"shard " << shard->index() <<
" is no longer being acquired";
1705 shard->removeOnDestroy();
1735 shards_.begin(),
shards_.end(), [boundaryIndex](
auto const& entry) {
1736 return entry.first < boundaryIndex;
1749 auto const latestShardIndex =
1753 auto const removeShard = [
this](
std::uint32_t const shardIndex) ->
void {
1762 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1767 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1771 auto const keepShard = [
this, &lock, removeShard, separateHistoricalPath](
1775 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1776 removeShard(shardIndex);
1779 if (separateHistoricalPath &&
1782 JLOG(
j_.
error()) <<
"insufficient storage space available";
1783 removeShard(shardIndex);
1792 auto const moveShard = [
this,
1794 auto it{
shards_.find(shardIndex)};
1797 JLOG(
j_.
warn()) <<
"can't find shard to move to historical path";
1801 auto& shard{it->second};
1806 if (!shard->tryClose())
1808 JLOG(
j_.
warn()) <<
"can't close shard to move to historical path";
1816 boost::filesystem::rename(
1821 JLOG(
j_.
error()) <<
"shard " << shardIndex
1822 <<
" failed to move to historical storage";
1827 shard = std::make_shared<Shard>(
app_, *
this, shardIndex, dst,
j_);
1832 JLOG(
j_.
error()) <<
"shard " << shardIndex
1833 <<
" failed to open in historical storage";
1834 shard->removeOnDestroy();
1840 bool const curNotSynched =
1847 if (curNotSynched || prevNotSynched)
1852 if (keepShard(*prev) && separateHistoricalPath)
1855 prev = std::nullopt;
1861 if (cur == latestShardIndex - 1)
1868 if (keepShard(*cur) && separateHistoricalPath)
1885 auto const isHistoricalShard = shardIndex < boundaryIndex;
1894 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1896 return std::nullopt;
1900 JLOG(
j_.
error()) <<
"insufficient storage space available";
1902 return std::nullopt;
1908 boost::filesystem::path
1916 boost::filesystem::path historicalShardPath;
1925 if (potentialPaths.
empty())
1927 JLOG(
j_.
error()) <<
"failed to select a historical shard path";
1932 potentialPaths.
begin(),
1933 potentialPaths.
end(),
1934 &historicalShardPath,
1938 return historicalShardPath;
1953 struct statvfs buffer;
1954 if (statvfs(path.c_str(), &buffer))
1957 <<
"failed to acquire stats for 'historical_shard_path': "
1962 filesystemIDs[buffer.f_fsid].push_back(path.string());
1966 for (
auto const& entry : filesystemIDs)
1969 if (entry.second.size() > 1)
1974 <<
"The following paths correspond to the same filesystem: "
1975 << boost::algorithm::join(entry.second,
", ")
1976 <<
". Each configured historical storage path should"
1977 " be on a unique device or filesystem.";
1998 uniqueCapacities[boost::filesystem::space(path).available].push_back(
2001 for (
auto const& entry : uniqueCapacities)
2004 if (entry.second.size() > 1)
2009 <<
"Each of the following paths have " << entry.first
2010 <<
" bytes free, and may be located on the same device"
2012 << boost::algorithm::join(entry.second,
", ")
2013 <<
". Each configured historical storage path should"
2014 " be on a unique device or file system.";
2025 std::function<
bool(soci::session& session)>
const& callback)
2029 JLOG(
j_.
warn()) <<
"callForLedgerSQLByLedgerSeq ledger seq too early: "
2039 const uint32_t shardIndex,
2040 std::function<
bool(soci::session& session)>
const& callback)
2044 auto const it{
shards_.find(shardIndex)};
2048 it->second->callForLedgerSQL(callback);
2054 std::function<
bool(soci::session& session)>
const& callback)
2063 std::function<
bool(soci::session& session)>
const& callback)
2067 auto const it{
shards_.find(shardIndex)};
2071 it->second->callForTransactionSQL(callback);
2086 it =
shards_.lower_bound(*minShardIndex);
2090 for (; it != eit; it++)
2094 if (!visit(*it->second))
2109 minShardIndex, [&callback](
Shard& shard) ->
bool {
2121 minShardIndex, [&callback](
Shard& shard) ->
bool {
2142 for (; it != eit; it++)
2145 (!maxShardIndex || it->first <= *maxShardIndex))
2147 if (!visit(*it->second))
2180 auto shardInfo{std::make_unique<ShardInfo>()};
2181 for (
auto const& [_, shard] :
shards_)
2184 shard->index(), shard->getState(), shard->getPercentProgress());
2208 message, protocol::mtPEER_SHARD_INFO_V2)));
2245 if (section.empty())
2248 return std::make_unique<DatabaseShardImp>(app, scheduler, readThreads, j);