20 #include <ripple/app/ledger/InboundLedger.h>
21 #include <ripple/app/main/DBInit.h>
22 #include <ripple/app/rdb/backend/detail/Shard.h>
23 #include <ripple/basics/StringUtilities.h>
24 #include <ripple/core/ConfigSections.h>
25 #include <ripple/nodestore/Manager.h>
26 #include <ripple/nodestore/impl/DeterministicShard.h>
27 #include <ripple/nodestore/impl/Shard.h>
28 #include <ripple/protocol/digest.h>
40 :
Shard(app, db, index,
"", j)
48 boost::filesystem::path
const& dir,
53 , firstSeq_(db.firstLedgerSeq(index))
54 , lastSeq_(
std::max(firstSeq_, db.lastLedgerSeq(index)))
55 , maxLedgers_(db.maxLedgers(index))
56 , dir_((dir.empty() ? db.getRootDir() : dir) /
std::
to_string(index_))
71 <<
" backend in use, unable to remove directory";
84 boost::filesystem::remove_all(
dir_);
89 <<
". Exception caught in function " << __func__
90 <<
". Error: " << e.
what();
102 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" failed to find factory for "
106 section.set(
"path",
dir_.string());
111 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" already initialized";
114 backend_ = factory->createInstance(
132 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
136 return backend_->isOpen();
154 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
157 if (!backend_->isOpen())
167 <<
". Exception caught in function " << __func__
168 <<
". Error: " << e.
what();
172 lgrSQLiteDB_.reset();
174 acquireInfo_.reset();
189 <<
" prepare called when not acquiring";
197 <<
" missing acquire SQLite database";
201 if (acquireInfo_->storedSeqs.empty())
212 if (nodeObject->getHash() !=
finalKey)
227 backend_->store(nodeObject);
232 <<
". Exception caught in function " << __func__
233 <<
". Error: " << e.
what();
254 status = backend_->fetch(hash.
data(), &nodeObject);
259 <<
". Exception caught in function " << __func__
260 <<
". Error: " << e.
what();
271 <<
"shard " <<
index_ <<
". Corrupt node object at hash "
277 <<
"shard " <<
index_ <<
". Unknown status=" << status
278 <<
" fetching node object at hash " <<
to_string(hash);
303 JLOG(
j_.
trace()) <<
"shard " <<
index_ <<
". Ledger already stored";
308 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
". Source ledger sequence "
309 << srcLedger->info().seq <<
". " << msg;
314 if (srcLedger->info().hash.isZero())
315 return fail(
"Invalid hash");
316 if (srcLedger->info().accountHash.isZero())
317 return fail(
"Invalid account hash");
319 auto& srcDB{
const_cast<Database&
>(srcLedger->stateMap().family().db())};
321 return fail(
"Source and destination databases are the same");
325 return fail(
"Failed to lock backend");
329 auto storeBatch = [&]() {
331 for (
auto const& nodeObject : batch)
332 sz += nodeObject->getData().size();
337 backend_->storeBatch(batch);
342 std::string(
". Exception caught in function ") + __func__ +
343 ". Error: " + e.
what());
347 result.
count += batch.size();
357 addRaw(srcLedger->info(), s);
367 if (
auto nodeObject = srcDB.fetchNodeObject(
368 node.getHash().as_uint256(), srcLedger->info().seq))
381 if (srcLedger->stateMap().getHash().isNonZero())
383 if (!srcLedger->stateMap().isValid())
384 return fail(
"Invalid state map");
386 if (next && next->info().parentHash == srcLedger->info().hash)
388 auto have = next->stateMap().snapShot(
false);
389 srcLedger->stateMap().snapShot(
false)->visitDifferences(
393 srcLedger->stateMap().snapShot(
false)->visitNodes(visit);
395 return fail(
"Failed to store state map");
399 if (srcLedger->info().txHash.isNonZero())
401 if (!srcLedger->txMap().isValid())
402 return fail(
"Invalid transaction map");
404 srcLedger->txMap().snapShot(
false)->visitNodes(visit);
406 return fail(
"Failed to store transaction map");
409 if (!batch.
empty() && !storeBatch())
410 return fail(
"Failed to store");
430 auto const ledgerSeq{ledger->info().seq};
431 if (ledgerSeq < firstSeq_ || ledgerSeq >
lastSeq_)
432 return fail(
"Invalid ledger sequence " +
std::to_string(ledgerSeq));
445 return fail(
"Missing acquire SQLite database");
447 if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq))
450 JLOG(
j_.
debug()) <<
"shard " <<
index_ <<
" ledger sequence "
451 << ledgerSeq <<
" already stored";
457 return fail(
"Failed to store ledger");
462 acquireInfo_->storedSeqs.insert(ledgerSeq);
466 auto session{acquireInfo_->SQLiteDB->checkoutDb()};
467 soci::blob sociBlob(*session);
472 auto const sHash{
to_string(ledger->info().hash)};
473 *session <<
"UPDATE Shard "
474 "SET LastLedgerHash = :lastLedgerHash,"
475 "StoredLedgerSeqs = :storedLedgerSeqs "
476 "WHERE ShardIndex = :shardIndex;",
477 soci::use(sHash), soci::use(sociBlob), soci::use(
index_);
481 *session <<
"UPDATE Shard "
482 "SET StoredLedgerSeqs = :storedLedgerSeqs "
483 "WHERE ShardIndex = :shardIndex;",
484 soci::use(sociBlob), soci::use(
index_);
489 acquireInfo_->storedSeqs.erase(ledgerSeq);
491 std::string(
". Exception caught in function ") + __func__ +
492 ". Error: " + e.
what());
496 progress_ = boost::icl::length(acquireInfo_->storedSeqs);
501 JLOG(
j_.
trace()) <<
"shard " <<
index_ <<
" stored ledger sequence "
509 if (ledgerSeq < firstSeq_ || ledgerSeq >
lastSeq_)
518 <<
" missing acquire SQLite database";
521 return boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq);
524 std::chrono::steady_clock::time_point
535 return {fileSz_, fdRequired_};
545 return backend_->getWriteLoad();
566 << (hash.isZero() ?
""
568 << (ledgerSeq == 0 ?
""
569 :
". Ledger sequence " +
586 backend_->fetch(
finalKey.
data(), &nodeObject) == Status::ok)
590 nodeObject->getData().data(), nodeObject->getData().size());
592 return fail(
"invalid version");
595 return fail(
"out of range ledger sequences");
598 return fail(
"invalid last ledger hash");
605 return fail(
"missing acquire SQLite database");
608 *acquireInfo_->SQLiteDB->checkoutDb(),
index_);
611 return fail(
"missing or invalid ShardIndex");
614 return fail(
"missing LastLedgerHash");
616 if (!hash.parseHex(*seqshash.hash) || hash.isZero())
617 return fail(
"invalid LastLedgerHash");
619 if (!seqshash.sequences)
620 return fail(
"missing StoredLedgerSeqs");
622 auto& storedSeqs{acquireInfo_->storedSeqs};
623 if (!
from_string(storedSeqs, *seqshash.sequences) ||
624 boost::icl::first(storedSeqs) !=
firstSeq_ ||
625 boost::icl::last(storedSeqs) !=
lastSeq_ ||
628 return fail(
"invalid StoredLedgerSeqs");
635 std::string(
". Exception caught in function ") + __func__ +
636 ". Error: " + e.
what());
641 if (referenceHash && *referenceHash != hash)
642 return fail(
"invalid last ledger hash");
648 auto const lastLedgerHash{hash};
651 auto const treeNodeCache{shardFamily.getTreeNodeCache(
lastSeq_)};
654 fullBelowCache->reset();
655 treeNodeCache->reset();
666 return fail(
"Failed to create deterministic shard");
678 return fail(
"invalid ledger");
680 ledger = std::make_shared<Ledger>(
684 if (ledger->info().seq != ledgerSeq)
685 return fail(
"invalid ledger sequence");
686 if (ledger->info().hash != hash)
687 return fail(
"invalid ledger hash");
689 ledger->stateMap().setLedgerSeq(ledgerSeq);
690 ledger->txMap().setLedgerSeq(ledgerSeq);
691 ledger->setImmutable();
692 if (!ledger->stateMap().fetchRoot(
693 SHAMapHash{ledger->info().accountHash},
nullptr))
695 return fail(
"missing root STATE node");
697 if (ledger->info().txHash.isNonZero() &&
698 !ledger->txMap().fetchRoot(
701 return fail(
"missing root TXN node");
705 return fail(
"failed to verify ledger");
707 if (!dShard->store(nodeObject))
708 return fail(
"failed to store node object");
711 return fail(
"failed storing to SQLite databases");
714 ledger->info().seq == ledgerSeq &&
718 hash = ledger->info().parentHash;
719 next = std::move(ledger);
726 fullBelowCache->reset();
727 treeNodeCache->reset();
766 auto const nodeObject{
768 if (!dShard->store(nodeObject))
769 return fail(
"failed to store node object");
776 backend_->store(nodeObject);
792 lgrSQLiteDB_.reset();
798 acquireInfo_.reset();
806 remove(
dir_ /
"nudb.key");
807 remove(
dir_ /
"nudb.dat");
808 rename(dShard->getDir() /
"nudb.key",
dir_ /
"nudb.key");
809 rename(dShard->getDir() /
"nudb.dat",
dir_ /
"nudb.dat");
813 return fail(
"failed to open");
823 std::string(
". Exception caught in function ") + __func__ +
824 ". Error: " + e.
what());
833 using namespace boost::filesystem;
835 auto preexist{
false};
838 lgrSQLiteDB_.reset();
840 acquireInfo_.reset();
854 auto createAcquireInfo = [
this, &config]() REQUIRES(
mutex_) {
856 setup.
startUp = config.standalone() ? config.LOAD : config.START_UP;
861 acquireInfo_ = std::make_unique<AcquireInfo>();
873 preexist = exists(
dir_);
874 backend_->open(!preexist);
887 acquireInfo_->SQLiteDB->getSession(),
index_);
890 return fail(
"invalid acquire SQLite database");
894 auto& storedSeqs{acquireInfo_->storedSeqs};
896 return fail(
"invalid StoredLedgerSeqs");
898 if (boost::icl::first(storedSeqs) <
firstSeq_ ||
899 boost::icl::last(storedSeqs) >
lastSeq_)
901 return fail(
"invalid StoredLedgerSeqs");
905 progress_ = boost::icl::length(storedSeqs);
914 if (backend_->fetch(
finalKey.
data(), &nodeObject) != Status::ok)
917 return fail(
"incompatible, missing backend final key");
922 nodeObject->getData().data(), nodeObject->getData().size());
924 return fail(
"invalid version");
927 return fail(
"out of range ledger sequences");
930 return fail(
"invalid last ledger hash");
946 std::string(
". Exception caught in function ") + __func__ +
947 ". Error: " + e.
what());
963 setup.
startUp = config.standalone() ? config.LOAD : config.START_UP;
973 lgrSQLiteDB_.reset();
985 lgrSQLiteDB_ = std::move(lgr);
986 lgrSQLiteDB_->getSession() << boost::str(
987 boost::format(
"PRAGMA cache_size=-%d;") %
991 txSQLiteDB_ = std::move(tx);
992 txSQLiteDB_->getSession() << boost::str(
993 boost::format(
"PRAGMA cache_size=-%d;") %
1009 lgrSQLiteDB_ = std::move(lgr);
1010 lgrSQLiteDB_->getSession() << boost::str(
1011 boost::format(
"PRAGMA cache_size=-%d;") %
1014 txSQLiteDB_ = std::move(tx);
1015 txSQLiteDB_->getSession() << boost::str(
1016 boost::format(
"PRAGMA cache_size=-%d;") %
1025 <<
". Exception caught in function " << __func__
1026 <<
". Error: " << e.
what();
1044 *txSQLiteDB_->checkoutDb(),
1045 *lgrSQLiteDB_->checkoutDb(),
1058 if (!acquireInfo_->storedSeqs.empty())
1059 s =
to_string(acquireInfo_->storedSeqs);
1062 acquireInfo_->SQLiteDB->getSession(),
1072 <<
". Exception caught in function " << __func__
1073 <<
". Error: " << e.
what();
1087 using namespace boost::filesystem;
1088 for (
auto const& d : directory_iterator(
dir_))
1090 if (is_regular_file(d))
1092 fileSz_ += file_size(d);
1100 <<
". Exception caught in function " << __func__
1101 <<
". Error: " << e.
what();
1112 JLOG(j.error()) <<
"shard " <<
index <<
". " << msg
1113 << (ledger->info().hash.isZero() ?
""
1114 :
". Ledger hash " +
1116 << (ledger->info().seq == 0 ?
""
1117 :
". Ledger sequence " +
1122 if (ledger->info().hash.isZero())
1123 return fail(
"Invalid ledger hash");
1124 if (ledger->info().accountHash.isZero())
1125 return fail(
"Invalid ledger account hash");
1128 auto visit = [
this, &error, &dShard](
SHAMapTreeNode const& node) {
1132 auto nodeObject{
verifyFetch(node.getHash().as_uint256())};
1133 if (!nodeObject || !dShard->store(nodeObject))
1140 if (ledger->stateMap().getHash().isNonZero())
1142 if (!ledger->stateMap().isValid())
1143 return fail(
"Invalid state map");
1147 if (next && next->info().parentHash == ledger->info().hash)
1148 ledger->stateMap().visitDifferences(&next->stateMap(), visit);
1150 ledger->stateMap().visitNodes(visit);
1155 std::string(
". Exception caught in function ") + __func__ +
1156 ". Error: " + e.
what());
1162 return fail(
"Invalid state map");
1166 if (ledger->info().txHash.isNonZero())
1168 if (!ledger->txMap().isValid())
1169 return fail(
"Invalid transaction map");
1173 ledger->txMap().visitNodes(visit);
1178 std::string(
". Exception caught in function ") + __func__ +
1179 ". Error: " + e.
what());
1185 return fail(
"Invalid transaction map");
1197 JLOG(j.error()) <<
"shard " <<
index <<
". " << msg
1198 <<
". Node object hash " <<
to_string(hash);
1206 switch (backend_->fetch(hash.data(), &nodeObject))
1210 if (nodeObject->getHash() !=
1212 return fail(
"Node object hash does not match payload");
1215 return fail(
"Missing node object");
1217 return fail(
"Corrupt node object");
1219 return fail(
"Unknown error");
1225 std::string(
". Exception caught in function ") + __func__ +
1226 ". Error: " + e.
what());
1239 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
1242 if (!backend_->isOpen())
1255 std::function<
bool(soci::session& session)>
const& callback,
1258 return callback(*db);
1267 return callback(*db,
index_);