rippled
PostgresDatabase.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/AcceptedLedger.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/ledger/LedgerToJson.h>
23 #include <ripple/app/ledger/TransactionMaster.h>
24 #include <ripple/app/main/Application.h>
25 #include <ripple/app/misc/Manifest.h>
26 #include <ripple/app/misc/impl/AccountTxPaging.h>
27 #include <ripple/app/rdb/backend/PostgresDatabase.h>
28 #include <ripple/app/rdb/backend/detail/Node.h>
29 #include <ripple/basics/BasicConfig.h>
30 #include <ripple/basics/StringUtilities.h>
31 #include <ripple/core/DatabaseCon.h>
32 #include <ripple/core/Pg.h>
33 #include <ripple/core/SociDB.h>
34 #include <ripple/json/json_reader.h>
35 #include <ripple/json/to_string.h>
36 #include <ripple/nodestore/DatabaseShard.h>
37 #include <boost/algorithm/string.hpp>
38 #include <boost/range/adaptor/transformed.hpp>
39 #include <soci/sqlite3/soci-sqlite3.h>
40 
41 namespace ripple {
42 
43 class PgPool;
44 
48 
50 {
51 public:
53  Application& app,
54  Config const& config,
55  JobQueue& jobQueue)
56  : app_(app)
57  , j_(app_.journal("PgPool"))
58  , pgPool_(
59 #ifdef RIPPLED_REPORTING
60  make_PgPool(config.section("ledger_tx_tables"), j_)
61 #endif
62  )
63  {
64  assert(config.reporting());
65 #ifdef RIPPLED_REPORTING
66  if (config.reporting() && !config.reportingReadOnly()) // use pg
67  {
68  initSchema(pgPool_);
69  }
70 #endif
71  }
72 
73  void
74  stop() override
75  {
76 #ifdef RIPPLED_REPORTING
77  pgPool_->stop();
78 #endif
79  }
80 
81  void
82  sweep() override;
83 
85  getMinLedgerSeq() override;
86 
88  getMaxLedgerSeq() override;
89 
91  getCompleteLedgers() override;
92 
94  getValidatedLedgerAge() override;
95 
96  bool
98  LedgerInfo const& info,
99  std::vector<AccountTransactionsData> const& accountTxData) override;
100 
102  getLedgerInfoByIndex(LedgerIndex ledgerSeq) override;
103 
105  getNewestLedgerInfo() override;
106 
108  getLedgerInfoByHash(uint256 const& ledgerHash) override;
109 
110  uint256
111  getHashByIndex(LedgerIndex ledgerIndex) override;
112 
114  getHashesByIndex(LedgerIndex ledgerIndex) override;
115 
117  getHashesByIndex(LedgerIndex minSeq, LedgerIndex maxSeq) override;
118 
120  getTxHashes(LedgerIndex seq) override;
121 
123  getTxHistory(LedgerIndex startIndex) override;
124 
126  getAccountTx(AccountTxArgs const& args) override;
127 
129  locateTransaction(uint256 const& id) override;
130 
131  bool
132  ledgerDbHasSpace(Config const& config) override;
133 
134  bool
135  transactionDbHasSpace(Config const& config) override;
136 
137  bool
138  isCaughtUp(std::string& reason) override;
139 
140 private:
144 
145  bool
146  dbHasSpace(Config const& config);
147 };
148 
161  std::shared_ptr<PgPool> const& pgPool,
162  std::variant<
164  uint256,
165  uint32_t,
166  std::pair<uint32_t, uint32_t>> const& whichLedger,
167  Application& app)
168 {
170 #ifdef RIPPLED_REPORTING
171  auto log = app.journal("Ledger");
172  assert(app.config().reporting());
173  std::stringstream sql;
174  sql << "SELECT ledger_hash, prev_hash, account_set_hash, trans_set_hash, "
175  "total_coins, closing_time, prev_closing_time, close_time_res, "
176  "close_flags, ledger_seq FROM ledgers ";
177 
178  uint32_t expNumResults = 1;
179 
180  if (auto ledgerSeq = std::get_if<uint32_t>(&whichLedger))
181  {
182  sql << "WHERE ledger_seq = " + std::to_string(*ledgerSeq);
183  }
184  else if (auto ledgerHash = std::get_if<uint256>(&whichLedger))
185  {
186  sql << ("WHERE ledger_hash = \'\\x" + strHex(*ledgerHash) + "\'");
187  }
188  else if (
189  auto minAndMax =
191  {
192  expNumResults = minAndMax->second - minAndMax->first;
193 
194  sql
195  << ("WHERE ledger_seq >= " + std::to_string(minAndMax->first) +
196  " AND ledger_seq <= " + std::to_string(minAndMax->second));
197  }
198  else
199  {
200  sql << ("ORDER BY ledger_seq desc LIMIT 1");
201  }
202  sql << ";";
203 
204  JLOG(log.trace()) << __func__ << " : sql = " << sql.str();
205 
206  auto res = PgQuery(pgPool)(sql.str().data());
207  if (!res)
208  {
209  JLOG(log.error()) << __func__ << " : Postgres response is null - sql = "
210  << sql.str();
211  assert(false);
212  return {};
213  }
214  else if (res.status() != PGRES_TUPLES_OK)
215  {
216  JLOG(log.error()) << __func__
217  << " : Postgres response should have been "
218  "PGRES_TUPLES_OK but instead was "
219  << res.status() << " - msg = " << res.msg()
220  << " - sql = " << sql.str();
221  assert(false);
222  return {};
223  }
224 
225  JLOG(log.trace()) << __func__ << " Postgres result msg : " << res.msg();
226 
227  if (res.isNull() || res.ntuples() == 0)
228  {
229  JLOG(log.debug()) << __func__
230  << " : Ledger not found. sql = " << sql.str();
231  return {};
232  }
233  else if (res.ntuples() > 0)
234  {
235  if (res.nfields() != 10)
236  {
237  JLOG(log.error()) << __func__
238  << " : Wrong number of fields in Postgres "
239  "response. Expected 10, but got "
240  << res.nfields() << " . sql = " << sql.str();
241  assert(false);
242  return {};
243  }
244  }
245 
246  for (size_t i = 0; i < res.ntuples(); ++i)
247  {
248  char const* hash = res.c_str(i, 0);
249  char const* prevHash = res.c_str(i, 1);
250  char const* accountHash = res.c_str(i, 2);
251  char const* txHash = res.c_str(i, 3);
252  std::int64_t totalCoins = res.asBigInt(i, 4);
253  std::int64_t closeTime = res.asBigInt(i, 5);
254  std::int64_t parentCloseTime = res.asBigInt(i, 6);
255  std::int64_t closeTimeRes = res.asBigInt(i, 7);
256  std::int64_t closeFlags = res.asBigInt(i, 8);
257  std::int64_t ledgerSeq = res.asBigInt(i, 9);
258 
259  JLOG(log.trace()) << __func__ << " - Postgres response = " << hash
260  << " , " << prevHash << " , " << accountHash << " , "
261  << txHash << " , " << totalCoins << ", " << closeTime
262  << ", " << parentCloseTime << ", " << closeTimeRes
263  << ", " << closeFlags << ", " << ledgerSeq
264  << " - sql = " << sql.str();
265  JLOG(log.debug()) << __func__
266  << " - Successfully fetched ledger with sequence = "
267  << ledgerSeq << " from Postgres";
268 
269  using time_point = NetClock::time_point;
270  using duration = NetClock::duration;
271 
272  LedgerInfo info;
273  if (!info.parentHash.parseHex(prevHash + 2))
274  assert(false);
275  if (!info.txHash.parseHex(txHash + 2))
276  assert(false);
277  if (!info.accountHash.parseHex(accountHash + 2))
278  assert(false);
279  info.drops = totalCoins;
280  info.closeTime = time_point{duration{closeTime}};
281  info.parentCloseTime = time_point{duration{parentCloseTime}};
282  info.closeFlags = closeFlags;
283  info.closeTimeResolution = duration{closeTimeRes};
284  info.seq = ledgerSeq;
285  if (!info.hash.parseHex(hash + 2))
286  assert(false);
287  info.validated = true;
288  infos.push_back(info);
289  }
290 
291 #endif
292  return infos;
293 }
294 
305  std::shared_ptr<PgPool> const& pgPool,
307  Application& app)
308 {
310  std::visit(
311  [&infos, &app, &pgPool](auto&& arg) {
312  infos = loadLedgerInfos(pgPool, arg, app);
313  },
314  whichLedger);
315  assert(infos.size() <= 1);
316  if (!infos.size())
317  return {};
318  return infos[0];
319 }
320 
321 #ifdef RIPPLED_REPORTING
322 static bool
323 writeToLedgersDB(LedgerInfo const& info, PgQuery& pgQuery, beast::Journal& j)
324 {
325  JLOG(j.debug()) << __func__;
326  auto cmd = boost::format(
327  R"(INSERT INTO ledgers
328  VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
329 
330  auto ledgerInsert = boost::str(
331  cmd % info.seq % strHex(info.hash) % strHex(info.parentHash) %
332  info.drops.drops() % info.closeTime.time_since_epoch().count() %
333  info.parentCloseTime.time_since_epoch().count() %
334  info.closeTimeResolution.count() % info.closeFlags %
335  strHex(info.accountHash) % strHex(info.txHash));
336  JLOG(j.trace()) << __func__ << " : "
337  << " : "
338  << "query string = " << ledgerInsert;
339 
340  auto res = pgQuery(ledgerInsert.data());
341 
342  return res;
343 }
344 
345 enum class DataFormat { binary, expanded };
348  Application& app,
349  std::vector<uint256>& nodestoreHashes,
350  std::vector<uint32_t>& ledgerSequences,
351  DataFormat format)
352 {
354  if (format == DataFormat::binary)
355  ret = TxnsDataBinary();
356  else
357  ret = TxnsData();
358 
359  std::vector<
361  txns = flatFetchTransactions(app, nodestoreHashes);
362  for (size_t i = 0; i < txns.size(); ++i)
363  {
364  auto& [txn, meta] = txns[i];
365  if (format == DataFormat::binary)
366  {
367  auto& transactions = std::get<TxnsDataBinary>(ret);
368  Serializer txnSer = txn->getSerializer();
369  Serializer metaSer = meta->getSerializer();
370  // SerialIter it(item->slice());
371  Blob txnBlob = txnSer.getData();
372  Blob metaBlob = metaSer.getData();
373  transactions.push_back(
374  std::make_tuple(txnBlob, metaBlob, ledgerSequences[i]));
375  }
376  else
377  {
378  auto& transactions = std::get<TxnsData>(ret);
379  std::string reason;
380  auto txnRet = std::make_shared<Transaction>(txn, reason, app);
381  txnRet->setLedger(ledgerSequences[i]);
382  txnRet->setStatus(COMMITTED);
383  auto txMeta = std::make_shared<TxMeta>(
384  txnRet->getID(), ledgerSequences[i], *meta);
385  transactions.push_back(std::make_pair(txnRet, txMeta));
386  }
387  }
388  return ret;
389 }
390 
392 processAccountTxStoredProcedureResult(
393  RelationalDatabase::AccountTxArgs const& args,
394  Json::Value& result,
395  Application& app,
396  beast::Journal j)
397 {
398  AccountTxResult ret;
399  ret.limit = args.limit;
400 
401  try
402  {
403  if (result.isMember("transactions"))
404  {
405  std::vector<uint256> nodestoreHashes;
406  std::vector<uint32_t> ledgerSequences;
407  for (auto& t : result["transactions"])
408  {
409  if (t.isMember("ledger_seq") && t.isMember("nodestore_hash"))
410  {
411  uint32_t ledgerSequence = t["ledger_seq"].asUInt();
412  std::string nodestoreHashHex =
413  t["nodestore_hash"].asString();
414  nodestoreHashHex.erase(0, 2);
415  uint256 nodestoreHash;
416  if (!nodestoreHash.parseHex(nodestoreHashHex))
417  assert(false);
418 
419  if (nodestoreHash.isNonZero())
420  {
421  ledgerSequences.push_back(ledgerSequence);
422  nodestoreHashes.push_back(nodestoreHash);
423  }
424  else
425  {
426  assert(false);
427  return {ret, {rpcINTERNAL, "nodestoreHash is zero"}};
428  }
429  }
430  else
431  {
432  assert(false);
433  return {ret, {rpcINTERNAL, "missing postgres fields"}};
434  }
435  }
436 
437  assert(nodestoreHashes.size() == ledgerSequences.size());
438  ret.transactions = flatFetchTransactions(
439  app,
440  nodestoreHashes,
441  ledgerSequences,
442  args.binary ? DataFormat::binary : DataFormat::expanded);
443 
444  JLOG(j.trace()) << __func__ << " : processed db results";
445 
446  if (result.isMember("marker"))
447  {
448  auto& marker = result["marker"];
449  assert(marker.isMember("ledger"));
450  assert(marker.isMember("seq"));
451  ret.marker = {
452  marker["ledger"].asUInt(), marker["seq"].asUInt()};
453  }
454  assert(result.isMember("ledger_index_min"));
455  assert(result.isMember("ledger_index_max"));
456  ret.ledgerRange = {
457  result["ledger_index_min"].asUInt(),
458  result["ledger_index_max"].asUInt()};
459  return {ret, rpcSUCCESS};
460  }
461  else if (result.isMember("error"))
462  {
463  JLOG(j.debug())
464  << __func__ << " : error = " << result["error"].asString();
465  return {
466  ret,
467  RPC::Status{rpcINVALID_PARAMS, result["error"].asString()}};
468  }
469  else
470  {
471  return {ret, {rpcINTERNAL, "unexpected Postgres response"}};
472  }
473  }
474  catch (std::exception& e)
475  {
476  JLOG(j.debug()) << __func__ << " : "
477  << "Caught exception : " << e.what();
478  return {ret, {rpcINTERNAL, e.what()}};
479  }
480 }
481 #endif
482 
483 void
485 {
486 #ifdef RIPPLED_REPORTING
487  pgPool_->idleSweeper();
488 #endif
489 }
490 
493 {
494 #ifdef RIPPLED_REPORTING
495  auto seq = PgQuery(pgPool_)("SELECT min_ledger()");
496  if (!seq)
497  {
498  JLOG(j_.error()) << "Error querying minimum ledger sequence.";
499  }
500  else if (!seq.isNull())
501  return seq.asInt();
502 #endif
503  return {};
504 }
505 
508 {
509 #ifdef RIPPLED_REPORTING
510  auto seq = PgQuery(pgPool_)("SELECT max_ledger()");
511  if (seq && !seq.isNull())
512  return seq.asBigInt();
513 #endif
514  return {};
515 }
516 
519 {
520 #ifdef RIPPLED_REPORTING
521  auto range = PgQuery(pgPool_)("SELECT complete_ledgers()");
522  if (range)
523  return range.c_str();
524 #endif
525  return "error";
526 }
527 
530 {
531  using namespace std::chrono_literals;
532 #ifdef RIPPLED_REPORTING
533  auto age = PgQuery(pgPool_)("SELECT age()");
534  if (!age || age.isNull())
535  JLOG(j_.debug()) << "No ledgers in database";
536  else
537  return std::chrono::seconds{age.asInt()};
538 #endif
539  return weeks{2};
540 }
541 
542 bool
544  LedgerInfo const& info,
545  std::vector<AccountTransactionsData> const& accountTxData)
546 {
547 #ifdef RIPPLED_REPORTING
548  JLOG(j_.debug()) << __func__ << " : "
549  << "Beginning write to Postgres";
550 
551  try
552  {
553  // Create a PgQuery object to run multiple commands over the same
554  // connection in a single transaction block.
555  PgQuery pg(pgPool_);
556  auto res = pg("BEGIN");
557  if (!res || res.status() != PGRES_COMMAND_OK)
558  {
559  std::stringstream msg;
560  msg << "bulkWriteToTable : Postgres insert error: " << res.msg();
561  Throw<std::runtime_error>(msg.str());
562  }
563 
564  // Writing to the ledgers db fails if the ledger already exists in the
565  // db. In this situation, the ETL process has detected there is another
566  // writer, and falls back to only publishing
567  if (!writeToLedgersDB(info, pg, j_))
568  {
569  JLOG(j_.warn()) << __func__ << " : "
570  << "Failed to write to ledgers database.";
571  return false;
572  }
573 
574  std::stringstream transactionsCopyBuffer;
575  std::stringstream accountTransactionsCopyBuffer;
576  for (auto const& data : accountTxData)
577  {
578  std::string txHash = strHex(data.txHash);
579  std::string nodestoreHash = strHex(data.nodestoreHash);
580  auto idx = data.transactionIndex;
581  auto ledgerSeq = data.ledgerSequence;
582 
583  transactionsCopyBuffer << std::to_string(ledgerSeq) << '\t'
584  << std::to_string(idx) << '\t' << "\\\\x"
585  << txHash << '\t' << "\\\\x" << nodestoreHash
586  << '\n';
587 
588  for (auto const& a : data.accounts)
589  {
590  std::string acct = strHex(a);
591  accountTransactionsCopyBuffer
592  << "\\\\x" << acct << '\t' << std::to_string(ledgerSeq)
593  << '\t' << std::to_string(idx) << '\n';
594  }
595  }
596 
597  pg.bulkInsert("transactions", transactionsCopyBuffer.str());
598  pg.bulkInsert(
599  "account_transactions", accountTransactionsCopyBuffer.str());
600 
601  res = pg("COMMIT");
602  if (!res || res.status() != PGRES_COMMAND_OK)
603  {
604  std::stringstream msg;
605  msg << "bulkWriteToTable : Postgres insert error: " << res.msg();
606  assert(false);
607  Throw<std::runtime_error>(msg.str());
608  }
609 
610  JLOG(j_.info()) << __func__ << " : "
611  << "Successfully wrote to Postgres";
612  return true;
613  }
614  catch (std::exception& e)
615  {
616  JLOG(j_.error()) << __func__
617  << "Caught exception writing to Postgres : "
618  << e.what();
619  assert(false);
620  return false;
621  }
622 #else
623  return false;
624 #endif
625 }
626 
629 {
630  return loadLedgerHelper(pgPool_, ledgerSeq, app_);
631 }
632 
635 {
636  return loadLedgerHelper(pgPool_, {}, app_);
637 }
638 
641 {
642  return loadLedgerHelper(pgPool_, ledgerHash, app_);
643 }
644 
645 uint256
647 {
648  auto infos = loadLedgerInfos(pgPool_, ledgerIndex, app_);
649  assert(infos.size() <= 1);
650  if (infos.size())
651  return infos[0].hash;
652  return {};
653 }
654 
657 {
658  LedgerHashPair p;
659  auto infos = loadLedgerInfos(pgPool_, ledgerIndex, app_);
660  assert(infos.size() <= 1);
661  if (infos.size())
662  {
663  p.ledgerHash = infos[0].hash;
664  p.parentHash = infos[0].parentHash;
665  return p;
666  }
667  return {};
668 }
669 
672 {
674  auto infos = loadLedgerInfos(pgPool_, std::make_pair(minSeq, maxSeq), app_);
675  for (auto& info : infos)
676  {
677  ret[info.seq] = {info.hash, info.parentHash};
678  }
679  return ret;
680 }
681 
684 {
685  std::vector<uint256> nodestoreHashes;
686 
687 #ifdef RIPPLED_REPORTING
688  auto log = app_.journal("Ledger");
689 
690  std::string query =
691  "SELECT nodestore_hash"
692  " FROM transactions "
693  " WHERE ledger_seq = " +
694  std::to_string(seq);
695  auto res = PgQuery(pgPool_)(query.c_str());
696 
697  if (!res)
698  {
699  JLOG(log.error()) << __func__
700  << " : Postgres response is null - query = " << query;
701  assert(false);
702  return {};
703  }
704  else if (res.status() != PGRES_TUPLES_OK)
705  {
706  JLOG(log.error()) << __func__
707  << " : Postgres response should have been "
708  "PGRES_TUPLES_OK but instead was "
709  << res.status() << " - msg = " << res.msg()
710  << " - query = " << query;
711  assert(false);
712  return {};
713  }
714 
715  JLOG(log.trace()) << __func__ << " Postgres result msg : " << res.msg();
716 
717  if (res.isNull() || res.ntuples() == 0)
718  {
719  JLOG(log.debug()) << __func__
720  << " : Ledger not found. query = " << query;
721  return {};
722  }
723  else if (res.ntuples() > 0)
724  {
725  if (res.nfields() != 1)
726  {
727  JLOG(log.error()) << __func__
728  << " : Wrong number of fields in Postgres "
729  "response. Expected 1, but got "
730  << res.nfields() << " . query = " << query;
731  assert(false);
732  return {};
733  }
734  }
735 
736  JLOG(log.trace()) << __func__ << " : result = " << res.c_str()
737  << " : query = " << query;
738  for (size_t i = 0; i < res.ntuples(); ++i)
739  {
740  char const* nodestoreHash = res.c_str(i, 0);
741  uint256 hash;
742  if (!hash.parseHex(nodestoreHash + 2))
743  assert(false);
744 
745  nodestoreHashes.push_back(hash);
746  }
747 #endif
748 
749  return nodestoreHashes;
750 }
751 
754 {
756 
757 #ifdef RIPPLED_REPORTING
758  if (!app_.config().reporting())
759  {
760  assert(false);
761  Throw<std::runtime_error>(
762  "called getTxHistory but not in reporting mode");
763  }
764 
765  std::string sql = boost::str(
766  boost::format("SELECT nodestore_hash, ledger_seq "
767  " FROM transactions"
768  " ORDER BY ledger_seq DESC LIMIT 20 "
769  "OFFSET %u;") %
770  startIndex);
771 
772  auto res = PgQuery(pgPool_)(sql.data());
773 
774  if (!res)
775  {
776  JLOG(j_.error()) << __func__
777  << " : Postgres response is null - sql = " << sql;
778  assert(false);
779  return {};
780  }
781  else if (res.status() != PGRES_TUPLES_OK)
782  {
783  JLOG(j_.error()) << __func__
784  << " : Postgres response should have been "
785  "PGRES_TUPLES_OK but instead was "
786  << res.status() << " - msg = " << res.msg()
787  << " - sql = " << sql;
788  assert(false);
789  return {};
790  }
791 
792  JLOG(j_.trace()) << __func__ << " Postgres result msg : " << res.msg();
793 
794  if (res.isNull() || res.ntuples() == 0)
795  {
796  JLOG(j_.debug()) << __func__ << " : Empty postgres response";
797  assert(false);
798  return {};
799  }
800  else if (res.ntuples() > 0)
801  {
802  if (res.nfields() != 2)
803  {
804  JLOG(j_.error()) << __func__
805  << " : Wrong number of fields in Postgres "
806  "response. Expected 1, but got "
807  << res.nfields() << " . sql = " << sql;
808  assert(false);
809  return {};
810  }
811  }
812 
813  JLOG(j_.trace()) << __func__ << " : Postgres result = " << res.c_str();
814 
815  std::vector<uint256> nodestoreHashes;
816  std::vector<uint32_t> ledgerSequences;
817  for (size_t i = 0; i < res.ntuples(); ++i)
818  {
819  uint256 hash;
820  if (!hash.parseHex(res.c_str(i, 0) + 2))
821  assert(false);
822  nodestoreHashes.push_back(hash);
823  ledgerSequences.push_back(res.asBigInt(i, 1));
824  }
825 
826  auto txns = flatFetchTransactions(app_, nodestoreHashes);
827  for (size_t i = 0; i < txns.size(); ++i)
828  {
829  auto const& [sttx, meta] = txns[i];
830  assert(sttx);
831 
832  std::string reason;
833  auto txn = std::make_shared<Transaction>(sttx, reason, app_);
834  txn->setLedger(ledgerSequences[i]);
835  txn->setStatus(COMMITTED);
836  ret.push_back(txn);
837  }
838 
839 #endif
840  return ret;
841 }
842 
845 {
846 #ifdef RIPPLED_REPORTING
847  pg_params dbParams;
848 
849  char const*& command = dbParams.first;
850  std::vector<std::optional<std::string>>& values = dbParams.second;
851  command =
852  "SELECT account_tx($1::bytea, $2::bool, "
853  "$3::bigint, $4::bigint, $5::bigint, $6::bytea, "
854  "$7::bigint, $8::bool, $9::bigint, $10::bigint)";
855  values.resize(10);
856  values[0] = "\\x" + strHex(args.account);
857  values[1] = args.forward ? "true" : "false";
858 
859  static std::uint32_t const page_length(200);
860  if (args.limit == 0 || args.limit > page_length)
861  values[2] = std::to_string(page_length);
862  else
863  values[2] = std::to_string(args.limit);
864 
865  if (args.ledger)
866  {
867  if (auto range = std::get_if<LedgerRange>(&args.ledger.value()))
868  {
869  values[3] = std::to_string(range->min);
870  values[4] = std::to_string(range->max);
871  }
872  else if (auto hash = std::get_if<LedgerHash>(&args.ledger.value()))
873  {
874  values[5] = ("\\x" + strHex(*hash));
875  }
876  else if (
877  auto sequence = std::get_if<LedgerSequence>(&args.ledger.value()))
878  {
879  values[6] = std::to_string(*sequence);
880  }
881  else if (std::get_if<LedgerShortcut>(&args.ledger.value()))
882  {
883  // current, closed and validated are all treated as validated
884  values[7] = "true";
885  }
886  else
887  {
888  JLOG(j_.error()) << "doAccountTxStoredProcedure - "
889  << "Error parsing ledger args";
890  return {};
891  }
892  }
893 
894  if (args.marker)
895  {
896  values[8] = std::to_string(args.marker->ledgerSeq);
897  values[9] = std::to_string(args.marker->txnSeq);
898  }
899  for (size_t i = 0; i < values.size(); ++i)
900  {
901  JLOG(j_.trace()) << "value " << std::to_string(i) << " = "
902  << (values[i] ? values[i].value() : "null");
903  }
904 
905  auto res = PgQuery(pgPool_)(dbParams);
906  if (!res)
907  {
908  JLOG(j_.error()) << __func__
909  << " : Postgres response is null - account = "
910  << strHex(args.account);
911  assert(false);
912  return {{}, {rpcINTERNAL, "Postgres error"}};
913  }
914  else if (res.status() != PGRES_TUPLES_OK)
915  {
916  JLOG(j_.error()) << __func__
917  << " : Postgres response should have been "
918  "PGRES_TUPLES_OK but instead was "
919  << res.status() << " - msg = " << res.msg()
920  << " - account = " << strHex(args.account);
921  assert(false);
922  return {{}, {rpcINTERNAL, "Postgres error"}};
923  }
924 
925  JLOG(j_.trace()) << __func__ << " Postgres result msg : " << res.msg();
926  if (res.isNull() || res.ntuples() == 0)
927  {
928  JLOG(j_.debug()) << __func__
929  << " : No data returned from Postgres : account = "
930  << strHex(args.account);
931 
932  assert(false);
933  return {{}, {rpcINTERNAL, "Postgres error"}};
934  }
935 
936  char const* resultStr = res.c_str();
937  JLOG(j_.trace()) << __func__ << " : "
938  << "postgres result = " << resultStr
939  << " : account = " << strHex(args.account);
940 
941  Json::Value v;
942  Json::Reader reader;
943  bool success = reader.parse(resultStr, resultStr + strlen(resultStr), v);
944  if (success)
945  {
946  return processAccountTxStoredProcedureResult(args, v, app_, j_);
947  }
948 #endif
949  // This shouldn't happen. Postgres should return a parseable error
950  assert(false);
951  return {{}, {rpcINTERNAL, "Failed to deserialize Postgres result"}};
952 }
953 
956 {
957 #ifdef RIPPLED_REPORTING
958  auto baseCmd = boost::format(R"(SELECT tx('%s');)");
959 
960  std::string txHash = "\\x" + strHex(id);
961  std::string sql = boost::str(baseCmd % txHash);
962 
963  auto res = PgQuery(pgPool_)(sql.data());
964 
965  if (!res)
966  {
967  JLOG(app_.journal("Transaction").error())
968  << __func__
969  << " : Postgres response is null - tx ID = " << strHex(id);
970  assert(false);
971  return {};
972  }
973  else if (res.status() != PGRES_TUPLES_OK)
974  {
975  JLOG(app_.journal("Transaction").error())
976  << __func__
977  << " : Postgres response should have been "
978  "PGRES_TUPLES_OK but instead was "
979  << res.status() << " - msg = " << res.msg()
980  << " - tx ID = " << strHex(id);
981  assert(false);
982  return {};
983  }
984 
985  JLOG(app_.journal("Transaction").trace())
986  << __func__ << " Postgres result msg : " << res.msg();
987  if (res.isNull() || res.ntuples() == 0)
988  {
989  JLOG(app_.journal("Transaction").debug())
990  << __func__
991  << " : No data returned from Postgres : tx ID = " << strHex(id);
992  // This shouldn't happen
993  assert(false);
994  return {};
995  }
996 
997  char const* resultStr = res.c_str();
998  JLOG(app_.journal("Transaction").debug())
999  << "postgres result = " << resultStr;
1000 
1001  Json::Value v;
1002  Json::Reader reader;
1003  bool success = reader.parse(resultStr, resultStr + strlen(resultStr), v);
1004  if (success)
1005  {
1006  if (v.isMember("nodestore_hash") && v.isMember("ledger_seq"))
1007  {
1008  uint256 nodestoreHash;
1009  if (!nodestoreHash.parseHex(
1010  v["nodestore_hash"].asString().substr(2)))
1011  assert(false);
1012  uint32_t ledgerSeq = v["ledger_seq"].asUInt();
1013  if (nodestoreHash.isNonZero())
1014  return {std::make_pair(nodestoreHash, ledgerSeq)};
1015  }
1016  if (v.isMember("min_seq") && v.isMember("max_seq"))
1017  {
1018  return {ClosedInterval<uint32_t>(
1019  v["min_seq"].asUInt(), v["max_seq"].asUInt())};
1020  }
1021  }
1022 #endif
1023  // Shouldn' happen. Postgres should return the ledger range searched if
1024  // the transaction was not found
1025  assert(false);
1026  Throw<std::runtime_error>(
1027  "Transaction::Locate - Invalid Postgres response");
1028  return {};
1029 }
1030 
1031 bool
1033 {
1034  /* Postgres server could be running on a different machine. */
1035 
1036  return true;
1037 }
1038 
1039 bool
1041 {
1042  return dbHasSpace(config);
1043 }
1044 
1045 bool
1047 {
1048  return dbHasSpace(config);
1049 }
1050 
1052 getPostgresDatabase(Application& app, Config const& config, JobQueue& jobQueue)
1053 {
1054  return std::make_unique<PostgresDatabaseImp>(app, config, jobQueue);
1055 }
1056 
1057 bool
1059 {
1060 #ifdef RIPPLED_REPORTING
1061  using namespace std::chrono_literals;
1062  auto age = PgQuery(pgPool_)("SELECT age()");
1063  if (!age || age.isNull())
1064  {
1065  reason = "No ledgers in database";
1066  return false;
1067  }
1068  if (std::chrono::seconds{age.asInt()} > 3min)
1069  {
1070  reason = "No recently-published ledger";
1071  return false;
1072  }
1073 #endif
1074  return true;
1075 }
1076 
1077 } // namespace ripple
ripple::COMMITTED
@ COMMITTED
Definition: Transaction.h:50
ripple::loadLedgerInfos
static std::vector< LedgerInfo > loadLedgerInfos(std::shared_ptr< PgPool > const &pgPool, std::variant< std::monostate, uint256, uint32_t, std::pair< uint32_t, uint32_t >> const &whichLedger, Application &app)
loadLedgerInfos Loads the ledger info for the specified ledger/s from the database
Definition: PostgresDatabase.cpp:160
ripple::Application
Definition: Application.h:115
std::vector::resize
T resize(T... args)
std::make_tuple
T make_tuple(T... args)
ripple::Blob
std::vector< unsigned char > Blob
Storage for linear binary data.
Definition: Blob.h:30
ripple::PostgresDatabaseImp
Definition: PostgresDatabase.cpp:49
std::string
STL class.
std::shared_ptr< PgPool >
ripple::LedgerInfo::parentHash
uint256 parentHash
Definition: ReadView.h:94
ripple::rpcINVALID_PARAMS
@ rpcINVALID_PARAMS
Definition: ErrorCodes.h:84
ripple::PostgresDatabaseImp::locateTransaction
Transaction::Locator locateTransaction(uint256 const &id) override
locateTransaction Returns information used to locate a transaction.
Definition: PostgresDatabase.cpp:955
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:537
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::RelationalDatabase::AccountTxArgs
Definition: RelationalDatabase.h:96
std::pair
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:91
ripple::PostgresDatabaseImp::transactionDbHasSpace
bool transactionDbHasSpace(Config const &config) override
transactionDbHasSpace Checks if the transaction database has available space.
Definition: PostgresDatabase.cpp:1046
ripple::PostgresDatabaseImp::stop
void stop() override
Definition: PostgresDatabase.cpp:74
std::vector
STL class.
std::vector::size
T size(T... args)
std::chrono::seconds
ripple::RelationalDatabase::AccountTxs
std::vector< AccountTx > AccountTxs
Definition: RelationalDatabase.h:86
std::stringstream
STL class.
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
ripple::PostgresDatabaseImp::getValidatedLedgerAge
std::chrono::seconds getValidatedLedgerAge() override
getValidatedLedgerAge Returns the age of the last validated ledger.
Definition: PostgresDatabase.cpp:529
std::monostate
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:83
Json::Reader
Unserialize a JSON document into a Value.
Definition: json_reader.h:36
ripple::RelationalDatabase::AccountTxArgs::account
AccountID account
Definition: RelationalDatabase.h:98
ripple::PostgresDatabaseImp::getNewestLedgerInfo
std::optional< LedgerInfo > getNewestLedgerInfo() override
getNewestLedgerInfo Returns the info of the newest saved ledger.
Definition: PostgresDatabase.cpp:634
ripple::LedgerInfo::txHash
uint256 txHash
Definition: ReadView.h:92
ripple::RelationalDatabase::AccountTxResult
Definition: RelationalDatabase.h:106
ripple::PostgresDatabaseImp::PostgresDatabaseImp
PostgresDatabaseImp(Application &app, Config const &config, JobQueue &jobQueue)
Definition: PostgresDatabase.cpp:52
ripple::PostgresDatabaseImp::getMinLedgerSeq
std::optional< LedgerIndex > getMinLedgerSeq() override
getMinLedgerSeq Returns the minimum ledger sequence in the Ledgers table.
Definition: PostgresDatabase.cpp:492
ripple::uint256
base_uint< 256 > uint256
Definition: base_uint.h:550
std::vector::push_back
T push_back(T... args)
ripple::LedgerInfo::closeTime
NetClock::time_point closeTime
Definition: ReadView.h:114
ripple::base_uint< 256 >
ripple::rpcSUCCESS
@ rpcSUCCESS
Definition: ErrorCodes.h:44
ripple::Config::reporting
bool reporting() const
Definition: Config.h:337
ripple::PostgresDatabaseImp::getHashesByIndex
std::optional< LedgerHashPair > getHashesByIndex(LedgerIndex ledgerIndex) override
getHashesByIndex Returns the hashes of the ledger and its parent as specified by the ledgerIndex.
Definition: PostgresDatabase.cpp:656
ripple::PostgresDatabaseImp::getAccountTx
std::pair< AccountTxResult, RPC::Status > getAccountTx(AccountTxArgs const &args) override
getAccountTx Get the last account transactions specified by the AccountTxArgs struct.
Definition: PostgresDatabase.cpp:844
ripple::LedgerHashPair::ledgerHash
uint256 ledgerHash
Definition: RelationalDatabase.h:38
ripple::PostgresDatabaseImp::app_
Application & app_
Definition: PostgresDatabase.cpp:141
ripple::Transaction::Locator
Definition: Transaction.h:315
ripple::RelationalDatabase::MetaTxsList
std::vector< txnMetaLedgerType > MetaTxsList
Definition: RelationalDatabase.h:88
ripple::Config
Definition: Config.h:89
ripple::LedgerHashPair
Definition: RelationalDatabase.h:36
ripple::Application::config
virtual Config & config()=0
ripple::RelationalDatabase::AccountTxArgs::marker
std::optional< AccountTxMarker > marker
Definition: RelationalDatabase.h:103
ripple::RelationalDatabase::AccountTxArgs::limit
uint32_t limit
Definition: RelationalDatabase.h:102
std::string::c_str
T c_str(T... args)
ripple::PostgresDatabaseImp::dbHasSpace
bool dbHasSpace(Config const &config)
Definition: PostgresDatabase.cpp:1032
ripple::LedgerInfo::closeFlags
int closeFlags
Definition: ReadView.h:105
std::to_string
T to_string(T... args)
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::PostgresDatabaseImp::getHashByIndex
uint256 getHashByIndex(LedgerIndex ledgerIndex) override
getHashByIndex Returns the hash of the ledger with the given sequence.
Definition: PostgresDatabase.cpp:646
std::string::erase
T erase(T... args)
ripple::getPostgresDatabase
std::unique_ptr< RelationalDatabase > getPostgresDatabase(Application &app, Config const &config, JobQueue &jobQueue)
Definition: PostgresDatabase.cpp:1052
ripple::TxnsDataBinary
RelationalDatabase::MetaTxsList TxnsDataBinary
Definition: PostgresDatabase.cpp:47
Json::Value::isMember
bool isMember(const char *key) const
Return true if the object has a member named key.
Definition: json_value.cpp:932
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
ripple::PostgresDatabaseImp::getTxHistory
std::vector< std::shared_ptr< Transaction > > getTxHistory(LedgerIndex startIndex) override
getTxHistory Returns the 20 most recent transactions starting from the given number.
Definition: PostgresDatabase.cpp:753
std::map
STL class.
ripple::PostgresDatabaseImp::getMaxLedgerSeq
std::optional< LedgerIndex > getMaxLedgerSeq() override
getMaxLedgerSeq Returns the maximum ledger sequence in the Ledgers table.
Definition: PostgresDatabase.cpp:507
ripple::range
ClosedInterval< T > range(T low, T high)
Create a closed range interval.
Definition: RangeSet.h:53
ripple::PostgresDatabaseImp::getLedgerInfoByIndex
std::optional< LedgerInfo > getLedgerInfoByIndex(LedgerIndex ledgerSeq) override
getLedgerInfoByIndex Returns a ledger by its sequence.
Definition: PostgresDatabase.cpp:628
ripple::PostgresDatabaseImp::getTxHashes
std::vector< uint256 > getTxHashes(LedgerIndex seq) override
getTxHashes Returns a vector of the hashes of transactions belonging to the ledger with the provided ...
Definition: PostgresDatabase.cpp:683
ripple::LedgerInfo::drops
XRPAmount drops
Definition: ReadView.h:96
ripple::AccountTxResult
RelationalDatabase::AccountTxResult AccountTxResult
Definition: PostgresDatabase.cpp:45
ripple::JobQueue
A pool of threads to perform work.
Definition: JobQueue.h:55
ripple::rpcINTERNAL
@ rpcINTERNAL
Definition: ErrorCodes.h:130
ripple::RelationalDatabase::AccountTxArgs::forward
bool forward
Definition: RelationalDatabase.h:101
std::string::substr
T substr(T... args)
ripple::PostgresDatabaseImp::isCaughtUp
bool isCaughtUp(std::string &reason) override
isCaughtUp returns whether the database is caught up with the network
Definition: PostgresDatabase.cpp:1058
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Application::journal
virtual beast::Journal journal(std::string const &name)=0
ripple::PostgresDatabaseImp::getCompleteLedgers
std::string getCompleteLedgers() override
getCompleteLedgers Returns a string which contains a list of completed ledgers.
Definition: PostgresDatabase.cpp:518
ripple::LedgerInfo::closeTimeResolution
NetClock::duration closeTimeResolution
Definition: ReadView.h:108
Json::Value::asUInt
UInt asUInt() const
Definition: json_value.cpp:545
ripple::PostgresDatabase
Definition: PostgresDatabase.h:27
Json::Reader::parse
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Definition: json_reader.cpp:74
ripple::LedgerHashPair::parentHash
uint256 parentHash
Definition: RelationalDatabase.h:39
std::get_if
T get_if(T... args)
ripple::PostgresDatabaseImp::getLedgerInfoByHash
std::optional< LedgerInfo > getLedgerInfoByHash(uint256 const &ledgerHash) override
getLedgerInfoByHash Returns the info of the ledger with given hash.
Definition: PostgresDatabase.cpp:640
std::visit
T visit(T... args)
ripple::PostgresDatabaseImp::pgPool_
std::shared_ptr< PgPool > pgPool_
Definition: PostgresDatabase.cpp:143
std::optional
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::PostgresDatabaseImp::sweep
void sweep() override
sweep Sweeps the database.
Definition: PostgresDatabase.cpp:484
std::make_pair
T make_pair(T... args)
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:75
ripple::strHex
std::string strHex(FwdIt begin, FwdIt end)
Definition: strHex.h:30
ripple::TxnsData
RelationalDatabase::AccountTxs TxnsData
Definition: PostgresDatabase.cpp:46
ripple::ClosedInterval
boost::icl::closed_interval< T > ClosedInterval
A closed interval over the domain T.
Definition: RangeSet.h:44
ripple::NetClock::duration
std::chrono::duration< rep, period > duration
Definition: chrono.h:55
ripple::RelationalDatabase::AccountTxArgs::ledger
std::optional< LedgerSpecifier > ledger
Definition: RelationalDatabase.h:99
ripple::base_uint::parseHex
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Definition: base_uint.h:496
ripple::Config::reportingReadOnly
bool reportingReadOnly() const
Definition: Config.h:349
ripple::LedgerInfo::validated
bool validated
Definition: ReadView.h:101
std::unique_ptr
STL class.
ripple::loadLedgerHelper
std::shared_ptr< Ledger > loadLedgerHelper(LedgerInfo const &info, Application &app, bool acquire)
Definition: Ledger.cpp:1076
ripple::PostgresDatabaseImp::ledgerDbHasSpace
bool ledgerDbHasSpace(Config const &config) override
ledgerDbHasSpace Checks if the ledger database has available space.
Definition: PostgresDatabase.cpp:1040
ripple::NetClock::time_point
std::chrono::time_point< NetClock > time_point
Definition: chrono.h:56
std::string::data
T data(T... args)
ripple::LedgerInfo::accountHash
uint256 accountHash
Definition: ReadView.h:93
std::exception::what
T what(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::PostgresDatabaseImp::j_
beast::Journal j_
Definition: PostgresDatabase.cpp:142
std::variant
Json::Value::asString
std::string asString() const
Returns the unquoted string value.
Definition: json_value.cpp:469
ripple::LedgerInfo::parentCloseTime
NetClock::time_point parentCloseTime
Definition: ReadView.h:84
ripple::flatFetchTransactions
std::vector< std::pair< std::shared_ptr< STTx const >, std::shared_ptr< STObject const > > > flatFetchTransactions(Application &app, std::vector< uint256 > &nodestoreHashes)
Definition: Ledger.cpp:1151
ripple::PostgresDatabaseImp::writeLedgerAndTransactions
bool writeLedgerAndTransactions(LedgerInfo const &info, std::vector< AccountTransactionsData > const &accountTxData) override
writeLedgerAndTransactions Writes new ledger and transaction data into the database.
Definition: PostgresDatabase.cpp:543