rippled
DatabaseShardImp.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2017 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/app/rdb/backend/SQLiteDatabase.h>
24 #include <ripple/basics/ByteUtilities.h>
25 #include <ripple/basics/RangeSet.h>
26 #include <ripple/basics/chrono.h>
27 #include <ripple/basics/random.h>
28 #include <ripple/core/ConfigSections.h>
29 #include <ripple/nodestore/DummyScheduler.h>
30 #include <ripple/nodestore/impl/DatabaseShardImp.h>
31 #include <ripple/overlay/Overlay.h>
32 #include <ripple/overlay/predicates.h>
33 #include <ripple/protocol/HashPrefix.h>
34 #include <ripple/protocol/digest.h>
35 
36 #include <boost/algorithm/string/predicate.hpp>
37 
38 #if BOOST_OS_LINUX
39 #include <sys/statvfs.h>
40 #endif
41 
42 namespace ripple {
43 
44 namespace NodeStore {
45 
47  Application& app,
48  Scheduler& scheduler,
49  int readThreads,
51  : DatabaseShard(
52  scheduler,
53  readThreads,
54  app.config().section(ConfigSection::shardDatabase()),
55  j)
56  , app_(app)
57  , avgShardFileSz_(ledgersPerShard_ * kilobytes(192ull))
58  , openFinalLimit_(
59  app.config().getValueFor(SizedItem::openFinalLimit, std::nullopt))
60 {
61  if (app.config().reporting())
62  {
63  Throw<std::runtime_error>(
64  "Attempted to create DatabaseShardImp in reporting mode. Reporting "
65  "does not support shards. Remove shards info from config");
66  }
67 }
68 
69 bool
71 {
72  {
73  std::lock_guard lock(mutex_);
74  if (init_)
75  {
76  JLOG(j_.error()) << "already initialized";
77  return false;
78  }
79 
80  if (!initConfig(lock))
81  {
82  JLOG(j_.error()) << "invalid configuration file settings";
83  return false;
84  }
85 
86  try
87  {
88  using namespace boost::filesystem;
89 
90  // Consolidate the main storage path and all historical paths
91  std::vector<path> paths{dir_};
92  paths.insert(
93  paths.end(), historicalPaths_.begin(), historicalPaths_.end());
94 
95  for (auto const& path : paths)
96  {
97  if (exists(path))
98  {
99  if (!is_directory(path))
100  {
101  JLOG(j_.error()) << path << " must be a directory";
102  return false;
103  }
104  }
105  else if (!create_directories(path))
106  {
107  JLOG(j_.error())
108  << "failed to create path: " + path.string();
109  return false;
110  }
111  }
112 
114  {
115  // Check historical paths for duplicated file systems
116  if (!checkHistoricalPaths(lock))
117  return false;
118  }
119 
120  ctx_ = std::make_unique<nudb::context>();
121  ctx_->start();
122 
123  // Find shards
124  std::uint32_t openFinals{0};
125  for (auto const& path : paths)
126  {
127  for (auto const& it : directory_iterator(path))
128  {
129  // Ignore files
130  if (!is_directory(it))
131  continue;
132 
133  // Ignore nonnumerical directory names
134  auto const shardDir{it.path()};
135  auto dirName{shardDir.stem().string()};
136  if (!std::all_of(
137  dirName.begin(), dirName.end(), [](auto c) {
138  return ::isdigit(static_cast<unsigned char>(c));
139  }))
140  {
141  continue;
142  }
143 
144  // Ignore values below the earliest shard index
145  auto const shardIndex{std::stoul(dirName)};
146  if (shardIndex < earliestShardIndex_)
147  {
148  JLOG(j_.debug())
149  << "shard " << shardIndex
150  << " ignored, comes before earliest shard index "
152  continue;
153  }
154 
155  // Check if a previous database import failed
156  if (is_regular_file(shardDir / databaseImportMarker_))
157  {
158  JLOG(j_.warn())
159  << "shard " << shardIndex
160  << " previously failed database import, removing";
161  remove_all(shardDir);
162  continue;
163  }
164 
165  auto shard{std::make_shared<Shard>(
166  app_, *this, shardIndex, shardDir.parent_path(), j_)};
167  if (!shard->init(scheduler_, *ctx_))
168  {
169  // Remove corrupted or legacy shard
170  shard->removeOnDestroy();
171  JLOG(j_.warn())
172  << "shard " << shardIndex << " removed, "
173  << (shard->isLegacy() ? "legacy" : "corrupted")
174  << " shard";
175  continue;
176  }
177 
178  switch (shard->getState())
179  {
181  if (++openFinals > openFinalLimit_)
182  shard->tryClose();
183  shards_.emplace(shardIndex, std::move(shard));
184  break;
185 
188  shards_.emplace(shardIndex, std::move(shard))
189  .first->second,
190  true,
191  std::nullopt);
192  break;
193 
194  case ShardState::acquire:
195  if (acquireIndex_ != 0)
196  {
197  JLOG(j_.error())
198  << "more than one shard being acquired";
199  return false;
200  }
201 
202  shards_.emplace(shardIndex, std::move(shard));
203  acquireIndex_ = shardIndex;
204  break;
205 
206  default:
207  JLOG(j_.error())
208  << "shard " << shardIndex << " invalid state";
209  return false;
210  }
211  }
212  }
213  }
214  catch (std::exception const& e)
215  {
216  JLOG(j_.fatal()) << "Exception caught in function " << __func__
217  << ". Error: " << e.what();
218  return false;
219  }
220 
221  init_ = true;
222  }
223 
224  updateFileStats();
225  return true;
226 }
227 
230 {
231  std::optional<std::uint32_t> shardIndex;
232 
233  {
234  std::lock_guard lock(mutex_);
235  assert(init_);
236 
237  if (acquireIndex_ != 0)
238  {
239  if (auto const it{shards_.find(acquireIndex_)}; it != shards_.end())
240  return it->second->prepare();
241 
242  // Should never get here
243  assert(false);
244  return std::nullopt;
245  }
246 
247  if (!canAdd_)
248  return std::nullopt;
249 
250  shardIndex = findAcquireIndex(validLedgerSeq, lock);
251  }
252 
253  if (!shardIndex)
254  {
255  JLOG(j_.debug()) << "no new shards to add";
256  {
257  std::lock_guard lock(mutex_);
258  canAdd_ = false;
259  }
260  return std::nullopt;
261  }
262 
263  auto const pathDesignation = [this, shardIndex = *shardIndex]() {
264  std::lock_guard lock(mutex_);
265  return prepareForNewShard(shardIndex, numHistoricalShards(lock), lock);
266  }();
267 
268  if (!pathDesignation)
269  return std::nullopt;
270 
271  auto const needsHistoricalPath =
272  *pathDesignation == PathDesignation::historical;
273 
274  auto shard = [this, shardIndex, needsHistoricalPath] {
275  std::lock_guard lock(mutex_);
276  return std::make_unique<Shard>(
277  app_,
278  *this,
279  *shardIndex,
280  (needsHistoricalPath ? chooseHistoricalPath(lock) : ""),
281  j_);
282  }();
283 
284  if (!shard->init(scheduler_, *ctx_))
285  return std::nullopt;
286 
287  auto const ledgerSeq{shard->prepare()};
288  {
289  std::lock_guard lock(mutex_);
290  shards_.emplace(*shardIndex, std::move(shard));
291  acquireIndex_ = *shardIndex;
292  updatePeers(lock);
293  }
294 
295  return ledgerSeq;
296 }
297 
298 bool
300 {
301  auto fail = [j = j_, &shardIndexes](
302  std::string const& msg,
303  std::optional<std::uint32_t> shardIndex = std::nullopt) {
304  auto multipleIndexPrequel = [&shardIndexes] {
305  std::vector<std::string> indexesAsString(shardIndexes.size());
307  shardIndexes.begin(),
308  shardIndexes.end(),
309  indexesAsString.begin(),
310  [](uint32_t const index) { return std::to_string(index); });
311 
312  return std::string("shard") +
313  (shardIndexes.size() > 1 ? "s " : " ") +
314  boost::algorithm::join(indexesAsString, ", ");
315  };
316 
317  JLOG(j.error()) << (shardIndex ? "shard " + std::to_string(*shardIndex)
318  : multipleIndexPrequel())
319  << " " << msg;
320  return false;
321  };
322 
323  if (shardIndexes.empty())
324  return fail("invalid shard indexes");
325 
326  std::lock_guard lock(mutex_);
327  assert(init_);
328 
329  if (!canAdd_)
330  return fail("cannot be stored at this time");
331 
332  auto historicalShardsToPrepare = 0;
333 
334  for (auto const shardIndex : shardIndexes)
335  {
336  if (shardIndex < earliestShardIndex_)
337  {
338  return fail(
339  "comes before earliest shard index " +
341  shardIndex);
342  }
343 
344  // If we are synced to the network, check if the shard index is
345  // greater or equal to the current or validated shard index.
346  auto seqCheck = [&](std::uint32_t ledgerSeq) {
347  if (ledgerSeq >= earliestLedgerSeq_ &&
348  shardIndex >= seqToShardIndex(ledgerSeq))
349  {
350  return fail("invalid index", shardIndex);
351  }
352  return true;
353  };
354  if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex() + 1) ||
356  {
357  return fail("invalid index", shardIndex);
358  }
359 
360  if (shards_.find(shardIndex) != shards_.end())
361  return fail("is already stored", shardIndex);
362 
363  if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end())
364  return fail(
365  "is already queued for import from the shard archive handler",
366  shardIndex);
367 
369  {
370  if (auto shard = databaseImportStatus_->currentShard.lock(); shard)
371  {
372  if (shard->index() == shardIndex)
373  return fail(
374  "is being imported from the nodestore", shardIndex);
375  }
376  }
377 
378  // Any shard earlier than the two most recent shards
379  // is a historical shard
380  if (shardIndex < shardBoundaryIndex())
381  ++historicalShardsToPrepare;
382  }
383 
384  auto const numHistShards = numHistoricalShards(lock);
385 
386  // Check shard count and available storage space
387  if (numHistShards + historicalShardsToPrepare > maxHistoricalShards_)
388  return fail("maximum number of historical shards reached");
389 
390  if (historicalShardsToPrepare)
391  {
392  // Check available storage space for historical shards
393  if (!sufficientStorage(
394  historicalShardsToPrepare, PathDesignation::historical, lock))
395  return fail("insufficient storage space available");
396  }
397 
398  if (auto const recentShardsToPrepare =
399  shardIndexes.size() - historicalShardsToPrepare;
400  recentShardsToPrepare)
401  {
402  // Check available storage space for recent shards
403  if (!sufficientStorage(
404  recentShardsToPrepare, PathDesignation::none, lock))
405  return fail("insufficient storage space available");
406  }
407 
408  for (auto const shardIndex : shardIndexes)
409  preparedIndexes_.emplace(shardIndex);
410 
411  updatePeers(lock);
412  return true;
413 }
414 
415 void
417 {
418  std::lock_guard lock(mutex_);
419  assert(init_);
420 
421  if (preparedIndexes_.erase(shardIndex))
422  updatePeers(lock);
423 }
424 
427 {
429  {
430  std::lock_guard lock(mutex_);
431  assert(init_);
432 
433  for (auto const& shardIndex : preparedIndexes_)
434  rs.insert(shardIndex);
435  }
436 
437  if (rs.empty())
438  return {};
439 
440  return ripple::to_string(rs);
441 };
442 
443 bool
445  std::uint32_t shardIndex,
446  boost::filesystem::path const& srcDir)
447 {
448  auto fail = [&](std::string const& msg,
449  std::lock_guard<std::mutex> const& lock) {
450  JLOG(j_.error()) << "shard " << shardIndex << " " << msg;
451 
452  // Remove the failed import shard index so it can be retried
453  preparedIndexes_.erase(shardIndex);
454  updatePeers(lock);
455  return false;
456  };
457 
458  using namespace boost::filesystem;
459  try
460  {
461  if (!is_directory(srcDir) || is_empty(srcDir))
462  {
463  return fail(
464  "invalid source directory " + srcDir.string(),
466  }
467  }
468  catch (std::exception const& e)
469  {
470  return fail(
471  std::string(". Exception caught in function ") + __func__ +
472  ". Error: " + e.what(),
474  }
475 
476  auto const expectedHash{app_.getLedgerMaster().walkHashBySeq(
478  if (!expectedHash)
479  return fail("expected hash not found", std::lock_guard(mutex_));
480 
481  path dstDir;
482  {
483  std::lock_guard lock(mutex_);
484  if (shards_.find(shardIndex) != shards_.end())
485  return fail("already exists", lock);
486 
487  // Check shard was prepared for import
488  if (preparedIndexes_.find(shardIndex) == preparedIndexes_.end())
489  return fail("was not prepared for import", lock);
490 
491  auto const pathDesignation{
492  prepareForNewShard(shardIndex, numHistoricalShards(lock), lock)};
493  if (!pathDesignation)
494  return fail("failed to import", lock);
495 
496  if (*pathDesignation == PathDesignation::historical)
497  dstDir = chooseHistoricalPath(lock);
498  else
499  dstDir = dir_;
500  }
501  dstDir /= std::to_string(shardIndex);
502 
503  auto renameDir = [&, fname = __func__](path const& src, path const& dst) {
504  try
505  {
506  rename(src, dst);
507  }
508  catch (std::exception const& e)
509  {
510  return fail(
511  std::string(". Exception caught in function ") + fname +
512  ". Error: " + e.what(),
514  }
515  return true;
516  };
517 
518  // Rename source directory to the shard database directory
519  if (!renameDir(srcDir, dstDir))
520  return false;
521 
522  // Create the new shard
523  auto shard{std::make_unique<Shard>(
524  app_, *this, shardIndex, dstDir.parent_path(), j_)};
525 
526  if (!shard->init(scheduler_, *ctx_) ||
527  shard->getState() != ShardState::complete)
528  {
529  shard.reset();
530  renameDir(dstDir, srcDir);
531  return fail("failed to import", std::lock_guard(mutex_));
532  }
533 
534  auto const [it, inserted] = [&]() {
535  std::lock_guard lock(mutex_);
536  preparedIndexes_.erase(shardIndex);
537  return shards_.emplace(shardIndex, std::move(shard));
538  }();
539 
540  if (!inserted)
541  {
542  shard.reset();
543  renameDir(dstDir, srcDir);
544  return fail("failed to import", std::lock_guard(mutex_));
545  }
546 
547  finalizeShard(it->second, true, expectedHash);
548  return true;
549 }
550 
553 {
554  auto const shardIndex{seqToShardIndex(ledgerSeq)};
555  {
557  {
558  std::lock_guard lock(mutex_);
559  assert(init_);
560 
561  auto const it{shards_.find(shardIndex)};
562  if (it == shards_.end())
563  return nullptr;
564  shard = it->second;
565  }
566 
567  // Ledger must be stored in a final or acquiring shard
568  switch (shard->getState())
569  {
571  break;
572  case ShardState::acquire:
573  if (shard->containsLedger(ledgerSeq))
574  break;
575  [[fallthrough]];
576  default:
577  return nullptr;
578  }
579  }
580 
581  auto const nodeObject{Database::fetchNodeObject(hash, ledgerSeq)};
582  if (!nodeObject)
583  return nullptr;
584 
585  auto fail = [&](std::string const& msg) -> std::shared_ptr<Ledger> {
586  JLOG(j_.error()) << "shard " << shardIndex << " " << msg;
587  return nullptr;
588  };
589 
590  auto ledger{std::make_shared<Ledger>(
591  deserializePrefixedHeader(makeSlice(nodeObject->getData())),
592  app_.config(),
593  *app_.getShardFamily())};
594 
595  if (ledger->info().seq != ledgerSeq)
596  {
597  return fail(
598  "encountered invalid ledger sequence " + std::to_string(ledgerSeq));
599  }
600  if (ledger->info().hash != hash)
601  {
602  return fail(
603  "encountered invalid ledger hash " + to_string(hash) +
604  " on sequence " + std::to_string(ledgerSeq));
605  }
606 
607  ledger->setFull();
608  if (!ledger->stateMap().fetchRoot(
609  SHAMapHash{ledger->info().accountHash}, nullptr))
610  {
611  return fail(
612  "is missing root STATE node on hash " + to_string(hash) +
613  " on sequence " + std::to_string(ledgerSeq));
614  }
615 
616  if (ledger->info().txHash.isNonZero())
617  {
618  if (!ledger->txMap().fetchRoot(
619  SHAMapHash{ledger->info().txHash}, nullptr))
620  {
621  return fail(
622  "is missing root TXN node on hash " + to_string(hash) +
623  " on sequence " + std::to_string(ledgerSeq));
624  }
625  }
626  return ledger;
627 }
628 
629 void
631 {
632  auto const ledgerSeq{ledger->info().seq};
633  if (ledger->info().hash.isZero())
634  {
635  JLOG(j_.error()) << "zero ledger hash for ledger sequence "
636  << ledgerSeq;
637  return;
638  }
639  if (ledger->info().accountHash.isZero())
640  {
641  JLOG(j_.error()) << "zero account hash for ledger sequence "
642  << ledgerSeq;
643  return;
644  }
645  if (ledger->stateMap().getHash().isNonZero() &&
646  !ledger->stateMap().isValid())
647  {
648  JLOG(j_.error()) << "invalid state map for ledger sequence "
649  << ledgerSeq;
650  return;
651  }
652  if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
653  {
654  JLOG(j_.error()) << "invalid transaction map for ledger sequence "
655  << ledgerSeq;
656  return;
657  }
658 
659  auto const shardIndex{seqToShardIndex(ledgerSeq)};
661  {
662  std::lock_guard lock(mutex_);
663  assert(init_);
664 
665  if (shardIndex != acquireIndex_)
666  {
667  JLOG(j_.trace())
668  << "shard " << shardIndex << " is not being acquired";
669  return;
670  }
671 
672  auto const it{shards_.find(shardIndex)};
673  if (it == shards_.end())
674  {
675  JLOG(j_.error())
676  << "shard " << shardIndex << " is not being acquired";
677  return;
678  }
679  shard = it->second;
680  }
681 
682  if (shard->containsLedger(ledgerSeq))
683  {
684  JLOG(j_.trace()) << "shard " << shardIndex << " ledger already stored";
685  return;
686  }
687 
688  setStoredInShard(shard, ledger);
689 }
690 
693 {
694  std::lock_guard lock(mutex_);
695  return getShardInfo(lock);
696 }
697 
698 void
700 {
701  // Stop read threads in base before data members are destroyed
702  Database::stop();
704  {
705  std::lock_guard lock(mutex_);
706  shards.reserve(shards_.size());
707  for (auto const& [_, shard] : shards_)
708  {
709  shards.push_back(shard);
710  shard->stop();
711  }
712  shards_.clear();
713  }
714  taskQueue_.stop();
715 
716  // All shards should be expired at this point
717  for (auto const& wptr : shards)
718  {
719  if (auto const shard{wptr.lock()})
720  {
721  JLOG(j_.warn()) << " shard " << shard->index() << " unexpired";
722  }
723  }
724 
725  std::unique_lock lock(mutex_);
726 
727  // Notify the shard being imported
728  // from the node store to stop
730  {
731  // A node store import is in progress
732  if (auto importShard = databaseImportStatus_->currentShard.lock();
733  importShard)
734  importShard->stop();
735  }
736 
737  // Wait for the node store import thread
738  // if necessary
740  {
741  // Tells the import function to halt
742  haltDatabaseImport_ = true;
743 
744  // Wait for the function to exit
745  while (databaseImportStatus_)
746  {
747  // Unlock just in case the import
748  // function is waiting on the mutex
749  lock.unlock();
750 
752  lock.lock();
753  }
754 
755  // Calling join while holding the mutex_ without
756  // first making sure that doImportDatabase has
757  // exited could lead to deadlock via the mutex
758  // acquisition that occurs in that function
761  }
762 }
763 
764 void
766 {
767  std::lock_guard lock(mutex_);
768  assert(init_);
769 
770  // Only the application local node store can be imported
771  assert(&source == &app_.getNodeStore());
772 
774  {
775  assert(false);
776  JLOG(j_.error()) << "database import already in progress";
777  return;
778  }
779 
781 }
782 
783 void
785 {
786  auto shouldHalt = [this] {
787  bool expected = true;
788  return haltDatabaseImport_.compare_exchange_strong(expected, false) ||
789  isStopping();
790  };
791 
792  if (shouldHalt())
793  return;
794 
795  auto loadLedger =
796  [this](char const* const sortOrder) -> std::optional<std::uint32_t> {
798  std::uint32_t ledgerSeq{0};
800  if (sortOrder == std::string("asc"))
801  {
802  info = dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase())
803  ->getLimitedOldestLedgerInfo(earliestLedgerSeq());
804  }
805  else
806  {
807  info = dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase())
808  ->getLimitedNewestLedgerInfo(earliestLedgerSeq());
809  }
810  if (info)
811  {
812  ledger = loadLedgerHelper(*info, app_, false);
813  ledgerSeq = info->seq;
814  }
815  if (!ledger || ledgerSeq == 0)
816  {
817  JLOG(j_.error()) << "no suitable ledgers were found in"
818  " the SQLite database to import";
819  return std::nullopt;
820  }
821  return ledgerSeq;
822  };
823 
824  // Find earliest ledger sequence stored
825  auto const earliestLedgerSeq{loadLedger("asc")};
826  if (!earliestLedgerSeq)
827  return;
828 
829  auto const earliestIndex = [&] {
830  auto earliestIndex = seqToShardIndex(*earliestLedgerSeq);
831 
832  // Consider only complete shards
833  if (earliestLedgerSeq != firstLedgerSeq(earliestIndex))
834  ++earliestIndex;
835 
836  return earliestIndex;
837  }();
838 
839  // Find last ledger sequence stored
840  auto const latestLedgerSeq = loadLedger("desc");
841  if (!latestLedgerSeq)
842  return;
843 
844  auto const latestIndex = [&] {
845  auto latestIndex = seqToShardIndex(*latestLedgerSeq);
846 
847  // Consider only complete shards
848  if (latestLedgerSeq != lastLedgerSeq(latestIndex))
849  --latestIndex;
850 
851  return latestIndex;
852  }();
853 
854  if (latestIndex < earliestIndex)
855  {
856  JLOG(j_.error()) << "no suitable ledgers were found in"
857  " the SQLite database to import";
858  return;
859  }
860 
861  JLOG(j_.debug()) << "Importing ledgers for shards " << earliestIndex
862  << " through " << latestIndex;
863 
864  {
865  std::lock_guard lock(mutex_);
866 
867  assert(!databaseImportStatus_);
868  databaseImportStatus_ = std::make_unique<DatabaseImportStatus>(
869  earliestIndex, latestIndex, 0);
870  }
871 
872  // Import the shards
873  for (std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex;
874  ++shardIndex)
875  {
876  if (shouldHalt())
877  return;
878 
879  auto const pathDesignation = [this, shardIndex] {
880  std::lock_guard lock(mutex_);
881 
882  auto const numHistShards = numHistoricalShards(lock);
883  auto const pathDesignation =
884  prepareForNewShard(shardIndex, numHistShards, lock);
885 
886  return pathDesignation;
887  }();
888 
889  if (!pathDesignation)
890  break;
891 
892  {
893  std::lock_guard lock(mutex_);
894 
895  // Skip if being acquired
896  if (shardIndex == acquireIndex_)
897  {
898  JLOG(j_.debug())
899  << "shard " << shardIndex << " already being acquired";
900  continue;
901  }
902 
903  // Skip if being imported from the shard archive handler
904  if (preparedIndexes_.find(shardIndex) != preparedIndexes_.end())
905  {
906  JLOG(j_.debug())
907  << "shard " << shardIndex << " already being imported";
908  continue;
909  }
910 
911  // Skip if stored
912  if (shards_.find(shardIndex) != shards_.end())
913  {
914  JLOG(j_.debug()) << "shard " << shardIndex << " already stored";
915  continue;
916  }
917  }
918 
919  std::uint32_t const firstSeq = firstLedgerSeq(shardIndex);
920  std::uint32_t const lastSeq =
921  std::max(firstSeq, lastLedgerSeq(shardIndex));
922 
923  // Verify SQLite ledgers are in the node store
924  {
925  auto const ledgerHashes{
927  firstSeq, lastSeq)};
928  if (ledgerHashes.size() != maxLedgers(shardIndex))
929  continue;
930 
931  auto& source = app_.getNodeStore();
932  bool valid{true};
933 
934  for (std::uint32_t n = firstSeq; n <= lastSeq; ++n)
935  {
936  if (!source.fetchNodeObject(ledgerHashes.at(n).ledgerHash, n))
937  {
938  JLOG(j_.warn()) << "SQLite ledger sequence " << n
939  << " mismatches node store";
940  valid = false;
941  break;
942  }
943  }
944  if (!valid)
945  continue;
946  }
947 
948  if (shouldHalt())
949  return;
950 
951  bool const needsHistoricalPath =
952  *pathDesignation == PathDesignation::historical;
953 
954  auto const path = needsHistoricalPath
956  : dir_;
957 
958  // Create the new shard
959  auto shard{std::make_shared<Shard>(app_, *this, shardIndex, path, j_)};
960  if (!shard->init(scheduler_, *ctx_))
961  continue;
962 
963  {
964  std::lock_guard lock(mutex_);
965 
966  if (shouldHalt())
967  return;
968 
969  databaseImportStatus_->currentIndex = shardIndex;
970  databaseImportStatus_->currentShard = shard;
971  databaseImportStatus_->firstSeq = firstSeq;
972  databaseImportStatus_->lastSeq = lastSeq;
973  }
974 
975  // Create a marker file to signify a database import in progress
976  auto const shardDir{path / std::to_string(shardIndex)};
977  auto const markerFile{shardDir / databaseImportMarker_};
978  {
979  std::ofstream ofs{markerFile.string()};
980  if (!ofs.is_open())
981  {
982  JLOG(j_.error()) << "shard " << shardIndex
983  << " failed to create temp marker file";
984  shard->removeOnDestroy();
985  continue;
986  }
987  }
988 
989  // Copy the ledgers from node store
990  std::shared_ptr<Ledger> recentStored;
991  std::optional<uint256> lastLedgerHash;
992 
993  while (auto const ledgerSeq = shard->prepare())
994  {
995  if (shouldHalt())
996  return;
997 
998  // Not const so it may be moved later
999  auto ledger{loadByIndex(*ledgerSeq, app_, false)};
1000  if (!ledger || ledger->info().seq != ledgerSeq)
1001  break;
1002 
1003  auto const result{shard->storeLedger(ledger, recentStored)};
1004  storeStats(result.count, result.size);
1005  if (result.error)
1006  break;
1007 
1008  if (!shard->setLedgerStored(ledger))
1009  break;
1010 
1011  if (!lastLedgerHash && ledgerSeq == lastSeq)
1012  lastLedgerHash = ledger->info().hash;
1013 
1014  recentStored = std::move(ledger);
1015  }
1016 
1017  if (shouldHalt())
1018  return;
1019 
1020  using namespace boost::filesystem;
1021  bool success{false};
1022  if (lastLedgerHash && shard->getState() == ShardState::complete)
1023  {
1024  // Store shard final key
1025  Serializer s;
1026  s.add32(Shard::version);
1027  s.add32(firstLedgerSeq(shardIndex));
1028  s.add32(lastLedgerSeq(shardIndex));
1029  s.addBitString(*lastLedgerHash);
1030  auto const nodeObject{NodeObject::createObject(
1031  hotUNKNOWN, std::move(s.modData()), Shard::finalKey)};
1032 
1033  if (shard->storeNodeObject(nodeObject))
1034  {
1035  try
1036  {
1037  std::lock_guard lock(mutex_);
1038 
1039  // The database import process is complete and the
1040  // marker file is no longer required
1041  remove_all(markerFile);
1042 
1043  JLOG(j_.debug()) << "shard " << shardIndex
1044  << " was successfully imported"
1045  " from the NodeStore";
1046  finalizeShard(
1047  shards_.emplace(shardIndex, std::move(shard))
1048  .first->second,
1049  true,
1050  std::nullopt);
1051 
1052  // This variable is meant to capture the success
1053  // of everything up to the point of shard finalization.
1054  // If the shard fails to finalize, this condition will
1055  // be handled by the finalization function itself, and
1056  // not here.
1057  success = true;
1058  }
1059  catch (std::exception const& e)
1060  {
1061  JLOG(j_.fatal()) << "shard index " << shardIndex
1062  << ". Exception caught in function "
1063  << __func__ << ". Error: " << e.what();
1064  }
1065  }
1066  }
1067 
1068  if (!success)
1069  {
1070  JLOG(j_.error()) << "shard " << shardIndex
1071  << " failed to import from the NodeStore";
1072 
1073  if (shard)
1074  shard->removeOnDestroy();
1075  }
1076  }
1077 
1078  if (shouldHalt())
1079  return;
1080 
1081  updateFileStats();
1082 }
1083 
1086 {
1087  std::shared_ptr<Shard> shard;
1088  {
1089  std::lock_guard lock(mutex_);
1090  assert(init_);
1091 
1092  auto const it{shards_.find(acquireIndex_)};
1093  if (it == shards_.end())
1094  return 0;
1095  shard = it->second;
1096  }
1097 
1098  return shard->getWriteLoad();
1099 }
1100 
1101 void
1103  NodeObjectType type,
1104  Blob&& data,
1105  uint256 const& hash,
1106  std::uint32_t ledgerSeq)
1107 {
1108  auto const shardIndex{seqToShardIndex(ledgerSeq)};
1109  std::shared_ptr<Shard> shard;
1110  {
1111  std::lock_guard lock(mutex_);
1112  if (shardIndex != acquireIndex_)
1113  {
1114  JLOG(j_.trace())
1115  << "shard " << shardIndex << " is not being acquired";
1116  return;
1117  }
1118 
1119  auto const it{shards_.find(shardIndex)};
1120  if (it == shards_.end())
1121  {
1122  JLOG(j_.error())
1123  << "shard " << shardIndex << " is not being acquired";
1124  return;
1125  }
1126  shard = it->second;
1127  }
1128 
1129  auto const nodeObject{
1130  NodeObject::createObject(type, std::move(data), hash)};
1131  if (shard->storeNodeObject(nodeObject))
1132  storeStats(1, nodeObject->getData().size());
1133 }
1134 
1135 bool
1137 {
1138  auto const ledgerSeq{srcLedger->info().seq};
1139  auto const shardIndex{seqToShardIndex(ledgerSeq)};
1140  std::shared_ptr<Shard> shard;
1141  {
1142  std::lock_guard lock(mutex_);
1143  assert(init_);
1144 
1145  if (shardIndex != acquireIndex_)
1146  {
1147  JLOG(j_.trace())
1148  << "shard " << shardIndex << " is not being acquired";
1149  return false;
1150  }
1151 
1152  auto const it{shards_.find(shardIndex)};
1153  if (it == shards_.end())
1154  {
1155  JLOG(j_.error())
1156  << "shard " << shardIndex << " is not being acquired";
1157  return false;
1158  }
1159  shard = it->second;
1160  }
1161 
1162  auto const result{shard->storeLedger(srcLedger, nullptr)};
1163  storeStats(result.count, result.size);
1164  if (result.error || result.count == 0 || result.size == 0)
1165  return false;
1166 
1167  return setStoredInShard(shard, srcLedger);
1168 }
1169 
1170 void
1172 {
1174  {
1175  std::lock_guard lock(mutex_);
1176  assert(init_);
1177 
1178  shards.reserve(shards_.size());
1179  for (auto const& e : shards_)
1180  shards.push_back(e.second);
1181  }
1182 
1184  openFinals.reserve(openFinalLimit_);
1185 
1186  for (auto const& weak : shards)
1187  {
1188  if (auto const shard{weak.lock()}; shard && shard->isOpen())
1189  {
1190  if (shard->getState() == ShardState::finalized)
1191  openFinals.emplace_back(std::move(shard));
1192  }
1193  }
1194 
1195  if (openFinals.size() > openFinalLimit_)
1196  {
1197  JLOG(j_.trace()) << "Open shards exceed configured limit of "
1198  << openFinalLimit_ << " by "
1199  << (openFinals.size() - openFinalLimit_);
1200 
1201  // Try to close enough shards to be within the limit.
1202  // Sort ascending on last use so the oldest are removed first.
1203  std::sort(
1204  openFinals.begin(),
1205  openFinals.end(),
1206  [&](std::shared_ptr<Shard> const& lhsShard,
1207  std::shared_ptr<Shard> const& rhsShard) {
1208  return lhsShard->getLastUse() < rhsShard->getLastUse();
1209  });
1210 
1211  for (auto it{openFinals.cbegin()};
1212  it != openFinals.cend() && openFinals.size() > openFinalLimit_;)
1213  {
1214  if ((*it)->tryClose())
1215  it = openFinals.erase(it);
1216  else
1217  ++it;
1218  }
1219  }
1220 }
1221 
1224 {
1226  {
1228 
1229  ret[jss::firstShardIndex] = databaseImportStatus_->earliestIndex;
1230  ret[jss::lastShardIndex] = databaseImportStatus_->latestIndex;
1231  ret[jss::currentShardIndex] = databaseImportStatus_->currentIndex;
1232 
1233  Json::Value currentShard(Json::objectValue);
1234  currentShard[jss::firstSequence] = databaseImportStatus_->firstSeq;
1235  currentShard[jss::lastSequence] = databaseImportStatus_->lastSeq;
1236 
1237  if (auto shard = databaseImportStatus_->currentShard.lock(); shard)
1238  currentShard[jss::storedSeqs] = shard->getStoredSeqs();
1239 
1240  ret[jss::currentShard] = currentShard;
1241 
1242  if (haltDatabaseImport_)
1243  ret[jss::message] = "Database import halt initiated...";
1244 
1245  return ret;
1246  }
1247 
1248  return RPC::make_error(rpcINTERNAL, "Database import not running");
1249 }
1250 
// Starts a NodeStore-to-ShardStore import and returns the result as JSON
// (per the symbol index this is Json::Value startNodeToShard()). Rejects
// the request when the store is uninitialized, an import is already in
// progress, or the node is shutting down.
// NOTE(review): the declarator and three interior lines (Doxygen lines
// 1251-1252, 1259, 1266, 1268) were dropped by this extraction --
// presumably the import-in-progress check, the call that launches the
// import thread, and the declaration of `result` -- confirm against the
// original source.
1253 {
1254  std::lock_guard lock(mutex_);
1255 
1256  if (!init_)
1257  return RPC::make_error(rpcINTERNAL, "Shard store not initialized");
1258 
1260  return RPC::make_error(
1261  rpcINTERNAL, "Database import already in progress");
1262 
1263  if (isStopping())
1264  return RPC::make_error(rpcINTERNAL, "Node is shutting down");
1265 
1267 
1269  result[jss::message] = "Database import initiated...";
1270 
1271  return result;
1272 }
1273 
// Requests termination of a running NodeStore-to-ShardStore import and
// returns the result as JSON (per the symbol index: Json::Value
// stopNodeToShard()). The halt is cooperative: only the flag is set here;
// the import thread is expected to observe haltDatabaseImport_ and exit.
// NOTE(review): the declarator and Doxygen line 1290 (presumably the
// declaration of `result`) were dropped by this extraction -- confirm
// against the original source.
1276 {
1277  std::lock_guard lock(mutex_);
1278 
1279  if (!init_)
1280  return RPC::make_error(rpcINTERNAL, "Shard store not initialized");
1281 
     // A joinable importer thread is the indicator that an import is active.
1282  if (!databaseImporter_.joinable())
1283  return RPC::make_error(rpcINTERNAL, "Database import not running");
1284 
1285  if (isStopping())
1286  return RPC::make_error(rpcINTERNAL, "Node is shutting down");
1287 
1288  haltDatabaseImport_ = true;
1289 
1291  result[jss::message] = "Database import halt initiated...";
1292 
1293  return result;
1294 }
1295 
// Returns the first ledger sequence of the shard currently being imported
// from the NodeStore, or an empty optional when no import is running (per
// the symbol index: std::optional<std::uint32_t>
// getDatabaseImportSequence() const).
// NOTE(review): the declarator (Doxygen lines 1296-1297) was dropped by
// this extraction.
1298 {
1299  std::lock_guard lock(mutex_);
1300 
1301  if (!databaseImportStatus_)
1302  return {};
1303 
1304  return databaseImportStatus_->firstSeq;
1305 }
1306 
// Validates and loads the [shard_db] configuration: checks that
// ledgers_per_shard / earliest_seq agree with [node_db], reads the shard
// 'path', max_historical_shards, and the historical shard paths, and
// verifies the backend type is NuDB. Returns false (after logging via
// `fail`) on any misconfiguration.
// NOTE(review): the declarator (Doxygen line 1308) and line 1369
// (presumably appending `dir` to historicalPaths_) were dropped by this
// extraction -- confirm against the original source.
1307 bool
1309 {
     // Log-and-return-false helper; captures the journal by value so the
     // message is always tagged with the [shard_db] section name.
1310  auto fail = [j = j_](std::string const& msg) {
1311  JLOG(j.error()) << "[" << ConfigSection::shardDatabase() << "] " << msg;
1312  return false;
1313  };
1314 
1315  Config const& config{app_.config()};
1316  Section const& section{config.section(ConfigSection::shardDatabase())};
1317 
     // True when [shard_db] and [node_db] agree on `name`, treating an
     // absent key in either section as `defaultValue`.
1318  auto compare = [&](std::string const& name, std::uint32_t defaultValue) {
1319  std::uint32_t shardDBValue{defaultValue};
1320  get_if_exists<std::uint32_t>(section, name, shardDBValue);
1321 
1322  std::uint32_t nodeDBValue{defaultValue};
1323  get_if_exists<std::uint32_t>(
1324  config.section(ConfigSection::nodeDatabase()), name, nodeDBValue);
1325 
1326  return shardDBValue == nodeDBValue;
1327  };
1328 
1329  // If ledgers_per_shard or earliest_seq are specified,
1330  // they must be equally assigned in 'node_db'
1331  if (!compare("ledgers_per_shard", DEFAULT_LEDGERS_PER_SHARD))
1332  {
1333  return fail(
1334  "and [" + ConfigSection::nodeDatabase() + "] define different '" +
1335  "ledgers_per_shard" + "' values");
1336  }
1337  if (!compare("earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
1338  {
1339  return fail(
1340  "and [" + ConfigSection::nodeDatabase() + "] define different '" +
1341  "earliest_seq" + "' values");
1342  }
1343 
1344  using namespace boost::filesystem;
1345  if (!get_if_exists<path>(section, "path", dir_))
1346  return fail("'path' missing");
1347 
1348  {
1349  get_if_exists(section, "max_historical_shards", maxHistoricalShards_);
1350 
1351  Section const& historicalShardPaths =
1352  config.section(SECTION_HISTORICAL_SHARD_PATHS);
1353 
1354  auto values = historicalShardPaths.values();
1355 
     // Deduplicate the configured historical paths (sort + unique-erase).
1356  std::sort(values.begin(), values.end());
1357  values.erase(std::unique(values.begin(), values.end()), values.end());
1358 
1359  for (auto const& s : values)
1360  {
1361  auto const dir = path(s);
     // The main shard path must not double as a historical path.
1362  if (dir_ == dir)
1363  {
1364  return fail(
1365  "the 'path' cannot also be in the "
1366  "'historical_shard_path' section");
1367  }
1368 
1370  }
1371  }
1372 
1373  // NuDB is the default and only supported permanent storage backend
1374  backendName_ = get(section, "type", "nudb");
1375  if (!boost::iequals(backendName_, "NuDB"))
1376  return fail("'type' value unsupported");
1377 
1378  return true;
1379 }
1380 
// Fetches a node object by hash from the shard that owns `ledgerSeq`.
// Looks up the shard under the mutex but performs the (potentially slow)
// fetch outside the lock via a shared_ptr copy. Returns nullptr when no
// shard covers the sequence. NOTE(review): the return type and function
// name lines (Doxygen 1381-1382) were dropped by this extraction; the
// `duplicate` parameter is unused in the visible body -- confirm against
// the original source.
1383  uint256 const& hash,
1384  std::uint32_t ledgerSeq,
1385  FetchReport& fetchReport,
1386  bool duplicate)
1387 {
1388  auto const shardIndex{seqToShardIndex(ledgerSeq)};
1389  std::shared_ptr<Shard> shard;
1390  {
     // Hold the mutex only long enough to copy the shard pointer.
1391  std::lock_guard lock(mutex_);
1392  auto const it{shards_.find(shardIndex)};
1393  if (it == shards_.end())
1394  return nullptr;
1395  shard = it->second;
1396  }
1397 
1398  return shard->fetchNodeObject(hash, fetchReport);
1399 }
1400 
// Selects a random shard index to acquire next, excluding indexes that are
// already stored (shards_) or queued for import (preparedIndexes_).
// Returns nullopt when every shard up to the last fully-validated shard is
// accounted for. NOTE(review): the declarator (Doxygen lines 1401-1402,
// 1404) and line 1426 (presumably the declaration of the `available`
// vector) were dropped by this extraction.
1403  std::uint32_t validLedgerSeq,
1405 {
1406  if (validLedgerSeq < earliestLedgerSeq_)
1407  return std::nullopt;
1408 
     // The highest shard index that is completely validated: step back one
     // if validLedgerSeq is not the final sequence of its shard.
1409  auto const maxShardIndex{[this, validLedgerSeq]() {
1410  auto shardIndex{seqToShardIndex(validLedgerSeq)};
1411  if (validLedgerSeq != lastLedgerSeq(shardIndex))
1412  --shardIndex;
1413  return shardIndex;
1414  }()};
1415  auto const maxNumShards{maxShardIndex - earliestShardIndex_ + 1};
1416 
1417  // Check if the shard store has all shards
1418  if (shards_.size() >= maxNumShards)
1419  return std::nullopt;
1420 
1421  if (maxShardIndex < 1024 ||
1422  static_cast<float>(shards_.size()) / maxNumShards > 0.5f)
1423  {
1424  // Small or mostly full index space to sample
1425  // Find the available indexes and select one at random
1427  available.reserve(maxNumShards - shards_.size());
1428 
1429  for (auto shardIndex = earliestShardIndex_; shardIndex <= maxShardIndex;
1430  ++shardIndex)
1431  {
1432  if (shards_.find(shardIndex) == shards_.end() &&
1433  preparedIndexes_.find(shardIndex) == preparedIndexes_.end())
1434  {
1435  available.push_back(shardIndex);
1436  }
1437  }
1438 
1439  if (available.empty())
1440  return std::nullopt;
1441 
1442  if (available.size() == 1)
1443  return available.front();
1444 
1445  return available[rand_int(
1446  0u, static_cast<std::uint32_t>(available.size() - 1))];
1447  }
1448 
1449  // Large, sparse index space to sample
1450  // Keep choosing indexes at random until an available one is found.
1451  // Since fewer than half the indexes are occupied here, each of the 40
1452  // trials misses with probability < 0.5, so failing all 40 is
1453  // vanishingly unlikely (< 1 in 10^12).
1452  for (int i = 0; i < 40; ++i)
1453  {
1454  auto const shardIndex{rand_int(earliestShardIndex_, maxShardIndex)};
1455  if (shards_.find(shardIndex) == shards_.end() &&
1456  preparedIndexes_.find(shardIndex) == preparedIndexes_.end())
1457  {
1458  return shardIndex;
1459  }
1460  }
1461 
     // Statistically (near-)unreachable; flag in debug builds.
1462  assert(false);
1463  return std::nullopt;
1464 }
1465 
// Queues a background task that finalizes `shard` (verification and,
// optionally, writing its SQLite data), removes it on failure, and on
// success classifies it as historical or recent, shifting older recent
// shards to historical storage and notifying peers.
// NOTE(review): the function name line (Doxygen 1467) was dropped by this
// extraction; the visible body is contiguous.
1466 void
1468  std::shared_ptr<Shard>& shard,
1469  bool const writeSQLite,
1470  std::optional<uint256> const& expectedHash)
1471 {
     // Capture the shard weakly so a shard deleted before the task runs
     // doesn't have its lifetime extended by the queue.
1472  taskQueue_.addTask([this,
1473  wptr = std::weak_ptr<Shard>(shard),
1474  writeSQLite,
1475  expectedHash]() {
1476  if (isStopping())
1477  return;
1478 
1479  auto shard{wptr.lock()};
1480  if (!shard)
1481  {
1482  JLOG(j_.debug()) << "Shard removed before being finalized";
1483  return;
1484  }
1485 
1486  if (!shard->finalize(writeSQLite, expectedHash))
1487  {
     // Don't treat a shutdown-interrupted finalize as corruption.
1488  if (isStopping())
1489  return;
1490 
1491  // Invalid or corrupt shard, remove it
1492  removeFailedShard(shard);
1493  return;
1494  }
1495 
1496  if (isStopping())
1497  return;
1498 
1499  {
1500  auto const boundaryIndex{shardBoundaryIndex()};
1501  std::lock_guard lock(mutex_);
1502 
1503  if (shard->index() < boundaryIndex)
1504  {
1505  // This is a historical shard
1506  if (!historicalPaths_.empty() &&
1507  shard->getDir().parent_path() == dir_)
1508  {
1509  // Shard wasn't placed at a separate historical path
1510  JLOG(j_.warn()) << "shard " << shard->index()
1511  << " is not stored at a historical path";
1512  }
1513  }
1514  else
1515  {
1516  // Not a historical shard. Shift recent shards if necessary
1517  assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
1518  relocateOutdatedShards(lock);
1519 
1520  // Set the appropriate recent shard index
1521  if (shard->index() == boundaryIndex)
1522  secondLatestShardIndex_ = shard->index();
1523  else
1524  latestShardIndex_ = shard->index();
1525 
     // Recent shards belong on the main path, not a historical one.
1526  if (shard->getDir().parent_path() != dir_)
1527  {
1528  JLOG(j_.warn()) << "shard " << shard->index()
1529  << " is not stored at the path";
1530  }
1531  }
1532 
     // Broadcast the updated shard inventory to peers.
1533  updatePeers(lock);
1534  }
1535 
1536  updateFileStats();
1537  });
1538 }
1539 
// Recomputes aggregate file statistics (total size, required file
// descriptors, average shard size) across all shards, then re-evaluates
// whether more shards can be added given maxHistoricalShards_ and the
// available storage. Shard pointers are copied out under the mutex and
// queried unlocked to avoid holding the mutex during file-system calls.
// NOTE(review): the function name line and three interior lines (Doxygen
// 1541, 1543, 1579, 1591 missing) were dropped by this extraction --
// presumably the declaration of the weak_ptr vector, a condition guarding
// the "maximum historical shards" warning, and an argument line of the
// sufficientStorage call -- confirm against the original source.
1540 void
1542 {
1544  {
1545  std::lock_guard lock(mutex_);
1546  if (shards_.empty())
1547  return;
1548 
1549  shards.reserve(shards_.size());
1550  for (auto const& e : shards_)
1551  shards.push_back(e.second);
1552  }
1553 
1554  std::uint64_t sumSz{0};
1555  std::uint32_t sumFd{0};
1556  std::uint32_t numShards{0};
1557  for (auto const& weak : shards)
1558  {
     // Skip shards destroyed since the snapshot was taken.
1559  if (auto const shard{weak.lock()}; shard)
1560  {
1561  auto const [sz, fd] = shard->getFileInfo();
1562  sumSz += sz;
1563  sumFd += fd;
1564  ++numShards;
1565  }
1566  }
1567 
1568  std::lock_guard lock(mutex_);
1569  fileSz_ = sumSz;
1570  fdRequired_ = sumFd;
1571  avgShardFileSz_ = (numShards == 0 ? fileSz_ : fileSz_ / numShards);
1572 
1573  if (!canAdd_)
1574  return;
1575 
1576  if (auto const count = numHistoricalShards(lock);
1577  count >= maxHistoricalShards_)
1578  {
1580  {
1581  // In order to avoid excessive output, don't produce
1582  // this warning if the server isn't configured to
1583  // store historical shards.
1584  JLOG(j_.warn()) << "maximum number of historical shards reached";
1585  }
1586 
1587  canAdd_ = false;
1588  }
1589  else if (!sufficientStorage(
1590  maxHistoricalShards_ - count,
1592  lock))
1593  {
1594  JLOG(j_.warn())
1595  << "maximum shard store size exceeds available storage space";
1596 
1597  canAdd_ = false;
1598  }
1599 }
1600 
1601 bool
1603  std::uint32_t numShards,
1604  PathDesignation pathDesignation,
1605  std::lock_guard<std::mutex> const&) const
1606 {
1607  try
1608  {
1609  std::vector<std::uint64_t> capacities;
1610 
1611  if (pathDesignation == PathDesignation::historical &&
1613  {
1614  capacities.reserve(historicalPaths_.size());
1615 
1616  for (auto const& path : historicalPaths_)
1617  {
1618  // Get the available storage for each historical path
1619  auto const availableSpace =
1620  boost::filesystem::space(path).available;
1621 
1622  capacities.push_back(availableSpace);
1623  }
1624  }
1625  else
1626  {
1627  // Get the available storage for the main shard path
1628  capacities.push_back(boost::filesystem::space(dir_).available);
1629  }
1630 
1631  for (std::uint64_t const capacity : capacities)
1632  {
1633  // Leverage all the historical shard paths to
1634  // see if collectively they can fit the specified
1635  // number of shards. For this to work properly,
1636  // each historical path must correspond to a separate
1637  // physical device or filesystem.
1638 
1639  auto const shardCap = capacity / avgShardFileSz_;
1640  if (numShards <= shardCap)
1641  return true;
1642 
1643  numShards -= shardCap;
1644  }
1645  }
1646  catch (std::exception const& e)
1647  {
1648  JLOG(j_.fatal()) << "Exception caught in function " << __func__
1649  << ". Error: " << e.what();
1650  return false;
1651  }
1652 
1653  return false;
1654 }
1655 
// Records that `ledger` has been fully stored in `shard`. On corruption
// the shard is removed and false is returned. If storing the ledger
// completes the shard, the shard is handed off to finalizeShard (clearing
// acquireIndex_ if it was the shard being acquired).
// NOTE(review): the function name line (Doxygen 1657) was dropped by this
// extraction; the visible body is contiguous.
1656 bool
1658  std::shared_ptr<Shard>& shard,
1659  std::shared_ptr<Ledger const> const& ledger)
1660 {
1661  if (!shard->setLedgerStored(ledger))
1662  {
1663  // Invalid or corrupt shard, remove it
1664  removeFailedShard(shard);
1665  return false;
1666  }
1667 
1668  if (shard->getState() == ShardState::complete)
1669  {
1670  std::lock_guard lock(mutex_);
     // Only finalize if the shard is still tracked; it may have been
     // dropped concurrently.
1671  if (auto const it{shards_.find(shard->index())}; it != shards_.end())
1672  {
1673  if (shard->index() == acquireIndex_)
1674  acquireIndex_ = 0;
1675 
     // Complete via ledger acquisition: no SQLite rewrite, no expected hash.
1676  finalizeShard(it->second, false, std::nullopt);
1677  }
1678  else
1679  {
1680  JLOG(j_.debug())
1681  << "shard " << shard->index() << " is no longer being acquired";
1682  }
1683  }
1684 
1685  updateFileStats();
1686  return true;
1687 }
1688 
// Removes an invalid or corrupt shard: clears any bookkeeping indexes that
// reference it (acquireIndex_, latest/second-latest), marks its directory
// for deletion, destroys it, and refreshes file statistics.
// NOTE(review): the function name line (Doxygen 1690) was dropped by this
// extraction; the visible body is contiguous.
1689 void
1691 {
1692  {
1693  std::lock_guard lock(mutex_);
1694 
1695  if (shard->index() == acquireIndex_)
1696  acquireIndex_ = 0;
1697 
1698  if (shard->index() == latestShardIndex_)
1699  latestShardIndex_ = std::nullopt;
1700 
1701  if (shard->index() == secondLatestShardIndex_)
1702  secondLatestShardIndex_ = std::nullopt;
1703  }
1704 
     // Flag the on-disk directory for removal when the Shard is destroyed.
1705  shard->removeOnDestroy();
1706 
1707  // Reset the shared_ptr to invoke the shard's
1708  // destructor and remove it from the server
1709  shard.reset();
1710  updateFileStats();
1711 }
1712 
// Returns the shard index separating historical shards from recent ones:
// every shard with a smaller index is historical. Returns 0 when the
// validated ledger predates the earliest tracked sequence.
// NOTE(review): the declarator (Doxygen lines 1713-1714) was dropped by
// this extraction.
1715 {
1716  auto const validIndex = app_.getLedgerMaster().getValidLedgerIndex();
1717 
1718  if (validIndex < earliestLedgerSeq_)
1719  return 0;
1720 
1721  // Shards with an index earlier than the recent shard boundary index
1722  // are considered historical. The three shards at or later than
1723  // this index consist of the two most recently validated shards
1724  // and the shard still in the process of being built by live
1725  // transactions.
1726  return seqToShardIndex(validIndex) - 1;
1727 }
1728 
// Counts the tracked shards whose index falls below the historical
// boundary (see shardBoundaryIndex). Caller must hold mutex_, enforced by
// the lock_guard parameter. NOTE(review): the declarator (Doxygen lines
// 1729-1730) was dropped by this extraction.
1731  std::lock_guard<std::mutex> const& lock) const
1732 {
1733  auto const boundaryIndex{shardBoundaryIndex()};
1734  return std::count_if(
1735  shards_.begin(), shards_.end(), [boundaryIndex](auto const& entry) {
1736  return entry.first < boundaryIndex;
1737  });
1738 }
1739 
// After a new shard is published, demotes the tracked "recent" shards
// (latestShardIndex_ / secondLatestShardIndex_) that are no longer recent:
// each is either kept (and moved to a historical path when such paths are
// configured) or removed when shard-count/storage limits are exceeded.
// Caller must hold mutex_. NOTE(review): the function name line and three
// interior lines (Doxygen 1741, 1750, 1773, 1780 missing) were dropped by
// this extraction -- presumably the initializer computing latestShardIndex
// from the validated ledger, the historical-shard-count check in
// `keepShard`, and the storage-space condition -- confirm against the
// original source.
1740 void
1742  std::lock_guard<std::mutex> const& lock)
1743 {
     // References into the members so demotions below update them in place.
1744  auto& cur{latestShardIndex_};
1745  auto& prev{secondLatestShardIndex_};
1746  if (!cur && !prev)
1747  return;
1748 
1749  auto const latestShardIndex =
1751  auto const separateHistoricalPath = !historicalPaths_.empty();
1752 
     // Removes the shard at shardIndex and disables further additions;
     // used when limits prevent keeping a demoted shard.
1753  auto const removeShard = [this](std::uint32_t const shardIndex) -> void {
1754  canAdd_ = false;
1755 
1756  if (auto it = shards_.find(shardIndex); it != shards_.end())
1757  {
1758  if (it->second)
1759  removeFailedShard(it->second);
1760  else
1761  {
1762  JLOG(j_.warn()) << "can't find shard to remove";
1763  }
1764  }
1765  else
1766  {
1767  JLOG(j_.warn()) << "can't find shard to remove";
1768  }
1769  };
1770 
     // True when the demoted shard can be retained as historical; removes
     // it (and returns false) when count or storage limits are exceeded.
1771  auto const keepShard = [this, &lock, removeShard, separateHistoricalPath](
1772  std::uint32_t const shardIndex) -> bool {
1774  {
1775  JLOG(j_.error()) << "maximum number of historical shards reached";
1776  removeShard(shardIndex);
1777  return false;
1778  }
1779  if (separateHistoricalPath &&
1781  {
1782  JLOG(j_.error()) << "insufficient storage space available";
1783  removeShard(shardIndex);
1784  return false;
1785  }
1786 
1787  return true;
1788  };
1789 
1790  // Move a shard from the main shard path to a historical shard
1791  // path by copying the contents, and creating a new shard.
1792  auto const moveShard = [this,
1793  &lock](std::uint32_t const shardIndex) -> void {
1794  auto it{shards_.find(shardIndex)};
1795  if (it == shards_.end())
1796  {
1797  JLOG(j_.warn()) << "can't find shard to move to historical path";
1798  return;
1799  }
1800 
1801  auto& shard{it->second};
1802 
1803  // Close any open file descriptors before moving the shard
1804  // directory. Don't call removeOnDestroy since that would
1805  // attempt to close the fds after the directory has been moved.
1806  if (!shard->tryClose())
1807  {
1808  JLOG(j_.warn()) << "can't close shard to move to historical path";
1809  return;
1810  }
1811 
1812  auto const dst{chooseHistoricalPath(lock)};
1813  try
1814  {
1815  // Move the shard directory to the new path
1816  boost::filesystem::rename(
1817  shard->getDir().string(), dst / std::to_string(shardIndex));
1818  }
1819  catch (...)
1820  {
1821  JLOG(j_.error()) << "shard " << shardIndex
1822  << " failed to move to historical storage";
1823  return;
1824  }
1825 
1826  // Create a shard instance at the new location
1827  shard = std::make_shared<Shard>(app_, *this, shardIndex, dst, j_);
1828 
1829  // Open the new shard
1830  if (!shard->init(scheduler_, *ctx_))
1831  {
1832  JLOG(j_.error()) << "shard " << shardIndex
1833  << " failed to open in historical storage";
1834  shard->removeOnDestroy();
1835  shard.reset();
1836  }
1837  };
1838 
1839  // See if either of the recent shards needs to be updated
1840  bool const curNotSynched =
1841  latestShardIndex_ && *latestShardIndex_ != latestShardIndex;
1842  bool const prevNotSynched = secondLatestShardIndex_ &&
1843  *secondLatestShardIndex_ != latestShardIndex - 1;
1844 
1845  // A new shard has been published. Move outdated
1846  // shards to historical storage as needed
1847  if (curNotSynched || prevNotSynched)
1848  {
1849  if (prev)
1850  {
1851  // Move the formerly second latest shard to historical storage
1852  if (keepShard(*prev) && separateHistoricalPath)
1853  moveShard(*prev);
1854 
1855  prev = std::nullopt;
1856  }
1857 
1858  if (cur)
1859  {
1860  // The formerly latest shard is now the second latest
1861  if (cur == latestShardIndex - 1)
1862  prev = cur;
1863 
1864  // The formerly latest shard is no longer a 'recent' shard
1865  else
1866  {
1867  // Move the formerly latest shard to historical storage
1868  if (keepShard(*cur) && separateHistoricalPath)
1869  moveShard(*cur);
1870  }
1871 
1872  cur = std::nullopt;
1873  }
1874  }
1875 }
1876 
// Decides where a new shard at `shardIndex` should live (main path or a
// historical path) and verifies count/storage limits permit adding it.
// Returns nullopt -- and permanently disables further additions via
// canAdd_ -- when limits are exceeded. NOTE(review): the declarator and
// two interior lines (Doxygen 1878, 1880-1881, 1888-1889 missing) were
// dropped by this extraction -- presumably the remaining parameters
// (numHistoricalShards, lock) and the non-historical branch of the
// `designation` conditional -- confirm against the original source.
1877 auto
1879  std::uint32_t shardIndex,
1882 {
1883  // Any shard earlier than the two most recent shards is a historical shard
1884  auto const boundaryIndex{shardBoundaryIndex()};
1885  auto const isHistoricalShard = shardIndex < boundaryIndex;
1886 
1887  auto const designation = isHistoricalShard && !historicalPaths_.empty()
1890 
1891  // Check shard count and available storage space
1892  if (isHistoricalShard && numHistoricalShards >= maxHistoricalShards_)
1893  {
1894  JLOG(j_.error()) << "maximum number of historical shards reached";
1895  canAdd_ = false;
1896  return std::nullopt;
1897  }
1898  if (!sufficientStorage(1, designation, lock))
1899  {
1900  JLOG(j_.error()) << "insufficient storage space available";
1901  canAdd_ = false;
1902  return std::nullopt;
1903  }
1904 
1905  return designation;
1906 }
1907 
// Picks a destination for a historical shard: the main path when no
// historical paths are configured; otherwise one path chosen uniformly at
// random (std::sample with n=1) from those with at least an average
// shard's worth of free space. Returns an empty path (after logging) when
// none qualifies. NOTE(review): the function name line (Doxygen 1909) was
// dropped by this extraction; the visible body is contiguous.
1908 boost::filesystem::path
1910 {
1911  // If not configured with separate historical paths,
1912  // use the main path (dir_) by default.
1913  if (historicalPaths_.empty())
1914  return dir_;
1915 
1916  boost::filesystem::path historicalShardPath;
1917  std::vector<boost::filesystem::path> potentialPaths;
1918 
1919  for (boost::filesystem::path const& path : historicalPaths_)
1920  {
     // Only consider paths that can fit an average-sized shard.
1921  if (boost::filesystem::space(path).available >= avgShardFileSz_)
1922  potentialPaths.push_back(path);
1923  }
1924 
1925  if (potentialPaths.empty())
1926  {
1927  JLOG(j_.error()) << "failed to select a historical shard path";
1928  return "";
1929  }
1930 
     // Sample exactly one path uniformly at random.
1931  std::sample(
1932  potentialPaths.begin(),
1933  potentialPaths.end(),
1934  &historicalShardPath,
1935  1,
1936  default_prng());
1937 
1938  return historicalShardPath;
1939 }
1940 
// Verifies the configured historical shard paths reside on distinct
// devices/filesystems. On Linux this is enforced (statvfs f_fsid
// comparison; duplicates are an error). Elsewhere it is only heuristic:
// paths reporting identical free byte counts trigger a warning but the
// check still returns true. NOTE(review): the function name line and the
// declarations of the two grouping maps (Doxygen 1942, 1948-1949,
// 1993-1994 partially missing) were dropped by this extraction -- the
// containers `filesystemIDs` and `uniqueCapacities` are declared on the
// elided lines -- confirm against the original source.
1941 bool
1943 {
1944 #if BOOST_OS_LINUX
1945  // Each historical shard path must correspond
1946  // to a directory on a distinct device or file system.
1947  // Currently, this constraint is enforced only on Linux.
1950 
1951  for (auto const& path : historicalPaths_)
1952  {
1953  struct statvfs buffer;
     // statvfs returns nonzero on failure.
1954  if (statvfs(path.c_str(), &buffer))
1955  {
1956  JLOG(j_.error())
1957  << "failed to acquire stats for 'historical_shard_path': "
1958  << path;
1959  return false;
1960  }
1961 
     // Group paths by filesystem ID to detect co-located paths.
1962  filesystemIDs[buffer.f_fsid].push_back(path.string());
1963  }
1964 
1965  bool ret = true;
1966  for (auto const& entry : filesystemIDs)
1967  {
1968  // Check to see if any of the paths are stored on the same file system
1969  if (entry.second.size() > 1)
1970  {
1971  // Two or more historical storage paths
1972  // correspond to the same file system.
1973  JLOG(j_.error())
1974  << "The following paths correspond to the same filesystem: "
1975  << boost::algorithm::join(entry.second, ", ")
1976  << ". Each configured historical storage path should"
1977  " be on a unique device or filesystem.";
1978 
1979  ret = false;
1980  }
1981  }
1982 
1983  return ret;
1984 
1985 #else
1986  // The requirement that each historical storage path
1987  // corresponds to a distinct device or file system is
1988  // enforced only on Linux, so on other platforms
1989  // keep track of the available capacities for each
1990  // path. Issue a warning if we suspect any of the paths
1991  // may violate this requirement.
1992 
1993  // Map byte counts to each path that shares that byte count.
1995  uniqueCapacities(historicalPaths_.size());
1996 
1997  for (auto const& path : historicalPaths_)
1998  uniqueCapacities[boost::filesystem::space(path).available].push_back(
1999  path.string());
2000 
2001  for (auto const& entry : uniqueCapacities)
2002  {
2003  // Check to see if any paths have the same amount of available bytes.
2004  if (entry.second.size() > 1)
2005  {
2006  // Two or more historical storage paths may
2007  // correspond to the same device or file system.
2008  JLOG(j_.warn())
2009  << "Each of the following paths have " << entry.first
2010  << " bytes free, and may be located on the same device"
2011  " or file system: "
2012  << boost::algorithm::join(entry.second, ", ")
2013  << ". Each configured historical storage path should"
2014  " be on a unique device or file system.";
2015  }
2016  }
2017 #endif
2018 
2019  return true;
2020 }
2021 
// Invokes `callback` on the ledger SQLite session of the shard containing
// `ledgerSeq` (per the symbol index: bool callForLedgerSQLByLedgerSeq
// (LedgerIndex, std::function<bool(soci::session&)> const&)). Rejects
// sequences earlier than the configured earliest ledger.
// NOTE(review): the function name line (Doxygen 2023) was dropped by this
// extraction.
2021 bool
2024  LedgerIndex ledgerSeq,
2025  std::function<bool(soci::session& session)> const& callback)
2026 {
2027  if (ledgerSeq < earliestLedgerSeq_)
2028  {
2029  JLOG(j_.warn()) << "callForLedgerSQLByLedgerSeq ledger seq too early: "
2030  << ledgerSeq;
2031  return false;
2032  }
2033 
     // Delegate to the by-shard-index variant after mapping seq -> shard.
2034  return callForLedgerSQLByShardIndex(seqToShardIndex(ledgerSeq), callback);
2035 }
2036 
// Invokes `callback` on the ledger SQLite session of the shard at
// `shardIndex`. Returns false when the shard is absent, not yet finalized,
// or the callback itself returns false. NOTE(review): the function name
// line (Doxygen 2038) was dropped by this extraction.
2037 bool
2039  const uint32_t shardIndex,
2040  std::function<bool(soci::session& session)> const& callback)
2041 {
2042  std::lock_guard lock(mutex_);
2043 
2044  auto const it{shards_.find(shardIndex)};
2045 
     // Short-circuits: missing or unfinalized shards never run the callback.
2046  return it != shards_.end() &&
2047  it->second->getState() == ShardState::finalized &&
2048  it->second->callForLedgerSQL(callback);
2049 }
2050 
// Invokes `callback` on the transaction SQLite session of the shard
// containing `ledgerSeq`. NOTE(review): the function name line and the
// start of the delegated call (Doxygen 2052, 2056 missing) were dropped by
// this extraction -- presumably a return statement invoking
// callForTransactionSQLByShardIndex -- confirm against the original
// source.
2051 bool
2053  LedgerIndex ledgerSeq,
2054  std::function<bool(soci::session& session)> const& callback)
2055 {
2057  seqToShardIndex(ledgerSeq), callback);
2058 }
2059 
// Invokes `callback` on the transaction SQLite session of the shard at
// `shardIndex`. Returns false when the shard is absent, not yet finalized,
// or the callback itself returns false. Mirrors
// callForLedgerSQLByShardIndex. NOTE(review): the function name line
// (Doxygen 2061) was dropped by this extraction.
2060 bool
2062  std::uint32_t const shardIndex,
2063  std::function<bool(soci::session& session)> const& callback)
2064 {
2065  std::lock_guard lock(mutex_);
2066 
2067  auto const it{shards_.find(shardIndex)};
2068 
2069  return it != shards_.end() &&
2070  it->second->getState() == ShardState::finalized &&
2071  it->second->callForTransactionSQL(callback);
2072 }
2073 
// Visits every finalized shard with index >= minShardIndex (or all shards
// when nullopt) in ascending order, stopping early (and returning false)
// when `visit` returns false. Runs entirely under mutex_.
// NOTE(review): the function name line and the iterator declaration
// (Doxygen 2075, 2081 missing) were dropped by this extraction --
// presumably `it`/`eit` are shards_ map iterators -- confirm against the
// original source.
2074 bool
2076  std::optional<std::uint32_t> minShardIndex,
2077  std::function<bool(Shard& shard)> const& visit)
2078 {
2079  std::lock_guard lock(mutex_);
2080 
2082 
2083  if (!minShardIndex)
2084  it = shards_.begin();
2085  else
2086  it = shards_.lower_bound(*minShardIndex);
2087 
2088  eit = shards_.end();
2089 
2090  for (; it != eit; it++)
2091  {
     // Only finalized shards are exposed to the visitor.
2092  if (it->second->getState() == ShardState::finalized)
2093  {
2094  if (!visit(*it->second))
2095  return false;
2096  }
2097  }
2098 
2099  return true;
2100 }
2101 
// Iterates the ledger SQLite databases of all finalized shards from
// minShardIndex upward, adapting iterateShardsForward to the per-shard
// callForLedgerSQL hook. NOTE(review): the function name line (Doxygen
// 2103) was dropped by this extraction.
2102 bool
2104  std::optional<std::uint32_t> minShardIndex,
2105  std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
2106  callback)
2107 {
2108  return iterateShardsForward(
2109  minShardIndex, [&callback](Shard& shard) -> bool {
2110  return shard.callForLedgerSQL(callback);
2111  });
2112 }
2113 
// Iterates the transaction SQLite databases of all finalized shards from
// minShardIndex upward; transaction-DB counterpart of
// iterateLedgerSQLsForward. NOTE(review): the function name line (Doxygen
// 2115) was dropped by this extraction.
2114 bool
2116  std::optional<std::uint32_t> minShardIndex,
2117  std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
2118  callback)
2119 {
2120  return iterateShardsForward(
2121  minShardIndex, [&callback](Shard& shard) -> bool {
2122  return shard.callForTransactionSQL(callback);
2123  });
2124 }
2125 
// Visits every finalized shard with index <= maxShardIndex (or all shards
// when nullopt) in descending order, stopping early (and returning false)
// when `visit` returns false. Runs entirely under mutex_.
// NOTE(review): the function name line (Doxygen 2127) was dropped by this
// extraction.
2126 bool
2128  std::optional<std::uint32_t> maxShardIndex,
2129  std::function<bool(Shard& shard)> const& visit)
2130 {
2131  std::lock_guard lock(mutex_);
2132 
2133  std::map<std::uint32_t, std::shared_ptr<Shard>>::reverse_iterator it, eit;
2134 
2135  if (!maxShardIndex)
2136  it = shards_.rbegin();
2137  else
     // upper_bound + reverse iterator starts at the last key <= max.
2138  it = std::make_reverse_iterator(shards_.upper_bound(*maxShardIndex));
2139 
2140  eit = shards_.rend();
2141 
2142  for (; it != eit; it++)
2143  {
2144  if (it->second->getState() == ShardState::finalized &&
2145  (!maxShardIndex || it->first <= *maxShardIndex))
2146  {
2147  if (!visit(*it->second))
2148  return false;
2149  }
2150  }
2151 
2152  return true;
2153 }
2154 
// Iterates the ledger SQLite databases of all finalized shards from
// maxShardIndex downward; descending counterpart of
// iterateLedgerSQLsForward. NOTE(review): the function name line (Doxygen
// 2156) was dropped by this extraction.
2155 bool
2157  std::optional<std::uint32_t> maxShardIndex,
2158  std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
2159  callback)
2160 {
2161  return iterateShardsBack(maxShardIndex, [&callback](Shard& shard) -> bool {
2162  return shard.callForLedgerSQL(callback);
2163  });
2164 }
2165 
// Iterates the transaction SQLite databases of all finalized shards from
// maxShardIndex downward; descending counterpart of
// iterateTransactionSQLsForward. NOTE(review): the function name line
// (Doxygen 2167) was dropped by this extraction.
2166 bool
2168  std::optional<std::uint32_t> maxShardIndex,
2169  std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
2170  callback)
2171 {
2172  return iterateShardsBack(maxShardIndex, [&callback](Shard& shard) -> bool {
2173  return shard.callForTransactionSQL(callback);
2174  });
2175 }
2176 
2179 {
2180  auto shardInfo{std::make_unique<ShardInfo>()};
2181  for (auto const& [_, shard] : shards_)
2182  {
2183  shardInfo->update(
2184  shard->index(), shard->getState(), shard->getPercentProgress());
2185  }
2186 
2187  for (auto const shardIndex : preparedIndexes_)
2188  shardInfo->update(shardIndex, ShardState::queued, 0);
2189 
2190  return shardInfo;
2191 }
2192 
// Returns the number of tasks pending in the shard task queue, taken under
// mutex_. NOTE(review): the function name line (Doxygen 2194) was dropped
// by this extraction.
2193 size_t
2195 {
2196  std::lock_guard lock(mutex_);
2197  return taskQueue_.size();
2198 }
2199 
// Broadcasts the current shard inventory to all connected peers as an
// mtPEER_SHARD_INFO_V2 message. Skipped in standalone mode.
// NOTE(review): the function name line and part of the condition (Doxygen
// 2201, 2204 missing) were dropped by this extraction -- presumably a
// second condition on the server's operating mode -- confirm against the
// original source.
2200 void
2202 {
2203  if (!app_.config().standalone() &&
2205  {
2206  auto const message{getShardInfo(lock)->makeMessage(app_)};
2207  app_.overlay().foreach(send_always(std::make_shared<Message>(
2208  message, protocol::mtPEER_SHARD_INFO_V2)));
2209  }
2210 }
2211 
// Launches the NodeStore import on a dedicated background thread. After
// doImportDatabase returns, the thread clears the import status under
// mutex_ so a subsequent import can be started. NOTE(review): the function
// name line and Doxygen line 2229 (presumably detaching
// databaseImporter_, per the comment and the std::thread::detach entry in
// the symbol index) were dropped by this extraction -- confirm against the
// original source.
2212 void
2214 {
2215  // Run the lengthy node store import process in the background
2216  // on a dedicated thread.
2217  databaseImporter_ = std::thread([this] {
2218  doImportDatabase();
2219 
2220  std::lock_guard lock(mutex_);
2221 
2222  // Make sure to clear this in case the import
2223  // exited early.
2224  databaseImportStatus_.reset();
2225 
2226  // Detach the thread so subsequent attempts
2227  // to start the import won't get held up by
2228  // the old thread of execution
2230  });
2231 }
2232 
2233 //------------------------------------------------------------------------------
2234 
// Factory for the shard store (per the symbol index:
// std::unique_ptr<DatabaseShard> make_ShardStore(...)). Returns nullptr --
// the shard store is optional -- when the relevant config section is
// empty. NOTE(review): the function name line and part of the section
// lookup (Doxygen 2235-2236, 2244 missing) were dropped by this extraction
// -- presumably the lookup uses ConfigSection::shardDatabase() -- confirm
// against the original source.
2237  Application& app,
2238  Scheduler& scheduler,
2239  int readThreads,
2240  beast::Journal j)
2241 {
2242  // The shard store is optional. Future changes will require it.
2243  Section const& section{
2245  if (section.empty())
2246  return nullptr;
2247 
2248  return std::make_unique<DatabaseShardImp>(app, scheduler, readThreads, j);
2249 }
2250 
2251 } // namespace NodeStore
2252 } // namespace ripple
ripple::SQLiteDatabase
Definition: SQLiteDatabase.h:27
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
ripple::NodeStore::DatabaseShardImp::iterateLedgerSQLsForward
bool iterateLedgerSQLsForward(std::optional< std::uint32_t > minShardIndex, std::function< bool(soci::session &session, std::uint32_t shardIndex)> const &callback) override
iterateLedgerSQLsForward Checks out ledger databases for all shards in ascending order starting from ...
Definition: DatabaseShardImp.cpp:2103
ripple::SizedItem::openFinalLimit
@ openFinalLimit
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::NodeStore::Database::lastLedgerSeq
std::uint32_t lastLedgerSeq(std::uint32_t shardIndex) const noexcept
Calculates the last ledger sequence for a given shard index.
Definition: Database.h:271
ripple::Application
Definition: Application.h:115
ripple::hotUNKNOWN
@ hotUNKNOWN
Definition: NodeObject.h:33
std::this_thread::sleep_for
T sleep_for(T... args)
ripple::NodeStore::make_ShardStore
std::unique_ptr< DatabaseShard > make_ShardStore(Application &app, Scheduler &scheduler, int readThreads, beast::Journal j)
Definition: DatabaseShardImp.cpp:2236
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:241
ripple::NodeStore::DatabaseShardImp::mutex_
std::mutex mutex_
Definition: DatabaseShardImp.h:233
ripple::NodeStore::DatabaseShardImp::app_
Application & app_
Definition: DatabaseShardImp.h:232
ripple::ShardState::complete
@ complete
ripple::DEFAULT_LEDGERS_PER_SHARD
static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD
The number of ledgers in a shard.
Definition: SystemParameters.h:76
ripple::NodeStore::DatabaseShardImp::storeLedger
bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger) override
Store a ledger from a different database.
Definition: DatabaseShardImp.cpp:1136
ripple::NodeStore::Database
Persistency layer for NodeObject.
Definition: Database.h:51
std::string
STL class.
std::shared_ptr< Ledger >
ripple::loadByIndex
std::shared_ptr< Ledger > loadByIndex(std::uint32_t ledgerIndex, Application &app, bool acquire)
Definition: Ledger.cpp:1123
ripple::SizedItem
SizedItem
Definition: Config.h:48
ripple::NodeStore::DatabaseShardImp::shards_
std::map< std::uint32_t, std::shared_ptr< Shard > > shards_
Definition: DatabaseShardImp.h:243
std::exception
STL class.
std::stoul
T stoul(T... args)
ripple::NodeStore::DatabaseShardImp::PathDesignation
PathDesignation
Definition: DatabaseShardImp.h:196
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
ripple::NodeStore::DatabaseShardImp::callForLedgerSQLByLedgerSeq
bool callForLedgerSQLByLedgerSeq(LedgerIndex ledgerSeq, std::function< bool(soci::session &session)> const &callback) override
Invoke a callback on the SQLite db holding the corresponding ledger.
Definition: DatabaseShardImp.cpp:2023
ripple::NodeStore::TaskQueue::size
size_t size() const
Return the queue size.
Definition: TaskQueue.cpp:48
std::vector::reserve
T reserve(T... args)
ripple::NodeStore::DatabaseShardImp::removePreShard
void removePreShard(std::uint32_t shardIndex) override
Remove a previously prepared shard index for import.
Definition: DatabaseShardImp.cpp:416
ripple::LedgerMaster::getValidLedgerIndex
LedgerIndex getValidLedgerIndex()
Definition: LedgerMaster.cpp:213
ripple::NodeStore::Database::fdRequired_
int fdRequired_
Definition: Database.h:303
ripple::NodeStore::DatabaseShardImp::fileSz_
std::uint64_t fileSz_
Definition: DatabaseShardImp.h:267
ripple::InboundLedger::Reason::GENERIC
@ GENERIC
std::vector
STL class.
std::set::find
T find(T... args)
ripple::ConfigSection::shardDatabase
static std::string shardDatabase()
Definition: ConfigSections.h:38
ripple::NodeStore::Shard::callForLedgerSQL
bool callForLedgerSQL(std::function< bool(Args... args)> const &callback)
Invoke a callback on the ledger SQLite db.
Definition: nodestore/impl/Shard.h:228
ripple::NodeStore::DatabaseShardImp::stop
void stop() override
Definition: DatabaseShardImp.cpp:699
std::vector::size
T size(T... args)
ripple::NodeObjectType
NodeObjectType
The types of node objects.
Definition: NodeObject.h:32
ripple::NodeObject::createObject
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
std::chrono::milliseconds
ripple::NodeStore::DatabaseShardImp::taskQueue_
TaskQueue taskQueue_
Definition: DatabaseShardImp.h:240
ripple::NodeStore::DatabaseShardImp::setStored
void setStored(std::shared_ptr< Ledger const > const &ledger) override
Notifies the database that the given ledger has been fully acquired and stored.
Definition: DatabaseShardImp.cpp:630
std::set::emplace
T emplace(T... args)
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::lock_guard
STL class.
ripple::kilobytes
constexpr auto kilobytes(T value) noexcept
Definition: ByteUtilities.h:27
ripple::NetworkOPs::getOperatingMode
virtual OperatingMode getOperatingMode() const =0
ripple::NodeStore::Database::stop
virtual void stop()
Definition: Database.cpp:165
ripple::NodeStore::FetchReport
Contains information about a fetch operation.
Definition: ripple/nodestore/Scheduler.h:32
ripple::NodeStore::DatabaseShardImp::getDatabaseImportSequence
std::optional< std::uint32_t > getDatabaseImportSequence() const override
Returns the first ledger sequence of the shard currently being imported from the NodeStore.
Definition: DatabaseShardImp.cpp:1297
std::function
std::all_of
T all_of(T... args)
ripple::NodeStore::Shard::finalKey
static const uint256 finalKey
Definition: nodestore/impl/Shard.h:251
std::atomic_bool::compare_exchange_strong
T compare_exchange_strong(T... args)
ripple::LedgerMaster::walkHashBySeq
std::optional< LedgerHash > walkHashBySeq(std::uint32_t index, InboundLedger::Reason reason)
Walk to a ledger's hash using the skip list.
Definition: LedgerMaster.cpp:1754
ripple::NodeStore::DatabaseShardImp::importDatabase
void importDatabase(Database &source) override
Import the application local node store.
Definition: DatabaseShardImp.cpp:765
ripple::deserializePrefixedHeader
LedgerInfo deserializePrefixedHeader(Slice data, bool hasHash)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
Definition: InboundLedger.cpp:299
ripple::NodeStore::DatabaseShardImp::databaseImporter_
std::thread databaseImporter_
Definition: DatabaseShardImp.h:292
ripple::NodeStore::DatabaseShardImp::openFinalLimit_
const std::uint32_t openFinalLimit_
Definition: DatabaseShardImp.h:273
std::sort
T sort(T... args)
std::shared_ptr::reset
T reset(T... args)
ripple::SHAMapHash
Definition: SHAMapHash.h:32
ripple::NodeStore::DatabaseShardImp::iterateShardsForward
bool iterateShardsForward(std::optional< std::uint32_t > minShardIndex, std::function< bool(Shard &shard)> const &visit)
iterateShardsForward Visits all shards starting from given in ascending order and calls given callbac...
Definition: DatabaseShardImp.cpp:2075
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::NodeStore::DatabaseShardImp::sweep
void sweep() override
Remove expired entries from the positive and negative caches.
Definition: DatabaseShardImp.cpp:1171
ripple::NodeStore::DatabaseShardImp::stopNodeToShard
Json::Value stopNodeToShard() override
Terminates a NodeStore to ShardStore import and returns the result in a JSON object.
Definition: DatabaseShardImp.cpp:1275
ripple::Section::values
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:77
std::thread::detach
T detach(T... args)
ripple::send_always
Sends a message to all peers.
Definition: predicates.h:31
ripple::NodeStore::DatabaseShardImp::PathDesignation::historical
@ historical
ripple::get_if_exists
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:384
ripple::NodeStore::Shard::version
static constexpr std::uint32_t version
Definition: nodestore/impl/Shard.h:246
ripple::NodeStore::DatabaseShardImp::getDatabaseImportStatus
Json::Value getDatabaseImportStatus() const override
Returns a JSON object detailing the status of an ongoing database import if one is running,...
Definition: DatabaseShardImp.cpp:1223
std::vector::push_back
T push_back(T... args)
ripple::NodeStore::DatabaseShardImp::secondLatestShardIndex_
std::optional< std::uint32_t > secondLatestShardIndex_
Definition: DatabaseShardImp.h:286
ripple::NodeStore::DatabaseShardImp::avgShardFileSz_
std::uint64_t avgShardFileSz_
Definition: DatabaseShardImp.h:270
ripple::NodeStore::DatabaseShardImp::callForTransactionSQLByLedgerSeq
bool callForTransactionSQLByLedgerSeq(LedgerIndex ledgerSeq, std::function< bool(soci::session &session)> const &callback) override
Invoke a callback on the transaction SQLite db for the corresponding ledger.
Definition: DatabaseShardImp.cpp:2052
ripple::base_uint< 256 >
ripple::NodeStore::DatabaseShardImp::updatePeers
void updatePeers(std::lock_guard< std::mutex > const &lock) const
Definition: DatabaseShardImp.cpp:2201
std::sample
T sample(T... args)
ripple::NodeStore::DatabaseShardImp::getPreShards
std::string getPreShards() override
Get shard indexes being imported.
Definition: DatabaseShardImp.cpp:426
ripple::NodeStore::DatabaseShardImp::databaseImportStatus_
std::unique_ptr< DatabaseImportStatus > databaseImportStatus_
Definition: DatabaseShardImp.h:289
ripple::NodeStore::DatabaseShardImp::getWriteLoad
std::int32_t getWriteLoad() const override
Retrieve the estimated number of pending write operations.
Definition: DatabaseShardImp.cpp:1085
std::thread::joinable
T joinable(T... args)
ripple::NodeStore::DatabaseShardImp::findAcquireIndex
std::optional< std::uint32_t > findAcquireIndex(std::uint32_t validLedgerSeq, std::lock_guard< std::mutex > const &)
Definition: DatabaseShardImp.cpp:1402
ripple::NodeStore::Database::firstLedgerSeq
std::uint32_t firstLedgerSeq(std::uint32_t shardIndex) const noexcept
Calculates the first ledger sequence for a given shard index.
Definition: Database.h:257
ripple::Config::reporting
bool reporting() const
Definition: Config.h:337
ripple::OperatingMode::DISCONNECTED
@ DISCONNECTED
not ready to process requests
ripple::NodeStore::DatabaseShardImp::iterateTransactionSQLsBack
bool iterateTransactionSQLsBack(std::optional< std::uint32_t > maxShardIndex, std::function< bool(soci::session &session, std::uint32_t shardIndex)> const &callback) override
iterateTransactionSQLsBack Checks out transaction databases for all shards in descending order starti...
Definition: DatabaseShardImp.cpp:2167
ripple::NodeStore::DatabaseShardImp::haltDatabaseImport_
std::atomic_bool haltDatabaseImport_
Definition: DatabaseShardImp.h:295
ripple::NodeStore::DatabaseShardImp::chooseHistoricalPath
boost::filesystem::path chooseHistoricalPath(std::lock_guard< std::mutex > const &) const
Definition: DatabaseShardImp.cpp:1909
ripple::rand_int
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
Definition: ripple/basics/random.h:115
ripple::NodeStore::DatabaseShardImp::sufficientStorage
bool sufficientStorage(std::uint32_t numShards, PathDesignation pathDesignation, std::lock_guard< std::mutex > const &) const
Definition: DatabaseShardImp.cpp:1602
ripple::NodeStore::DatabaseShardImp::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq, FetchReport &fetchReport, bool duplicate) override
Definition: DatabaseShardImp.cpp:1382
ripple::NodeStore::DatabaseShardImp::init_
bool init_
Definition: DatabaseShardImp.h:234
std::thread
STL class.
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::NodeStore::TaskQueue::addTask
void addTask(std::function< void()> task)
Adds a task to the queue.
Definition: TaskQueue.cpp:38
ripple::NodeStore::DatabaseShardImp::callForLedgerSQLByShardIndex
bool callForLedgerSQLByShardIndex(std::uint32_t const shardIndex, std::function< bool(soci::session &session)> const &callback) override
Invoke a callback on the ledger SQLite db for the corresponding shard.
Definition: DatabaseShardImp.cpp:2038
ripple::Config
Definition: Config.h:89
ripple::NodeStore::DatabaseShardImp::doImportDatabase
void doImportDatabase()
Definition: DatabaseShardImp.cpp:784
std::ofstream
STL class.
ripple::Application::config
virtual Config & config()=0
ripple::NodeStore::DatabaseShardImp::dir_
boost::filesystem::path dir_
Definition: DatabaseShardImp.h:252
ripple::Config::standalone
bool standalone() const
Definition: Config.h:332
std::unique_lock
STL class.
ripple::Application::getRelationalDatabase
virtual RelationalDatabase & getRelationalDatabase()=0
ripple::NodeStore::DatabaseShardImp::removeFailedShard
void removeFailedShard(std::shared_ptr< Shard > &shard)
Definition: DatabaseShardImp.cpp:1690
ripple::NodeStore::DatabaseShard
A collection of historical shards.
Definition: DatabaseShard.h:37
std::to_string
T to_string(T... args)
ripple::NodeStore::DatabaseShardImp::getNumTasks
size_t getNumTasks() const override
Returns the number of queued tasks.
Definition: DatabaseShardImp.cpp:2194
ripple::NodeStore::DatabaseShardImp::importShard
bool importShard(std::uint32_t shardIndex, boost::filesystem::path const &srcDir) override
Import a shard from the shard archive handler into the shard database.
Definition: DatabaseShardImp.cpp:444
ripple::default_prng
beast::xor_shift_engine & default_prng()
Return the default random engine.
Definition: ripple/basics/random.h:65
ripple::NodeStore::DatabaseShardImp::store
void store(NodeObjectType type, Blob &&data, uint256 const &hash, std::uint32_t ledgerSeq) override
Store the object.
Definition: DatabaseShardImp.cpp:1102
ripple::NodeStore::TaskQueue::stop
void stop()
Definition: TaskQueue.cpp:32
ripple::NodeStore::DatabaseShardImp::PathDesignation::none
@ none
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::ShardState::finalized
@ finalized
std::set::erase
T erase(T... args)
ripple::NodeStore::DatabaseShardImp::initConfig
bool initConfig(std::lock_guard< std::mutex > const &)
Definition: DatabaseShardImp.cpp:1308
ripple::ConfigSection
Definition: ConfigSections.h:28
ripple::NodeStore::DatabaseShardImp::latestShardIndex_
std::optional< std::uint32_t > latestShardIndex_
Definition: DatabaseShardImp.h:285
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
ripple::NodeStore::DatabaseShardImp::acquireIndex_
std::uint32_t acquireIndex_
Definition: DatabaseShardImp.h:249
ripple::Overlay::foreach
void foreach(Function f) const
Visit every active peer.
Definition: Overlay.h:198
std::map
STL class.
ripple::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition: ripple/nodestore/Scheduler.h:60
std::transform
T transform(T... args)
ripple::NodeStore::Database::storeStats
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:333
ripple::NodeStore::DatabaseShardImp::startNodeToShard
Json::Value startNodeToShard() override
Initiates a NodeStore to ShardStore import and returns the result in a JSON object.
Definition: DatabaseShardImp.cpp:1252
ripple::NodeStore::DatabaseShardImp::preparedIndexes_
std::set< std::uint32_t > preparedIndexes_
Definition: DatabaseShardImp.h:246
ripple::NodeStore::DatabaseShardImp::init
bool init() override
Initialize the database.
Definition: DatabaseShardImp.cpp:70
std::weak_ptr
STL class.
ripple::NodeStore::Database::isStopping
bool isStopping() const
Definition: Database.cpp:146
ripple::rpcINTERNAL
@ rpcINTERNAL
Definition: ErrorCodes.h:130
ripple::Serializer
Definition: Serializer.h:39
ripple::NodeStore::DatabaseShardImp::historicalPaths_
std::vector< boost::filesystem::path > historicalPaths_
Definition: DatabaseShardImp.h:264
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Serializer::addBitString
int addBitString(base_uint< Bits, Tag > const &v)
Definition: Serializer.h:97
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
ripple::NodeStore::DatabaseShardImp::checkHistoricalPaths
bool checkHistoricalPaths(std::lock_guard< std::mutex > const &) const
Definition: DatabaseShardImp.cpp:1942
ripple::NodeStore::DatabaseShardImp::maxHistoricalShards_
std::uint32_t maxHistoricalShards_
Definition: DatabaseShardImp.h:261
ripple::NodeStore::Shard::callForTransactionSQL
bool callForTransactionSQL(std::function< bool(Args... args)> const &callback)
Invoke a callback on the transaction SQLite db.
Definition: nodestore/impl/Shard.h:240
ripple::ShardState::acquire
@ acquire
ripple::Application::getShardFamily
virtual Family * getShardFamily()=0
ripple::NodeStore::Database::j_
const beast::Journal j_
Definition: Database.h:301
ripple::NodeStore::DatabaseShardImp::callForTransactionSQLByShardIndex
bool callForTransactionSQLByShardIndex(std::uint32_t const shardIndex, std::function< bool(soci::session &session)> const &callback) override
Invoke a callback on the transaction SQLite db for the corresponding shard.
Definition: DatabaseShardImp.cpp:2061
ripple::NodeStore::DatabaseShardImp::fetchLedger
std::shared_ptr< Ledger > fetchLedger(uint256 const &hash, std::uint32_t ledgerSeq) override
Fetch a ledger from the shard store.
Definition: DatabaseShardImp.cpp:552
std::vector::begin
T begin(T... args)
ripple::NodeStore::Database::seqToShardIndex
std::uint32_t seqToShardIndex(std::uint32_t ledgerSeq) const noexcept
Calculates the shard index for a given ledger sequence.
Definition: Database.h:283
std
STL namespace.
ripple::XRP_LEDGER_EARLIEST_SEQ
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
Definition: SystemParameters.h:69
ripple::NodeStore::DatabaseShardImp::iterateShardsBack
bool iterateShardsBack(std::optional< std::uint32_t > maxShardIndex, std::function< bool(Shard &shard)> const &visit)
iterateShardsBack Visits all shards starting from given in descending order and calls given callback ...
Definition: DatabaseShardImp.cpp:2127
ripple::NodeStore::DatabaseShardImp::numHistoricalShards
std::uint32_t numHistoricalShards(std::lock_guard< std::mutex > const &lock) const
Definition: DatabaseShardImp.cpp:1730
ripple::LedgerMaster::getCurrentLedgerIndex
LedgerIndex getCurrentLedgerIndex()
Definition: LedgerMaster.cpp:207
ripple::NodeStore::DatabaseShardImp::relocateOutdatedShards
void relocateOutdatedShards(std::lock_guard< std::mutex > const &lock)
Definition: DatabaseShardImp.cpp:1741
ripple::NodeStore::DatabaseShardImp::iterateLedgerSQLsBack
bool iterateLedgerSQLsBack(std::optional< std::uint32_t > maxShardIndex, std::function< bool(soci::session &session, std::uint32_t shardIndex)> const &callback) override
iterateLedgerSQLsBack Checks out ledger databases for all shards in descending order starting from gi...
Definition: DatabaseShardImp.cpp:2156
ripple::NodeStore::DatabaseShardImp::updateFileStats
void updateFileStats()
Definition: DatabaseShardImp.cpp:1541
ripple::NodeStore::Database::earliestLedgerSeq_
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:322
ripple::Application::overlay
virtual Overlay & overlay()=0
ripple::NodeStore::DatabaseShardImp::shardBoundaryIndex
std::uint32_t shardBoundaryIndex() const
Definition: DatabaseShardImp.cpp:1714
std::count_if
T count_if(T... args)
std::vector::empty
T empty(T... args)
ripple::NodeStore::DatabaseShardImp::prepareShards
bool prepareShards(std::vector< std::uint32_t > const &shardIndexes) override
Prepare one or more shard indexes to be imported into the database.
Definition: DatabaseShardImp.cpp:299
std::unique
T unique(T... args)
std::optional< std::uint32_t >
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::NodeStore::Database::earliestShardIndex_
const std::uint32_t earliestShardIndex_
Definition: Database.h:325
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
ripple::NodeStore::DatabaseShardImp::startDatabaseImportThread
void startDatabaseImportThread(std::lock_guard< std::mutex > const &)
Definition: DatabaseShardImp.cpp:2213
ripple::NodeStore::DatabaseShardImp::setStoredInShard
bool setStoredInShard(std::shared_ptr< Shard > &shard, std::shared_ptr< Ledger const > const &ledger)
Definition: DatabaseShardImp.cpp:1657
ripple::NodeStore::DatabaseShardImp::canAdd_
bool canAdd_
Definition: DatabaseShardImp.h:255
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
std::vector::end
T end(T... args)
ripple::NodeStore::Database::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:252
ripple::NodeStore::Database::scheduler_
Scheduler & scheduler_
Definition: Database.h:302
ripple::RangeSet
boost::icl::interval_set< T, std::less, ClosedInterval< T > > RangeSet
A set of closed intervals over the domain T.
Definition: RangeSet.h:69
ripple::NodeStore::Database::earliestLedgerSeq
std::uint32_t earliestLedgerSeq() const noexcept
Definition: Database.h:238
ripple::NodeStore::DatabaseShardImp::finalizeShard
void finalizeShard(std::shared_ptr< Shard > &shard, bool writeSQLite, std::optional< uint256 > const &expectedHash)
Definition: DatabaseShardImp.cpp:1467
std::max
T max(T... args)
ripple::NodeStore::Shard
Definition: nodestore/impl/Shard.h:55
ripple::NodeStore::Database::maxLedgers
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:152
ripple::NodeStore::DatabaseShardImp::DatabaseShardImp
DatabaseShardImp()=delete
std::make_reverse_iterator
T make_reverse_iterator(T... args)
std::unique_ptr
STL class.
ripple::loadLedgerHelper
std::shared_ptr< Ledger > loadLedgerHelper(LedgerInfo const &info, Application &app, bool acquire)
Definition: Ledger.cpp:1076
ripple::NodeStore::DatabaseShardImp::databaseImportMarker_
static constexpr auto databaseImportMarker_
Definition: DatabaseShardImp.h:276
std::unordered_map
STL class.
ripple::RelationalDatabase::getHashesByIndex
virtual std::optional< LedgerHashPair > getHashesByIndex(LedgerIndex ledgerIndex)=0
getHashesByIndex Returns the hashes of the ledger and its parent as specified by the ledgerIndex.
ripple::RPC::make_error
Json::Value make_error(error_code_i code)
Returns a new json object that reflects the error code.
Definition: ErrorCodes.cpp:178
ripple::PublisherStatus::available
@ available
ripple::NodeStore::DatabaseShardImp::prepareForNewShard
std::optional< PathDesignation > prepareForNewShard(std::uint32_t shardIndex, std::uint32_t numHistoricalShards, std::lock_guard< std::mutex > const &lock)
Definition: DatabaseShardImp.cpp:1878
ripple::ConfigSection::nodeDatabase
static std::string nodeDatabase()
Definition: ConfigSections.h:33
std::thread::join
T join(T... args)
std::exception::what
T what(T... args)
ripple::ShardState::queued
@ queued
ripple::NodeStore::DatabaseShardImp::iterateTransactionSQLsForward
bool iterateTransactionSQLsForward(std::optional< std::uint32_t > minShardIndex, std::function< bool(soci::session &session, std::uint32_t shardIndex)> const &callback) override
iterateTransactionSQLsForward Checks out transaction databases for all shards in ascending order star...
Definition: DatabaseShardImp.cpp:2115
ripple::HashPrefix::shardInfo
@ shardInfo
shard info for signing
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::NodeStore::DatabaseShardImp::prepareLedger
std::optional< std::uint32_t > prepareLedger(std::uint32_t validLedgerSeq) override
Prepare to store a new ledger in the shard being acquired.
Definition: DatabaseShardImp.cpp:229
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::NodeStore::DatabaseShardImp::ctx_
std::unique_ptr< nudb::context > ctx_
Definition: DatabaseShardImp.h:237
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:127
ripple::NodeStore::DatabaseShardImp::backendName_
std::string backendName_
Definition: DatabaseShardImp.h:258
ripple::NodeStore::DatabaseShardImp::getShardInfo
std::unique_ptr< ShardInfo > getShardInfo() const override
Query information about shards held.
Definition: DatabaseShardImp.cpp:692