rippled
ReportingETL.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/rdb/backend/PostgresDatabase.h>
21 #include <ripple/app/reporting/ReportingETL.h>
22 
23 #include <ripple/beast/core/CurrentThreadName.h>
24 #include <ripple/json/json_reader.h>
25 #include <ripple/json/json_writer.h>
26 #include <boost/asio/connect.hpp>
27 #include <boost/asio/ip/tcp.hpp>
28 #include <boost/beast/core.hpp>
29 #include <boost/beast/websocket.hpp>
30 #include <cctype>
31 #include <charconv>
32 #include <cstdlib>
33 #include <iostream>
34 #include <string>
35 #include <variant>
36 
37 namespace ripple {
38 
39 namespace detail {
// Render the sequence number and the four hashes of `info` (ledger hash,
// transaction-map hash, account-state-map hash and parent hash, each as hex)
// as a single line of text. Used by the log statements throughout this file.
42 toString(LedgerInfo const& info)
43 {
45  ss << "LedgerInfo { Sequence : " << info.seq
46  << " Hash : " << strHex(info.hash) << " TxHash : " << strHex(info.txHash)
47  << " AccountHash : " << strHex(info.accountHash)
48  << " ParentHash : " << strHex(info.parentHash) << " }";
49  return ss.str();
50 }
51 } // namespace detail
52 
// Consumer side of the initial ledger download (run on the asyncWriter
// thread started by loadInitialLedger).
//
// Pops SLEs from writeQueue and inserts each one into ledger's state map,
// skipping keys that already exist. Stops when a null SLE is popped (the
// producer pushes null to mark the end of the queue) or when stopping_ is set.
//
// When flushInterval_ is nonzero, dirty state-map nodes are flushed to the
// node store periodically instead of only at the end. Because `num` is tested
// before it is incremented, a flush happens after the 1st object and then
// after every flushInterval_ objects.
53 void
57 {
59  size_t num = 0;
60  while (!stopping_ && (sle = writeQueue.pop()))
61  {
62  assert(sle);
63  if (!ledger->exists(sle->key()))
64  ledger->rawInsert(sle);
65
66  if (flushInterval_ != 0 && (num % flushInterval_) == 0)
67  {
68  JLOG(journal_.debug()) << "Flushing! key = " << strHex(sle->key());
69  ledger->stateMap().flushDirty(hotACCOUNT_NODE);
70  }
71  ++num;
72  }
73 }
74 
78  org::xrpl::rpc::v1::GetLedgerResponse& data)
79 {
    // Deserialize every transaction blob (and its metadata blob) carried in
    // `data`, insert each pair into the tx map of `ledger` with
    // rawTxInsertWithHash, and collect one AccountTransactionsData entry per
    // transaction, in response order. The entries are built from the
    // metadata plus the node-store hash returned by the insert. They are
    // later handed to the relational database together with the ledger
    // header (see writeLedgerAndTransactions in loadInitialLedger).
81  for (auto& txn : data.transactions_list().transactions())
82  {
83  auto& raw = txn.transaction_blob();
84
85  SerialIter it{raw.data(), raw.size()};
86  STTx sttx{it};
87
88  auto txSerializer = std::make_shared<Serializer>(sttx.getSerializer());
89
    // Metadata is parsed against this ledger's sequence.
90  TxMeta txMeta{
91  sttx.getTransactionID(), ledger->info().seq, txn.metadata_blob()};
92
93  auto metaSerializer =
94  std::make_shared<Serializer>(txMeta.getAsObject().getSerializer());
95
96  JLOG(journal_.trace())
97  << __func__ << " : "
98  << "Inserting transaction = " << sttx.getTransactionID();
99  uint256 nodestoreHash = ledger->rawTxInsertWithHash(
100  sttx.getTransactionID(), txSerializer, metaSerializer);
101  accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal_);
102  }
103  return accountTxData;
104 }
105 
107 ReportingETL::loadInitialLedger(uint32_t startingSequence)
108 {
109  // check that database is actually empty
110  auto ledger = std::const_pointer_cast<Ledger>(
112  if (ledger)
113  {
114  JLOG(journal_.fatal()) << __func__ << " : "
115  << "Database is not empty";
116  assert(false);
117  return {};
118  }
119 
120  // fetch the ledger from the network. This function will not return until
121  // either the fetch is successful, or the server is being shutdown. This
122  // only fetches the ledger header and the transactions+metadata
124  fetchLedgerData(startingSequence)};
125  if (!ledgerData)
126  return {};
127 
128  LedgerInfo lgrInfo =
129  deserializeHeader(makeSlice(ledgerData->ledger_header()), true);
130 
131  JLOG(journal_.debug()) << __func__ << " : "
132  << "Deserialized ledger header. "
133  << detail::toString(lgrInfo);
134 
135  ledger =
136  std::make_shared<Ledger>(lgrInfo, app_.config(), app_.getNodeFamily());
137  ledger->stateMap().clearSynching();
138  ledger->txMap().clearSynching();
139 
140 #ifdef RIPPLED_REPORTING
142  insertTransactions(ledger, *ledgerData);
143 #endif
144 
146 
148  std::thread asyncWriter{[this, &ledger, &writeQueue]() {
149  consumeLedgerData(ledger, writeQueue);
150  }};
151 
152  // download the full account state map. This function downloads full ledger
153  // data and pushes the downloaded data into the writeQueue. asyncWriter
154  // consumes from the queue and inserts the data into the Ledger object.
155  // Once the below call returns, all data has been pushed into the queue
156  loadBalancer_.loadInitialLedger(startingSequence, writeQueue);
157 
158  // null is used to represent the end of the queue
160  writeQueue.push(null);
161  // wait for the writer to finish
162  asyncWriter.join();
163 
164  if (!stopping_)
165  {
166  flushLedger(ledger);
167  if (app_.config().reporting())
168  {
169 #ifdef RIPPLED_REPORTING
170  dynamic_cast<PostgresDatabase*>(&app_.getRelationalDatabase())
171  ->writeLedgerAndTransactions(ledger->info(), accountTxData);
172 #endif
173  }
174  }
175  auto end = std::chrono::system_clock::now();
176  JLOG(journal_.debug()) << "Time to download and store ledger = "
177  << ((end - start).count()) / 1000000000.0;
178  return ledger;
179 }
180 
181 void
183 {
184  JLOG(journal_.debug()) << __func__ << " : "
185  << "Flushing ledger. "
186  << detail::toString(ledger->info());
187  // These are recomputed in setImmutable
188  auto& accountHash = ledger->info().accountHash;
189  auto& txHash = ledger->info().txHash;
190  auto& ledgerHash = ledger->info().hash;
191 
192  assert(
193  ledger->info().seq < XRP_LEDGER_EARLIEST_FEES ||
194  ledger->read(keylet::fees()));
195  ledger->setImmutable(false);
197 
198  auto numFlushed = ledger->stateMap().flushDirty(hotACCOUNT_NODE);
199 
200  auto numTxFlushed = ledger->txMap().flushDirty(hotTRANSACTION_NODE);
201 
202  {
203  Serializer s(128);
205  addRaw(ledger->info(), s);
207  hotLEDGER,
208  std::move(s.modData()),
209  ledger->info().hash,
210  ledger->info().seq);
211  }
212 
213  app_.getNodeStore().sync();
214 
215  auto end = std::chrono::system_clock::now();
216 
217  JLOG(journal_.debug()) << __func__ << " : "
218  << "Flushed " << numFlushed
219  << " nodes to nodestore from stateMap";
220  JLOG(journal_.debug()) << __func__ << " : "
221  << "Flushed " << numTxFlushed
222  << " nodes to nodestore from txMap";
223 
224  JLOG(journal_.debug()) << __func__ << " : "
225  << "Flush took "
226  << (end - start).count() / 1000000000.0
227  << " seconds";
228 
229  if (numFlushed == 0)
230  {
231  JLOG(journal_.fatal()) << __func__ << " : "
232  << "Flushed 0 nodes from state map";
233  assert(false);
234  }
235  if (numTxFlushed == 0)
236  {
237  JLOG(journal_.warn()) << __func__ << " : "
238  << "Flushed 0 nodes from tx map";
239  }
240 
241  // Make sure calculated hashes are correct
242  if (ledger->stateMap().getHash().as_uint256() != accountHash)
243  {
244  JLOG(journal_.fatal())
245  << __func__ << " : "
246  << "State map hash does not match. "
247  << "Expected hash = " << strHex(accountHash) << "Actual hash = "
248  << strHex(ledger->stateMap().getHash().as_uint256());
249  Throw<std::runtime_error>("state map hash mismatch");
250  }
251 
252  if (ledger->txMap().getHash().as_uint256() != txHash)
253  {
254  JLOG(journal_.fatal())
255  << __func__ << " : "
256  << "Tx map hash does not match. "
257  << "Expected hash = " << strHex(txHash) << "Actual hash = "
258  << strHex(ledger->txMap().getHash().as_uint256());
259  Throw<std::runtime_error>("tx map hash mismatch");
260  }
261 
262  if (ledger->info().hash != ledgerHash)
263  {
264  JLOG(journal_.fatal())
265  << __func__ << " : "
266  << "Ledger hash does not match. "
267  << "Expected hash = " << strHex(ledgerHash)
268  << "Actual hash = " << strHex(ledger->info().hash);
269  Throw<std::runtime_error>("ledger hash mismatch");
270  }
271 
272  JLOG(journal_.info()) << __func__ << " : "
273  << "Successfully flushed ledger! "
274  << detail::toString(ledger->info());
275 }
276 
// Publish `ledger` synchronously on the calling thread. It hands the ledger
// to NetworkOPs::pubLedger and then calls setLastPublish(). Unlike the
// sequence-based overload, nothing is posted to publishStrand_. Called by the
// ETL load thread and by monitor() for the initial ledger.
277 void
279 {
280  app_.getOPs().pubLedger(ledger);
281
282  setLastPublish();
283 }
284 
// Look up ledger `ledgerSequence` via the LedgerMaster and publish it once it
// is found.
//
// The lookup is retried while the ledger is absent. After the first lookup
// plus `maxAttempts` retries, or as soon as stopping_ is set, this returns
// false. The caller decides what happens next: monitor() tries to become the
// ETL writer, and monitorReadOnly() moves on to the next sequence.
//
// On success, publication (pubLedger + setLastPublish) is posted to
// publishStrand_, so it runs asynchronously. true is returned without waiting
// for it to complete.
285 bool
286 ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
287 {
288  JLOG(journal_.info()) << __func__ << " : "
289  << "Attempting to publish ledger = "
290  << ledgerSequence;
291  size_t numAttempts = 0;
292  while (!stopping_)
293  {
294  auto ledger = app_.getLedgerMaster().getLedgerBySeq(ledgerSequence);
295
296  if (!ledger)
297  {
298  JLOG(journal_.warn())
299  << __func__ << " : "
300  << "Trying to publish. Could not find ledger with sequence = "
301  << ledgerSequence;
302  // We try maxAttempts times to publish the ledger, waiting one
303  // second in between each attempt.
304  // If the ledger is not present in the database after maxAttempts,
305  // we attempt to take over as the writer. If the takeover fails,
306  // runETLPipeline will return, and this node will go back to
307  // publishing.
308  // If the node is in strict read only mode, we simply
309  // skip publishing this ledger and return false indicating the
310  // publish failed
311  if (numAttempts >= maxAttempts)
312  {
313  JLOG(journal_.error()) << __func__ << " : "
314  << "Failed to publish ledger after "
315  << numAttempts << " attempts.";
            // Both branches return false; they differ only in what they log.
316  if (!readOnly_)
317  {
318  JLOG(journal_.info()) << __func__ << " : "
319  << "Attempting to become ETL writer";
320  return false;
321  }
322  else
323  {
324  JLOG(journal_.debug())
325  << __func__ << " : "
326  << "In strict read-only mode. "
327  << "Skipping publishing this ledger. "
328  << "Beginning fast forward.";
329  return false;
330  }
331  }
332  else
333  {
335  ++numAttempts;
336  }
337  continue;
338  }
339
        // `ledger` is captured by value so it stays alive until the posted
        // handler runs on publishStrand_.
340  publishStrand_.post([this, ledger, fname = __func__]() {
341  app_.getOPs().pubLedger(ledger);
342  setLastPublish();
343  JLOG(journal_.info())
344  << fname << " : "
345  << "Published ledger. " << detail::toString(ledger->info());
346  });
347  return true;
348  }
349  return false;
350 }
351 
354 {
    // Fetch ledger `idx` from one of the ETL sources. This fetches the header
    // and the transactions with their metadata but not the ledger objects
    // (getObjects == false). Returns an empty optional if the fetch was
    // aborted.
355  JLOG(journal_.debug()) << __func__ << " : "
356  << "Attempting to fetch ledger with sequence = "
357  << idx;
358
360  loadBalancer_.fetchLedger(idx, false);
    // The fetch yields an empty optional when it is aborted (e.g. during
    // shutdown). Dereferencing it for the trace dump would then be undefined
    // behavior, so only log the reply when one exists.
    if (response)
    {
361  JLOG(journal_.trace()) << __func__ << " : "
362  << "GetLedger reply = " << response->DebugString();
    }
363  return response;
364 }
365 
368 {
    // Fetch ledger `idx` from one of the ETL sources. This fetches the header
    // and the transactions with their metadata, and also the ledger objects
    // (getObjects == true), which buildNextLedger applies as the state diff.
    // Returns an empty optional if the fetch was aborted.
369  JLOG(journal_.debug()) << __func__ << " : "
370  << "Attempting to fetch ledger with sequence = "
371  << idx;
372
374  loadBalancer_.fetchLedger(idx, true);
    // The fetch yields an empty optional when it is aborted (runETLPipeline
    // relies on this to stop its extract thread). Dereferencing it for the
    // trace dump would then be undefined behavior, so only log the reply when
    // one exists.
    if (response)
    {
375  JLOG(journal_.trace()) << __func__ << " : "
376  << "GetLedger reply = " << response->DebugString();
    }
377  return response;
378 }
379 
383  org::xrpl::rpc::v1::GetLedgerResponse& rawData)
384 {
385  JLOG(journal_.info()) << __func__ << " : "
386  << "Beginning ledger update";
387 
388  LedgerInfo lgrInfo =
389  deserializeHeader(makeSlice(rawData.ledger_header()), true);
390 
391  JLOG(journal_.debug()) << __func__ << " : "
392  << "Deserialized ledger header. "
393  << detail::toString(lgrInfo);
394 
395  next->setLedgerInfo(lgrInfo);
396 
397  next->stateMap().clearSynching();
398  next->txMap().clearSynching();
399 
401  insertTransactions(next, rawData)};
402 
403  JLOG(journal_.debug())
404  << __func__ << " : "
405  << "Inserted all transactions. Number of transactions = "
406  << rawData.transactions_list().transactions_size();
407 
408  for (auto& obj : rawData.ledger_objects().objects())
409  {
410  auto key = uint256::fromVoidChecked(obj.key());
411  if (!key)
412  throw std::runtime_error("Recevied malformed object ID");
413 
414  auto& data = obj.data();
415 
416  // indicates object was deleted
417  if (data.size() == 0)
418  {
419  JLOG(journal_.trace()) << __func__ << " : "
420  << "Erasing object = " << *key;
421  if (next->exists(*key))
422  next->rawErase(*key);
423  }
424  else
425  {
426  SerialIter it{data.data(), data.size()};
427  std::shared_ptr<SLE> sle = std::make_shared<SLE>(it, *key);
428 
429  if (next->exists(*key))
430  {
431  JLOG(journal_.trace()) << __func__ << " : "
432  << "Replacing object = " << *key;
433  next->rawReplace(sle);
434  }
435  else
436  {
437  JLOG(journal_.trace()) << __func__ << " : "
438  << "Inserting object = " << *key;
439  next->rawInsert(sle);
440  }
441  }
442  }
443  JLOG(journal_.debug())
444  << __func__ << " : "
445  << "Inserted/modified/deleted all objects. Number of objects = "
446  << rawData.ledger_objects().objects_size();
447 
448  if (!rawData.skiplist_included())
449  {
450  next->updateSkipList();
451  JLOG(journal_.warn())
452  << __func__ << " : "
453  << "tx process is not sending skiplist. This indicates that the tx "
454  "process is parsing metadata instead of doing a SHAMap diff. "
455  "Make sure tx process is running the same code as reporting to "
456  "use SHAMap diff instead of parsing metadata";
457  }
458 
459  JLOG(journal_.debug()) << __func__ << " : "
460  << "Finished ledger update. "
461  << detail::toString(next->info());
462  return {std::move(next), std::move(accountTxData)};
463 }
464 
465 // Database must be populated when this starts
467 ReportingETL::runETLPipeline(uint32_t startSequence)
468 {
469  /*
470  * Behold, mortals! This function spawns three separate threads, which talk
471  * to each other via 2 different thread safe queues and 1 atomic variable.
472  * All threads and queues are function local. This function returns when all
473  * of the threads exit. There are two termination conditions: the first is
474  * if the load thread encounters a write conflict. In this case, the load
475  * thread sets writeConflict, an atomic bool, to true, which signals the
476  * other threads to stop. The second termination condition is when the
477  * entire server is shutting down, which is detected in one of three ways:
478  * 1. isStopping() returns true if the server is shutting down
479  * 2. networkValidatedLedgers_.waitUntilValidatedByNetwork returns
480  * false, signaling the wait was aborted.
481  * 3. fetchLedgerDataAndDiff returns an empty optional, signaling the fetch
482  * was aborted.
483  * In all cases, the extract thread detects this condition,
484  * and pushes an empty optional onto the transform queue. The transform
485  * thread, upon popping an empty optional, pushes an empty optional onto the
486  * load queue, and then returns. The load thread, upon popping an empty
487  * optional, returns.
488  */
489 
490  JLOG(journal_.debug()) << __func__ << " : "
491  << "Starting etl pipeline";
492  writing_ = true;
493 
494  std::shared_ptr<Ledger> parent = std::const_pointer_cast<Ledger>(
495  app_.getLedgerMaster().getLedgerBySeq(startSequence - 1));
496  if (!parent)
497  {
498  assert(false);
499  Throw<std::runtime_error>("runETLPipeline: parent ledger is null");
500  }
501 
502  std::atomic_bool writeConflict = false;
503  std::optional<uint32_t> lastPublishedSequence;
504  constexpr uint32_t maxQueueSize = 1000;
505 
507  transformQueue{maxQueueSize};
508 
509  std::thread extracter{[this,
510  &startSequence,
511  &writeConflict,
512  &transformQueue]() {
513  beast::setCurrentThreadName("rippled: ReportingETL extract");
514  uint32_t currentSequence = startSequence;
515 
516  // there are two stopping conditions here.
517  // First, if there is a write conflict in the load thread, the ETL
518  // mechanism should stop.
519  // The other stopping condition is if the entire server is shutting
520  // down. This can be detected in a variety of ways. See the comment
521  // at the top of the function
523  currentSequence) &&
524  !writeConflict && !isStopping())
525  {
528  fetchLedgerDataAndDiff(currentSequence)};
529  // if the fetch is unsuccessful, stop. fetchLedger only returns
530  // false if the server is shutting down, or if the ledger was
531  // found in the database (which means another process already
532  // wrote the ledger that this process was trying to extract;
533  // this is a form of a write conflict). Otherwise,
534  // fetchLedgerDataAndDiff will keep trying to fetch the
535  // specified ledger until successful
536  if (!fetchResponse)
537  {
538  break;
539  }
540  auto end = std::chrono::system_clock::now();
541 
542  auto time = ((end - start).count()) / 1000000000.0;
543  auto tps =
544  fetchResponse->transactions_list().transactions_size() / time;
545 
546  JLOG(journal_.debug()) << "Extract phase time = " << time
547  << " . Extract phase tps = " << tps;
548 
549  transformQueue.push(std::move(fetchResponse));
550  ++currentSequence;
551  }
552  // empty optional tells the transformer to shut down
553  transformQueue.push({});
554  }};
555 
559  loadQueue{maxQueueSize};
560  std::thread transformer{[this,
561  &parent,
562  &writeConflict,
563  &loadQueue,
564  &transformQueue]() {
565  beast::setCurrentThreadName("rippled: ReportingETL transform");
566 
567  assert(parent);
568  parent = std::make_shared<Ledger>(*parent, NetClock::time_point{});
569  while (!writeConflict)
570  {
572  transformQueue.pop()};
573  // if fetchResponse is an empty optional, the extracter thread has
574  // stopped and the transformer should stop as well
575  if (!fetchResponse)
576  {
577  break;
578  }
579  if (isStopping())
580  continue;
581 
583  auto [next, accountTxData] =
584  buildNextLedger(parent, *fetchResponse);
585  auto end = std::chrono::system_clock::now();
586 
587  auto duration = ((end - start).count()) / 1000000000.0;
588  JLOG(journal_.debug()) << "transform time = " << duration;
589  // The below line needs to execute before pushing to the queue, in
590  // order to prevent this thread and the loader thread from accessing
591  // the same SHAMap concurrently
592  parent = std::make_shared<Ledger>(*next, NetClock::time_point{});
593  loadQueue.push(
594  std::make_pair(std::move(next), std::move(accountTxData)));
595  }
596  // empty optional tells the loader to shutdown
597  loadQueue.push({});
598  }};
599 
600  std::thread loader{[this,
601  &lastPublishedSequence,
602  &loadQueue,
603  &writeConflict]() {
604  beast::setCurrentThreadName("rippled: ReportingETL load");
605  size_t totalTransactions = 0;
606  double totalTime = 0;
607  while (!writeConflict)
608  {
612  result{loadQueue.pop()};
613  // if result is an empty optional, the transformer thread has
614  // stopped and the loader should stop as well
615  if (!result)
616  break;
617  if (isStopping())
618  continue;
619 
620  auto& ledger = result->first;
621  auto& accountTxData = result->second;
622 
624  // write to the key-value store
625  flushLedger(ledger);
626 
627  auto mid = std::chrono::system_clock::now();
628  // write to RDBMS
629  // if there is a write conflict, some other process has already
630  // written this ledger and has taken over as the ETL writer
631 #ifdef RIPPLED_REPORTING
632  if (!dynamic_cast<PostgresDatabase*>(&app_.getRelationalDatabase())
634  ledger->info(), accountTxData))
635  writeConflict = true;
636 #endif
637  auto end = std::chrono::system_clock::now();
638 
639  if (!writeConflict)
640  {
641  publishLedger(ledger);
642  lastPublishedSequence = ledger->info().seq;
643  }
644  // print some performance numbers
645  auto kvTime = ((mid - start).count()) / 1000000000.0;
646  auto relationalTime = ((end - mid).count()) / 1000000000.0;
647 
648  size_t numTxns = accountTxData.size();
649  totalTime += kvTime;
650  totalTransactions += numTxns;
651  JLOG(journal_.info())
652  << "Load phase of etl : "
653  << "Successfully published ledger! Ledger info: "
654  << detail::toString(ledger->info())
655  << ". txn count = " << numTxns
656  << ". key-value write time = " << kvTime
657  << ". relational write time = " << relationalTime
658  << ". key-value tps = " << numTxns / kvTime
659  << ". relational tps = " << numTxns / relationalTime
660  << ". total key-value tps = " << totalTransactions / totalTime;
661  }
662  }};
663 
664  // wait for all of the threads to stop
665  loader.join();
666  extracter.join();
667  transformer.join();
668  writing_ = false;
669 
670  JLOG(journal_.debug()) << __func__ << " : "
671  << "Stopping etl pipeline";
672 
673  return lastPublishedSequence;
674 }
675 
676 // main loop. The software begins monitoring the ledgers that are validated
677 // by the network. The member networkValidatedLedgers_ keeps track of the
678 // sequences of ledgers validated by the network. Whenever a ledger is validated
679 // by the network, the software looks for that ledger in the database. Once the
680 // ledger is found in the database, the software publishes that ledger to the
681 // ledgers stream. If a network validated ledger is not found in the database
682 // after a certain amount of time, then the software attempts to take over
683 // responsibility of the ETL process, where it writes new ledgers to the
684 // database. The software will relinquish control of the ETL process if it
685 // detects that another process has taken over ETL.
//
// Startup: if the database is empty, an initial ledger is downloaded. That is
// either the configured start sequence or, when no start sequence is set,
// the next ledger validated by the network. If the database is already
// populated and a start sequence was configured, std::runtime_error is
// thrown.
686 void
688 {
689  auto ledger = std::const_pointer_cast<Ledger>(
691  if (!ledger)
692  {
693  JLOG(journal_.info()) << __func__ << " : "
694  << "Database is empty. Will download a ledger "
695  "from the network.";
696  if (startSequence_)
697  {
698  JLOG(journal_.info())
699  << __func__ << " : "
700  << "ledger sequence specified in config. "
701  << "Will begin ETL process starting with ledger "
702  << *startSequence_;
704  }
705  else
706  {
707  JLOG(journal_.info())
708  << __func__ << " : "
709  << "Waiting for next ledger to be validated by network...";
710  std::optional<uint32_t> mostRecentValidated =
712  if (mostRecentValidated)
713  {
714  JLOG(journal_.info()) << __func__ << " : "
715  << "Ledger " << *mostRecentValidated
716  << " has been validated. "
717  << "Downloading...";
718  ledger = loadInitialLedger(*mostRecentValidated);
719  }
720  else
721  {
722  JLOG(journal_.info()) << __func__ << " : "
723  << "The wait for the next validated "
724  << "ledger has been aborted. "
725  << "Exiting monitor loop";
726  return;
727  }
728  }
729  }
730  else
731  {
732  if (startSequence_)
733  {
734  Throw<std::runtime_error>(
735  "start sequence specified but db is already populated");
736  }
737  JLOG(journal_.info())
738  << __func__ << " : "
739  << "Database already populated. Picking up from the tip of history";
740  }
741  if (!ledger)
742  {
743  JLOG(journal_.error())
744  << __func__ << " : "
745  << "Failed to load initial ledger. Exiting monitor loop";
746  return;
747  }
748  else
749  {
750  publishLedger(ledger);
751  }
752  uint32_t nextSequence = ledger->info().seq + 1;
753
754  JLOG(journal_.debug()) << __func__ << " : "
755  << "Database is populated. "
756  << "Starting monitor loop. sequence = "
757  << nextSequence;
758  while (!stopping_ &&
760  {
761  JLOG(journal_.info()) << __func__ << " : "
762  << "Ledger with sequence = " << nextSequence
763  << " has been validated by the network. "
764  << "Attempting to find in database and publish";
765  // Attempt to take over responsibility of ETL writer after 10 failed
766  // attempts to publish the ledger. publishLedger() fails if the
767  // ledger that has been validated by the network is not found in the
768  // database after the specified number of attempts. publishLedger()
769  // waits one second between each attempt to read the ledger from the
770  // database
771  //
772  // In strict read-only mode, when the software fails to find a
773  // ledger in the database that has been validated by the network,
774  // the software will only try to publish subsequent ledgers once,
775  // until one of those ledgers is found in the database. Once the
776  // software successfully publishes a ledger, the software will fall
777  // back to the normal behavior of trying several times to publish
778  // the ledger that has been validated by the network. In this
779  // manner, a reporting process running in read-only mode does not
780  // need to restart if the database is wiped.
781  constexpr size_t timeoutSeconds = 10;
782  bool success = publishLedger(nextSequence, timeoutSeconds);
783  if (!success)
784  {
785  JLOG(journal_.warn())
786  << __func__ << " : "
787  << "Failed to publish ledger with sequence = " << nextSequence
788  << " . Beginning ETL";
789  // runETLPipeline returns the most recent sequence
790  // published, or an empty optional if no sequence was published
791  std::optional<uint32_t> lastPublished =
792  runETLPipeline(nextSequence);
793  JLOG(journal_.info()) << __func__ << " : "
794  << "Aborting ETL. Falling back to publishing";
795  // if no ledger was published, don't increment nextSequence
796  if (lastPublished)
797  nextSequence = *lastPublished + 1;
798  }
799  else
800  {
801  ++nextSequence;
802  }
803  }
804 }
805 
// Main loop for strict read-only mode. It never writes; it only publishes
// ledgers that some other process has written to the database.
//
// The starting sequence comes from a line not visible in this listing
// (presumably the most recently validated sequence; TODO confirm). If none
// is available, this returns immediately.
806 void
808 {
809  JLOG(journal_.debug()) << "Starting reporting in strict read only mode";
810  std::optional<uint32_t> mostRecent =
812  if (!mostRecent)
813  return;
814  uint32_t sequence = *mostRecent;
815  bool success = true;
816  while (!stopping_ &&
818  {
        // After a successful publish, allow up to 30 retries for the next
        // ledger. After a failure allow only 1, so the loop fast-forwards
        // past ledgers missing from the database instead of stalling on each.
819  success = publishLedger(sequence, success ? 30 : 1);
820  ++sequence;
821  }
822 }
823 
// Start the ETL worker: assign worker_ a new thread that runs
// monitorReadOnly() when readOnly_ is set and monitor() otherwise. The
// thread's lifetime is owned by worker_; it is not joined here.
824 void
826 {
827  worker_ = std::thread([this]() {
828  beast::setCurrentThreadName("rippled: ReportingETL worker");
829  if (readOnly_)
830  monitorReadOnly();
831  else
832  monitor();
833  });
834 }
835 
837  : app_(app)
838  , journal_(app.journal("ReportingETL"))
839  , publishStrand_(app_.getIOService())
840  , loadBalancer_(*this)
841 {
    // Configure ETL from the [reporting] config section, if present.
    //
    // The setup proceeds in this order:
    //  - Each value in [reporting] names another section describing an ETL
    //    source. Sources lacking source_ip or source_ws_port are skipped.
    //    Sources lacking source_grpc_port are added (websocket only) only
    //    when reportingReadOnly() is set.
    //  - Read-only mode comes from the command line, or from read_only in
    //    the config when not set on the command line.
    //  - The start sequence, flush_interval and num_markers are parsed when
    //    present.
    //
    // Throws std::runtime_error in three cases: reporting is configured but
    // the binary was built without RIPPLED_REPORTING; tx tables are
    // disabled; or a numeric setting fails to parse.
842  // if present, get endpoint from config
843  if (app_.config().exists("reporting"))
844  {
845 #ifndef RIPPLED_REPORTING
846  Throw<std::runtime_error>(
847  "Config file specifies reporting, but software was not built with "
848  "-Dreporting=1. To use reporting, configure CMake with "
849  "-Dreporting=1");
850 #endif
851  if (!app_.config().useTxTables())
852  Throw<std::runtime_error>(
853  "Reporting requires tx tables. Set use_tx_tables=1 in config "
854  "file, under [ledger_tx_tables] section");
855  Section section = app_.config().section("reporting");
856
857  JLOG(journal_.debug()) << "Parsing config info";
858
859  auto& vals = section.values();
860  for (auto& v : vals)
861  {
862  JLOG(journal_.debug()) << "val is " << v;
863  Section source = app_.config().section(v);
864
865  auto optIp = source.get("source_ip");
866  if (!optIp)
867  continue;
868
869  auto optWsPort = source.get("source_ws_port");
870  if (!optWsPort)
871  continue;
872
873  auto optGrpcPort = source.get("source_grpc_port");
874  if (!optGrpcPort)
875  {
876  // add source without grpc port
877  // used in read-only mode to detect when new ledgers have
878  // been validated. Used for publishing
879  if (app_.config().reportingReadOnly())
880  loadBalancer_.add(*optIp, *optWsPort);
881  continue;
882  }
883
884  loadBalancer_.add(*optIp, *optWsPort, *optGrpcPort);
885  }
886
887  // this is true iff --reportingReadOnly was passed via command line
889
890  // if --reportingReadOnly was not passed via command line, check config
891  // file. Command line takes precedence
892  if (!readOnly_)
893  {
894  auto const optRO = section.get("read_only");
895  if (optRO)
896  {
            // Only the exact strings "true" and "1" enable read-only mode.
897  readOnly_ = (*optRO == "true" || *optRO == "1");
899  }
900  }
901
902  // lambda throws a useful message if string to integer conversion fails
        // Parsing uses std::from_chars (base 10), which skips no leading
        // whitespace. Trailing whitespace is tolerated; any other leftover
        // character, or a failed conversion, throws with `onError` followed
        // by the offending text.
903  auto asciiToIntThrows =
904  [](auto& dest, std::string const& src, char const* onError) {
905  char const* const srcEnd = src.data() + src.size();
906  auto [ptr, err] = std::from_chars(src.data(), srcEnd, dest);
907
908  if (err == std::errc())
909  // skip whitespace at end of string
910  while (ptr != srcEnd &&
911  std::isspace(static_cast<unsigned char>(*ptr)))
912  ++ptr;
913
914  // throw if
915  // o conversion error or
916  // o entire string is not consumed
917  if (err != std::errc() || ptr != srcEnd)
918  Throw<std::runtime_error>(onError + src);
919  };
920
921  // handle command line arguments
922  if (app_.config().START_UP == Config::StartUpType::FRESH && !readOnly_)
923  {
924  asciiToIntThrows(
927  "Expected integral START_LEDGER command line argument. Got: ");
928  }
929  // if not passed via command line, check config for start sequence
930  if (!startSequence_)
931  {
932  auto const optStartSeq = section.get("start_sequence");
933  if (optStartSeq)
934  {
935  // set a value so we can dereference
936  startSequence_ = 0;
937  asciiToIntThrows(
939  *optStartSeq,
940  "Expected integral start_sequence config entry. Got: ");
941  }
942  }
943
944  auto const optFlushInterval = section.get("flush_interval");
945  if (optFlushInterval)
946  asciiToIntThrows(
948  *optFlushInterval,
949  "Expected integral flush_interval config entry. Got: ");
950
951  auto const optNumMarkers = section.get("num_markers");
952  if (optNumMarkers)
953  asciiToIntThrows(
954  numMarkers_,
955  *optNumMarkers,
956  "Expected integral num_markers config entry. Got: ");
957  }
958 }
959 
960 } // namespace ripple
ripple::NetworkOPs::pubLedger
virtual void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted)=0
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
ripple::ReportingETL::flushInterval_
size_t flushInterval_
Used to determine when to write to the database during the initial ledger download.
Definition: ReportingETL.h:115
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::Application
Definition: Application.h:115
ripple::Application::getNodeFamily
virtual Family & getNodeFamily()=0
ripple::HashPrefix::ledgerMaster
@ ledgerMaster
ledger master data for signing
std::this_thread::sleep_for
T sleep_for(T... args)
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:241
ripple::ReportingETL::fetchLedgerData
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerData(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
Definition: ReportingETL.cpp:353
ripple::ReportingETL::loadInitialLedger
std::shared_ptr< Ledger > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full, via GetLedgerData, and write the data to the datab...
Definition: ReportingETL.cpp:107
ripple::ReportingETL::startSequence_
std::optional< uint32_t > startSequence_
Ledger sequence to start ETL from.
Definition: ReportingETL.h:139
std::string
STL class.
std::shared_ptr< Ledger >
ripple::LedgerInfo::parentHash
uint256 parentHash
Definition: ReadView.h:94
ripple::ThreadSafeQueue
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
Definition: ETLHelpers.h:115
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::Serializer::modData
Blob & modData()
Definition: Serializer.h:178
std::pair
ripple::ReportingETL::setLastPublish
void setLastPublish()
Definition: ReportingETL.h:155
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:91
ripple::ReportingETL::insertTransactions
std::vector< AccountTransactionsData > insertTransactions(std::shared_ptr< Ledger > &ledger, org::xrpl::rpc::v1::GetLedgerResponse &data)
Insert all of the extracted transactions into the ledger.
Definition: ReportingETL.cpp:76
ripple::ThreadSafeQueue::push
void push(T const &elt)
Definition: ETLHelpers.h:136
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
Definition: View.cpp:162
charconv
ripple::ReportingETL::flushLedger
void flushLedger(std::shared_ptr< Ledger > &ledger)
Write all new data to the key-value store.
Definition: ReportingETL.cpp:182
ripple::hotACCOUNT_NODE
@ hotACCOUNT_NODE
Definition: NodeObject.h:35
std::vector
STL class.
ripple::ReportingETL::loadBalancer_
ETLLoadBalancer loadBalancer_
Mechanism for communicating with ETL sources.
Definition: ReportingETL.h:96
std::chrono::seconds
ripple::ETLLoadBalancer::fetchLedger
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects)
Fetch data for a specific ledger.
Definition: ETLSource.cpp:699
ripple::NetworkValidatedLedgers::waitUntilValidatedByNetwork
bool waitUntilValidatedByNetwork(uint32_t sequence)
Waits for the sequence to be validated by the network.
Definition: ETLHelpers.h:89
ripple::NodeStore::Database::sync
virtual void sync()=0
std::stringstream
STL class.
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
ripple::NodeStore::Database::store
virtual void store(NodeObjectType type, Blob &&data, uint256 const &hash, std::uint32_t ledgerSeq)=0
Store the object.
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:83
ripple::hotTRANSACTION_NODE
@ hotTRANSACTION_NODE
Definition: NodeObject.h:36
ripple::base_uint< 256 >::fromVoidChecked
static std::optional< base_uint > fromVoidChecked(T const &from)
Definition: base_uint.h:319
iostream
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:92
ripple::NetworkValidatedLedgers::getMostRecent
std::optional< uint32_t > getMostRecent() const
Get most recently validated sequence.
Definition: ETLHelpers.h:67
ripple::XRP_LEDGER_EARLIEST_FEES
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_FEES
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
Definition: SystemParameters.h:73
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::Section::values
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:77
ripple::ReportingETL::networkValidatedLedgers_
NetworkValidatedLedgers networkValidatedLedgers_
Mechanism for detecting when the network has validated a new ledger.
Definition: ReportingETL.h:100
ripple::TxMeta
Definition: TxMeta.h:32
ripple::base_uint< 256 >
ripple::ReportingETL::publishLedger
bool publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts=10)
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition: ReportingETL.cpp:286
ripple::ReportingETL::journal_
beast::Journal journal_
Definition: ReportingETL.h:75
ripple::Config::reporting
bool reporting() const
Definition: Config.h:337
std::thread
STL class.
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::ReportingETL::writing_
std::atomic_bool writing_
Whether the process is writing to the database. Used by server_info.
Definition: ReportingETL.h:134
ripple::ReportingETL::readOnly_
bool readOnly_
Whether the process is in strict read-only mode.
Definition: ReportingETL.h:131
ripple::ReportingETL::numMarkers_
size_t numMarkers_
This variable controls the number of GetLedgerData calls that will be executed in parallel during the...
Definition: ReportingETL.h:126
ripple::ETLLoadBalancer::loadInitialLedger
void loadInitialLedger(uint32_t sequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Load the initial ledger, writing data to the queue.
Definition: ETLSource.cpp:680
ripple::Application::config
virtual Config & config()=0
ripple::Application::getRelationalDatabase
virtual RelationalDatabase & getRelationalDatabase()=0
ripple::Config::useTxTables
bool useTxTables() const
Definition: Config.h:343
ripple::ReportingETL::monitorReadOnly
void monitorReadOnly()
Monitor the database for newly written ledgers.
Definition: ReportingETL.cpp:807
ripple::detail::toString
std::string toString(LedgerInfo const &info)
Convenience function for printing out basic ledger info.
Definition: ReportingETL.cpp:42
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::chrono::time_point
ripple::ReportingETL::publishStrand_
boost::asio::io_context::strand publishStrand_
Strand to ensure that ledgers are published in order.
Definition: ReportingETL.h:91
ripple::STTx
Definition: STTx.h:45
ripple::LedgerMaster::getLedgerBySeq
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
Definition: LedgerMaster.cpp:1818
std::errc
std::runtime_error
STL class.
ripple::SerialIter
Definition: Serializer.h:310
ripple::Config::START_UP
StartUpType START_UP
Definition: Config.h:154
std::atomic_bool
std::from_chars
T from_chars(T... args)
ripple::ReportingETL::fetchLedgerDataAndDiff
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerDataAndDiff(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
Definition: ReportingETL.cpp:367
ripple::Serializer
Definition: Serializer.h:39
ripple::Config::setReportingReadOnly
void setReportingReadOnly(bool b)
Definition: Config.h:355
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ReportingETL::stopping_
std::atomic_bool stopping_
Whether the software is stopping.
Definition: ReportingETL.h:103
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
ripple::deserializeHeader
LedgerInfo deserializeHeader(Slice data, bool hasHash)
Deserialize a ledger header from a byte array.
Definition: InboundLedger.cpp:275
cstdlib
ripple::PostgresDatabase::writeLedgerAndTransactions
virtual bool writeLedgerAndTransactions(LedgerInfo const &info, std::vector< AccountTransactionsData > const &accountTxData)=0
writeLedgerAndTransactions Writes new ledger and transaction data into the database.
ripple::PostgresDatabase
Definition: PostgresDatabase.h:27
ripple::LedgerMaster::getValidatedLedger
std::shared_ptr< Ledger const > getValidatedLedger()
Definition: LedgerMaster.cpp:1664
ripple::Section::get
std::optional< T > get(std::string const &name) const
Definition: BasicConfig.h:138
cctype
ripple::ReportingETL::app_
Application & app_
Definition: ReportingETL.h:73
std::optional
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::ReportingETL::start
void start()
start all of the necessary components and begin ETL
Definition: ReportingETL.h:326
ripple::keylet::fees
Keylet const & fees() noexcept
The (fixed) index of the object containing the ledger fees.
Definition: Indexes.cpp:171
ripple::ReportingETL::worker_
std::thread worker_
Definition: ReportingETL.h:77
ripple::hotLEDGER
@ hotLEDGER
Definition: NodeObject.h:34
std::make_pair
T make_pair(T... args)
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:75
ripple::strHex
std::string strHex(FwdIt begin, FwdIt end)
Definition: strHex.h:30
ripple::Config::reportingReadOnly
bool reportingReadOnly() const
Definition: Config.h:349
ripple::ReportingETL::buildNextLedger
std::pair< std::shared_ptr< Ledger >, std::vector< AccountTransactionsData > > buildNextLedger(std::shared_ptr< Ledger > &parent, org::xrpl::rpc::v1::GetLedgerResponse &rawData)
Build the next ledger using the previous ledger and the extracted data.
Definition: ReportingETL.cpp:381
ripple::ReportingETL::monitor
void monitor()
Monitor the network for newly validated ledgers.
Definition: ReportingETL.cpp:687
ripple::ReportingETL::ReportingETL
ReportingETL(Application &app)
Definition: ReportingETL.cpp:836
ripple::Config::START_LEDGER
std::string START_LEDGER
Definition: Config.h:158
ripple::ReportingETL::isStopping
bool isStopping() const
Definition: ReportingETL.h:282
ripple::ReportingETL::consumeLedgerData
void consumeLedgerData(std::shared_ptr< Ledger > &ledger, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Consume data from a queue and insert that data into the ledger This function will continue to pull fr...
Definition: ReportingETL.cpp:54
ripple::ReportingETL::doWork
void doWork()
Definition: ReportingETL.cpp:825
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:93
ripple::ReportingETL::runETLPipeline
std::optional< uint32_t > runETLPipeline(uint32_t startSequence)
Run ETL.
Definition: ReportingETL.cpp:467
ripple::BasicConfig::exists
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Definition: BasicConfig.cpp:121
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:127
variant
string
ripple::ETLLoadBalancer::add
void add(std::string &host, std::string &websocketPort, std::string &grpcPort)
Add an ETL source.
Definition: ETLSource.cpp:657
std::chrono::system_clock::now
T now(T... args)