rippled
Database.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/Ledger.h>
21 #include <ripple/basics/chrono.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_value.h>
24 #include <ripple/nodestore/Database.h>
25 #include <ripple/protocol/HashPrefix.h>
26 #include <ripple/protocol/jss.h>
27 #include <chrono>
28 
29 namespace ripple {
30 namespace NodeStore {
31 
33  Scheduler& scheduler,
34  int readThreads,
35  Section const& config,
36  beast::Journal journal)
37  : j_(journal)
38  , scheduler_(scheduler)
39  , ledgersPerShard_(get<std::uint32_t>(
40  config,
41  "ledgers_per_shard",
43  , earliestLedgerSeq_(
44  get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
45  , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
46  , requestBundle_(get<int>(config, "rq_bundle", 4))
47  , readThreads_(std::max(1, readThreads))
48 {
49  assert(readThreads != 0);
50 
51  if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
52  Throw<std::runtime_error>("Invalid ledgers_per_shard");
53 
54  if (earliestLedgerSeq_ < 1)
55  Throw<std::runtime_error>("Invalid earliest_seq");
56 
57  if (requestBundle_ < 1 || requestBundle_ > 64)
58  Throw<std::runtime_error>("Invalid rq_bundle");
59 
60  for (int i = readThreads_.load(); i != 0; --i)
61  {
62  std::thread t(
63  [this](int i) {
65 
67  "db prefetch #" + std::to_string(i));
68 
69  decltype(read_) read;
70 
71  while (true)
72  {
73  {
75 
76  if (isStopping())
77  break;
78 
79  if (read_.empty())
80  {
82  readCondVar_.wait(lock);
84  }
85 
86  if (isStopping())
87  break;
88 
89  // extract multiple object at a time to minimize the
90  // overhead of acquiring the mutex.
91  for (int cnt = 0;
92  !read_.empty() && cnt != requestBundle_;
93  ++cnt)
94  read.insert(read_.extract(read_.begin()));
95  }
96 
97  for (auto it = read.begin(); it != read.end(); ++it)
98  {
99  assert(!it->second.empty());
100 
101  auto const& hash = it->first;
102  auto const& data = it->second;
103  auto const seqn = data[0].first;
104 
105  auto obj =
106  fetchNodeObject(hash, seqn, FetchType::async);
107 
108  // This could be further optimized: if there are
109  // multiple requests for sequence numbers mapping to
110  // multiple databases by sorting requests such that all
111  // indices mapping to the same database are grouped
112  // together and serviced by a single read.
113  for (auto const& req : data)
114  {
115  req.second(
116  (seqn == req.first) || isSameDB(req.first, seqn)
117  ? obj
118  : fetchNodeObject(
119  hash, req.first, FetchType::async));
120  }
121  }
122 
123  read.clear();
124  }
125 
126  --runningThreads_;
127  --readThreads_;
128  },
129  i);
130  t.detach();
131  }
132 }
133 
135 {
136  // NOTE!
137  // Any derived class should call the stop() method in its
138  // destructor. Otherwise, occasionally, the derived class may
139  // crash during shutdown when its members are accessed by one of
140  // these threads after the derived class is destroyed but before
141  // this base class is destroyed.
142  stop();
143 }
144 
145 bool
147 {
148  return readStopping_.load(std::memory_order_relaxed);
149 }
150 
152 Database::maxLedgers(std::uint32_t shardIndex) const noexcept
153 {
154  if (shardIndex > earliestShardIndex_)
155  return ledgersPerShard_;
156 
157  if (shardIndex == earliestShardIndex_)
158  return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;
159 
160  assert(!"Invalid shard index");
161  return 0;
162 }
163 
164 void
166 {
167  {
169 
170  if (!readStopping_.exchange(true, std::memory_order_relaxed))
171  {
172  JLOG(j_.debug()) << "Clearing read queue because of stop request";
173  read_.clear();
175  }
176  }
177 
178  JLOG(j_.debug()) << "Waiting for stop request to complete...";
179 
180  using namespace std::chrono;
181 
182  auto const start = steady_clock::now();
183 
184  while (readThreads_.load() != 0)
185  {
186  assert(steady_clock::now() - start < 30s);
188  }
189 
190  JLOG(j_.debug()) << "Stop request completed in "
191  << duration_cast<std::chrono::milliseconds>(
192  steady_clock::now() - start)
193  .count()
194  << " millseconds";
195 }
196 
197 void
199  uint256 const& hash,
200  std::uint32_t ledgerSeq,
201  std::function<void(std::shared_ptr<NodeObject> const&)>&& cb)
202 {
204 
205  if (!isStopping())
206  {
207  read_[hash].emplace_back(ledgerSeq, std::move(cb));
209  }
210 }
211 
212 void
214 {
215  Batch batch;
217  auto storeBatch = [&, fname = __func__]() {
218  try
219  {
220  dstBackend.storeBatch(batch);
221  }
222  catch (std::exception const& e)
223  {
224  JLOG(j_.error()) << "Exception caught in function " << fname
225  << ". Error: " << e.what();
226  return;
227  }
228 
229  std::uint64_t sz{0};
230  for (auto const& nodeObject : batch)
231  sz += nodeObject->getData().size();
232  storeStats(batch.size(), sz);
233  batch.clear();
234  };
235 
236  srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
237  assert(nodeObject);
238  if (!nodeObject) // This should never happen
239  return;
240 
241  batch.emplace_back(std::move(nodeObject));
242  if (batch.size() >= batchWritePreallocationSize)
243  storeBatch();
244  });
245 
246  if (!batch.empty())
247  storeBatch();
248 }
249 
250 // Perform a fetch and report the time it took
253  uint256 const& hash,
254  std::uint32_t ledgerSeq,
255  FetchType fetchType,
256  bool duplicate)
257 {
258  FetchReport fetchReport(fetchType);
259 
260  using namespace std::chrono;
261  auto const begin{steady_clock::now()};
262 
263  auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
264  auto dur = steady_clock::now() - begin;
265  fetchDurationUs_ += duration_cast<microseconds>(dur).count();
266  if (nodeObject)
267  {
268  ++fetchHitCount_;
269  fetchSz_ += nodeObject->getData().size();
270  }
272 
273  fetchReport.elapsed = duration_cast<milliseconds>(dur);
274  scheduler_.onFetch(fetchReport);
275  return nodeObject;
276 }
277 
278 bool
280  Ledger const& srcLedger,
281  std::shared_ptr<Backend> dstBackend)
282 {
283  auto fail = [&](std::string const& msg) {
284  JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq
285  << ". " << msg;
286  return false;
287  };
288 
289  if (srcLedger.info().hash.isZero())
290  return fail("Invalid hash");
291  if (srcLedger.info().accountHash.isZero())
292  return fail("Invalid account hash");
293 
294  auto& srcDB = const_cast<Database&>(srcLedger.stateMap().family().db());
295  if (&srcDB == this)
296  return fail("Source and destination databases are the same");
297 
298  Batch batch;
300  auto storeBatch = [&, fname = __func__]() {
301  std::uint64_t sz{0};
302  for (auto const& nodeObject : batch)
303  sz += nodeObject->getData().size();
304 
305  try
306  {
307  dstBackend->storeBatch(batch);
308  }
309  catch (std::exception const& e)
310  {
311  fail(
312  std::string("Exception caught in function ") + fname +
313  ". Error: " + e.what());
314  return false;
315  }
316 
317  storeStats(batch.size(), sz);
318  batch.clear();
319  return true;
320  };
321 
322  // Store ledger header
323  {
324  Serializer s(sizeof(std::uint32_t) + sizeof(LedgerInfo));
326  addRaw(srcLedger.info(), s);
327  auto nObj = NodeObject::createObject(
328  hotLEDGER, std::move(s.modData()), srcLedger.info().hash);
329  batch.emplace_back(std::move(nObj));
330  }
331 
332  bool error = false;
333  auto visit = [&](SHAMapTreeNode& node) {
334  if (!isStopping())
335  {
336  if (auto nodeObject = srcDB.fetchNodeObject(
337  node.getHash().as_uint256(), srcLedger.info().seq))
338  {
339  batch.emplace_back(std::move(nodeObject));
340  if (batch.size() < batchWritePreallocationSize || storeBatch())
341  return true;
342  }
343  }
344 
345  error = true;
346  return false;
347  };
348 
349  // Store the state map
350  if (srcLedger.stateMap().getHash().isNonZero())
351  {
352  if (!srcLedger.stateMap().isValid())
353  return fail("Invalid state map");
354 
355  srcLedger.stateMap().snapShot(false)->visitNodes(visit);
356  if (error)
357  return fail("Failed to store state map");
358  }
359 
360  // Store the transaction map
361  if (srcLedger.info().txHash.isNonZero())
362  {
363  if (!srcLedger.txMap().isValid())
364  return fail("Invalid transaction map");
365 
366  srcLedger.txMap().snapShot(false)->visitNodes(visit);
367  if (error)
368  return fail("Failed to store transaction map");
369  }
370 
371  if (!batch.empty() && !storeBatch())
372  return fail("Failed to store");
373 
374  return true;
375 }
376 
377 void
379 {
380  assert(obj.isObject());
381 
382  {
384  obj["read_queue"] = static_cast<Json::UInt>(read_.size());
385  }
386 
387  obj["read_threads_total"] = readThreads_.load();
388  obj["read_threads_running"] = runningThreads_.load();
389  obj["read_request_bundle"] = requestBundle_;
390 
391  obj[jss::node_writes] = std::to_string(storeCount_);
392  obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
393  obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
394  obj[jss::node_written_bytes] = std::to_string(storeSz_);
395  obj[jss::node_read_bytes] = std::to_string(fetchSz_);
396  obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
397 
398  if (auto c = getCounters())
399  {
400  obj[jss::node_read_errors] = std::to_string(c->readErrors);
401  obj[jss::node_read_retries] = std::to_string(c->readRetries);
402  obj[jss::node_write_retries] = std::to_string(c->writeRetries);
403  obj[jss::node_writes_delayed] = std::to_string(c->writesDelayed);
404  obj[jss::node_writes_duration_us] = std::to_string(c->writeDurationUs);
405  }
406 }
407 
408 } // namespace NodeStore
409 } // namespace ripple
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::HashPrefix::ledgerMaster
@ ledgerMaster
ledger master data for signing
ripple::SHAMap::isValid
bool isValid() const
Definition: SHAMap.h:625
ripple::DEFAULT_LEDGERS_PER_SHARD
static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD
The number of ledgers in a shard.
Definition: SystemParameters.h:76
ripple::NodeStore::Database::fetchDurationUs_
std::atomic< std::uint64_t > fetchDurationUs_
Definition: Database.h:360
ripple::NodeStore::Database
Persistency layer for NodeObject.
Definition: Database.h:51
ripple::NodeStore::read
void read(nudb::detail::istream &is, std::size_t &u)
Definition: varint.h:120
Json::Value::isObject
bool isObject() const
Definition: json_value.cpp:1027
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::SHAMap::getHash
SHAMapHash getHash() const
Definition: SHAMap.cpp:852
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:537
ripple::SHAMap::family
Family const & family() const
Definition: SHAMap.h:143
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
ripple::NodeStore::Database::ledgersPerShard_
const std::uint32_t ledgersPerShard_
Definition: Database.h:314
std::vector::reserve
T reserve(T... args)
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:91
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
Definition: View.cpp:162
Json::UInt
unsigned int UInt
Definition: json_forwards.h:27
std::vector< std::shared_ptr< NodeObject > >
std::vector::size
T size(T... args)
ripple::NodeStore::Database::read_
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition: Database.h:372
ripple::NodeStore::Database::requestBundle_
const int requestBundle_
Definition: Database.h:330
ripple::NodeObject::createObject
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
Definition: NodeObject.cpp:37
ripple::NodeStore::Database::fetchTotalCount_
std::atomic< std::uint64_t > fetchTotalCount_
Definition: Database.h:359
ripple::NodeStore::Database::asyncFetch
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:198
ripple::NodeStore::Database::fetchSz_
std::atomic< std::uint32_t > fetchSz_
Definition: Database.h:306
std::lock_guard
STL class.
ripple::NodeStore::Database::stop
virtual void stop()
Definition: Database.cpp:165
ripple::NodeStore::FetchReport
Contains information about a fetch operation.
Definition: ripple/nodestore/Scheduler.h:32
std::function
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:83
ripple::NodeStore::Database::readStopping_
std::atomic< bool > readStopping_
Definition: Database.h:374
ripple::NodeStore::Database::readThreads_
std::atomic< int > readThreads_
Definition: Database.h:375
ripple::SHAMapHash::isNonZero
bool isNonZero() const
Definition: SHAMapHash.h:58
ripple::SHAMap::snapShot
std::shared_ptr< SHAMap > snapShot(bool isMutable) const
Definition: SHAMap.cpp:88
ripple::NodeStore::Database::fetchHitCount_
std::atomic< std::uint32_t > fetchHitCount_
Definition: Database.h:305
ripple::Family::db
virtual NodeStore::Database & db()=0
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:92
std::thread::detach
T detach(T... args)
ripple::NodeStore::Database::storeCount_
std::atomic< std::uint64_t > storeCount_
Definition: Database.h:357
ripple::NodeStore::batchWritePreallocationSize
@ batchWritePreallocationSize
Definition: nodestore/Types.h:34
ripple::NodeStore::Database::readLock_
std::mutex readLock_
Definition: Database.h:363
ripple::base_uint< 256 >
ripple::NodeStore::Database::storeSz_
std::atomic< std::uint64_t > storeSz_
Definition: Database.h:358
ripple::Ledger::info
LedgerInfo const & info() const override
Returns information about the ledger.
Definition: Ledger.h:152
ripple::NodeStore::Database::importInternal
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:213
ripple::base_uint::isZero
bool isZero() const
Definition: base_uint.h:532
std::thread
STL class.
ripple::Ledger
Holds a ledger.
Definition: Ledger.h:76
std::atomic::load
T load(T... args)
chrono
ripple::Ledger::stateMap
SHAMap const & stateMap() const
Definition: Ledger.h:310
std::unique_lock
STL class.
ripple::NodeStore::Database::readCondVar_
std::condition_variable readCondVar_
Definition: Database.h:364
ripple::SHAMapTreeNode
Definition: SHAMapTreeNode.h:53
std::to_string
T to_string(T... args)
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::NodeStore::Scheduler::onFetch
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch. Allows the scheduler to monitor the node store's performance.
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
std::condition_variable::wait
T wait(T... args)
ripple::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition: ripple/nodestore/Scheduler.h:60
ripple::NodeStore::Database::~Database
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:134
ripple::NodeStore::Database::for_each
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database. This is usually called during import.
ripple::NodeStore::Database::storeStats
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:333
ripple::NodeStore::FetchType
FetchType
Definition: ripple/nodestore/Scheduler.h:29
ripple::NodeStore::Database::isStopping
bool isStopping() const
Definition: Database.cpp:146
std::condition_variable::notify_one
T notify_one(T... args)
ripple::Serializer
Definition: Serializer.h:39
ripple::Ledger::txMap
SHAMap const & txMap() const
Definition: Ledger.h:322
ripple::NodeStore::FetchType::async
@ async
ripple::NodeStore::Database::getCounters
virtual std::optional< Backend::Counters< std::uint64_t > > getCounters() const
Retrieve backend read and write stats.
Definition: Database.h:401
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
std::atomic::exchange
T exchange(T... args)
ripple::NodeStore::Database::j_
const beast::Journal j_
Definition: Database.h:301
std
STL namespace.
ripple::XRP_LEDGER_EARLIEST_SEQ
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
Definition: SystemParameters.h:69
ripple::NodeStore::Database::getCountsJson
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:378
ripple::NodeStore::Database::earliestLedgerSeq_
const std::uint32_t earliestLedgerSeq_
Definition: Database.h:322
std::vector::empty
T empty(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::hotLEDGER
@ hotLEDGER
Definition: NodeObject.h:34
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:75
ripple::NodeStore::Database::fetchNodeObject
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:252
ripple::NodeStore::Database::scheduler_
Scheduler & scheduler_
Definition: Database.h:302
ripple::NodeStore::Database::runningThreads_
std::atomic< int > runningThreads_
Definition: Database.h:376
ripple::NodeStore::Database::isSameDB
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
ripple::NodeStore::Database::maxLedgers
std::uint32_t maxLedgers(std::uint32_t shardIndex) const noexcept
Calculates the maximum ledgers for a given shard index.
Definition: Database.cpp:152
ripple::NodeStore::Backend::storeBatch
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
ripple::NodeStore::FetchReport::elapsed
std::chrono::milliseconds elapsed
Definition: ripple/nodestore/Scheduler.h:38
std::condition_variable::notify_all
T notify_all(T... args)
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:93
std::this_thread::yield
T yield(T... args)
std::exception::what
T what(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::NodeStore::Database::Database
Database()=delete
std::chrono
ripple::NodeStore::Backend
A backend used for the NodeStore.
Definition: Backend.h:39