20 #include <ripple/app/ledger/AcceptedLedger.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/ledger/LedgerToJson.h>
23 #include <ripple/app/ledger/TransactionMaster.h>
24 #include <ripple/app/main/Application.h>
25 #include <ripple/app/misc/Manifest.h>
26 #include <ripple/app/misc/impl/AccountTxPaging.h>
27 #include <ripple/app/rdb/backend/PostgresDatabase.h>
28 #include <ripple/app/rdb/backend/detail/Node.h>
29 #include <ripple/basics/BasicConfig.h>
30 #include <ripple/basics/StringUtilities.h>
31 #include <ripple/core/DatabaseCon.h>
32 #include <ripple/core/Pg.h>
33 #include <ripple/core/SociDB.h>
34 #include <ripple/json/json_reader.h>
35 #include <ripple/json/to_string.h>
36 #include <ripple/nodestore/DatabaseShard.h>
37 #include <boost/algorithm/string.hpp>
38 #include <boost/range/adaptor/transformed.hpp>
39 #include <soci/sqlite3/soci-sqlite3.h>
57 ,
j_(
app_.journal(
"PgPool"))
59 #ifdef RIPPLED_REPORTING
60 make_PgPool(config.section(
"ledger_tx_tables"),
j_)
65 #ifdef RIPPLED_REPORTING
76 #ifdef RIPPLED_REPORTING
170 #ifdef RIPPLED_REPORTING
171 auto log = app.
journal(
"Ledger");
174 sql <<
"SELECT ledger_hash, prev_hash, account_set_hash, trans_set_hash, "
175 "total_coins, closing_time, prev_closing_time, close_time_res, "
176 "close_flags, ledger_seq FROM ledgers ";
178 uint32_t expNumResults = 1;
180 if (
auto ledgerSeq = std::get_if<uint32_t>(&whichLedger))
184 else if (
auto ledgerHash = std::get_if<uint256>(&whichLedger))
186 sql << (
"WHERE ledger_hash = \'\\x" +
strHex(*ledgerHash) +
"\'");
192 expNumResults = minAndMax->second - minAndMax->first;
200 sql << (
"ORDER BY ledger_seq desc LIMIT 1");
204 JLOG(log.trace()) << __func__ <<
" : sql = " << sql.
str();
206 auto res = PgQuery(pgPool)(sql.
str().data());
209 JLOG(log.error()) << __func__ <<
" : Postgres response is null - sql = "
214 else if (res.status() != PGRES_TUPLES_OK)
216 JLOG(log.error()) << __func__
217 <<
" : Postgres response should have been "
218 "PGRES_TUPLES_OK but instead was "
219 << res.status() <<
" - msg = " << res.msg()
220 <<
" - sql = " << sql.
str();
225 JLOG(log.trace()) << __func__ <<
" Postgres result msg : " << res.msg();
227 if (res.isNull() || res.ntuples() == 0)
229 JLOG(log.debug()) << __func__
230 <<
" : Ledger not found. sql = " << sql.
str();
233 else if (res.ntuples() > 0)
235 if (res.nfields() != 10)
237 JLOG(log.error()) << __func__
238 <<
" : Wrong number of fields in Postgres "
239 "response. Expected 10, but got "
240 << res.nfields() <<
" . sql = " << sql.
str();
246 for (
size_t i = 0; i < res.ntuples(); ++i)
248 char const* hash = res.c_str(i, 0);
249 char const* prevHash = res.c_str(i, 1);
250 char const* accountHash = res.c_str(i, 2);
251 char const* txHash = res.c_str(i, 3);
259 JLOG(log.trace()) << __func__ <<
" - Postgres response = " << hash
260 <<
" , " << prevHash <<
" , " << accountHash <<
" , "
261 << txHash <<
" , " << totalCoins <<
", " << closeTime
262 <<
", " << parentCloseTime <<
", " << closeTimeRes
263 <<
", " << closeFlags <<
", " << ledgerSeq
264 <<
" - sql = " << sql.
str();
265 JLOG(log.debug()) << __func__
266 <<
" - Successfully fetched ledger with sequence = "
267 << ledgerSeq <<
" from Postgres";
279 info.
drops = totalCoins;
280 info.
closeTime = time_point{duration{closeTime}};
284 info.
seq = ledgerSeq;
311 [&infos, &app, &pgPool](
auto&& arg) {
315 assert(infos.
size() <= 1);
321 #ifdef RIPPLED_REPORTING
323 writeToLedgersDB(LedgerInfo
const& info, PgQuery& pgQuery,
beast::Journal& j)
325 JLOG(j.
debug()) << __func__;
326 auto cmd = boost::format(
327 R
"(INSERT INTO ledgers
328 VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
330 auto ledgerInsert = boost::str(
331 cmd % info.seq %
strHex(info.hash) %
strHex(info.parentHash) %
332 info.drops.drops() % info.closeTime.time_since_epoch().count() %
333 info.parentCloseTime.time_since_epoch().count() %
334 info.closeTimeResolution.count() % info.closeFlags %
336 JLOG(j.
trace()) << __func__ <<
" : "
338 <<
"query string = " << ledgerInsert;
340 auto res = pgQuery(ledgerInsert.data());
345 enum class DataFormat { binary, expanded };
354 if (format == DataFormat::binary)
362 for (
size_t i = 0; i < txns.size(); ++i)
364 auto& [txn, meta] = txns[i];
365 if (format == DataFormat::binary)
367 auto& transactions = std::get<TxnsDataBinary>(ret);
368 Serializer txnSer = txn->getSerializer();
369 Serializer metaSer = meta->getSerializer();
371 Blob txnBlob = txnSer.getData();
372 Blob metaBlob = metaSer.getData();
378 auto& transactions = std::get<TxnsData>(ret);
380 auto txnRet = std::make_shared<Transaction>(txn, reason, app);
381 txnRet->setLedger(ledgerSequences[i]);
383 auto txMeta = std::make_shared<TxMeta>(
384 txnRet->getID(), ledgerSequences[i], *meta);
392 processAccountTxStoredProcedureResult(
393 RelationalDatabase::AccountTxArgs
const& args,
399 ret.limit = args.limit;
403 if (result.
isMember(
"transactions"))
407 for (
auto& t : result[
"transactions"])
409 if (t.isMember(
"ledger_seq") && t.isMember(
"nodestore_hash"))
411 uint32_t ledgerSequence = t[
"ledger_seq"].asUInt();
413 t[
"nodestore_hash"].asString();
414 nodestoreHashHex.
erase(0, 2);
416 if (!nodestoreHash.parseHex(nodestoreHashHex))
419 if (nodestoreHash.isNonZero())
421 ledgerSequences.
push_back(ledgerSequence);
422 nodestoreHashes.
push_back(nodestoreHash);
427 return {ret, {
rpcINTERNAL,
"nodestoreHash is zero"}};
433 return {ret, {
rpcINTERNAL,
"missing postgres fields"}};
437 assert(nodestoreHashes.
size() == ledgerSequences.
size());
442 args.binary ? DataFormat::binary : DataFormat::expanded);
444 JLOG(j.
trace()) << __func__ <<
" : processed db results";
446 if (result.isMember(
"marker"))
448 auto& marker = result[
"marker"];
449 assert(marker.isMember(
"ledger"));
450 assert(marker.isMember(
"seq"));
452 marker[
"ledger"].asUInt(), marker[
"seq"].asUInt()};
454 assert(result.isMember(
"ledger_index_min"));
455 assert(result.isMember(
"ledger_index_max"));
457 result[
"ledger_index_min"].asUInt(),
458 result[
"ledger_index_max"].asUInt()};
461 else if (result.isMember(
"error"))
464 << __func__ <<
" : error = " << result[
"error"].asString();
471 return {ret, {
rpcINTERNAL,
"unexpected Postgres response"}};
476 JLOG(j.
debug()) << __func__ <<
" : "
477 <<
"Caught exception : " << e.
what();
486 #ifdef RIPPLED_REPORTING
494 #ifdef RIPPLED_REPORTING
495 auto seq = PgQuery(
pgPool_)(
"SELECT min_ledger()");
498 JLOG(
j_.
error()) <<
"Error querying minimum ledger sequence.";
500 else if (!seq.isNull())
509 #ifdef RIPPLED_REPORTING
510 auto seq = PgQuery(
pgPool_)(
"SELECT max_ledger()");
511 if (seq && !seq.isNull())
512 return seq.asBigInt();
520 #ifdef RIPPLED_REPORTING
521 auto range = PgQuery(
pgPool_)(
"SELECT complete_ledgers()");
523 return range.c_str();
531 using namespace std::chrono_literals;
532 #ifdef RIPPLED_REPORTING
533 auto age = PgQuery(
pgPool_)(
"SELECT age()");
534 if (!age || age.isNull())
535 JLOG(
j_.
debug()) <<
"No ledgers in database";
547 #ifdef RIPPLED_REPORTING
548 JLOG(
j_.
debug()) << __func__ <<
" : "
549 <<
"Beginning write to Postgres";
556 auto res = pg(
"BEGIN");
557 if (!res || res.status() != PGRES_COMMAND_OK)
560 msg <<
"bulkWriteToTable : Postgres insert error: " << res.msg();
561 Throw<std::runtime_error>(msg.
str());
567 if (!writeToLedgersDB(info, pg,
j_))
569 JLOG(
j_.
warn()) << __func__ <<
" : "
570 <<
"Failed to write to ledgers database.";
576 for (
auto const& data : accountTxData)
580 auto idx = data.transactionIndex;
581 auto ledgerSeq = data.ledgerSequence;
585 << txHash <<
'\t' <<
"\\\\x" << nodestoreHash
588 for (
auto const& a : data.accounts)
591 accountTransactionsCopyBuffer
597 pg.bulkInsert(
"transactions", transactionsCopyBuffer.
str());
599 "account_transactions", accountTransactionsCopyBuffer.
str());
602 if (!res || res.status() != PGRES_COMMAND_OK)
605 msg <<
"bulkWriteToTable : Postgres insert error: " << res.msg();
607 Throw<std::runtime_error>(msg.
str());
610 JLOG(
j_.
info()) << __func__ <<
" : "
611 <<
"Successfully wrote to Postgres";
617 <<
"Caught exception writing to Postgres : "
649 assert(infos.size() <= 1);
651 return infos[0].hash;
660 assert(infos.size() <= 1);
675 for (
auto& info : infos)
677 ret[info.seq] = {info.hash, info.parentHash};
687 #ifdef RIPPLED_REPORTING
691 "SELECT nodestore_hash"
692 " FROM transactions "
693 " WHERE ledger_seq = " +
699 JLOG(log.error()) << __func__
700 <<
" : Postgres response is null - query = " << query;
704 else if (res.status() != PGRES_TUPLES_OK)
706 JLOG(log.error()) << __func__
707 <<
" : Postgres response should have been "
708 "PGRES_TUPLES_OK but instead was "
709 << res.status() <<
" - msg = " << res.msg()
710 <<
" - query = " << query;
715 JLOG(log.trace()) << __func__ <<
" Postgres result msg : " << res.msg();
717 if (res.isNull() || res.ntuples() == 0)
719 JLOG(log.debug()) << __func__
720 <<
" : Ledger not found. query = " << query;
723 else if (res.ntuples() > 0)
725 if (res.nfields() != 1)
727 JLOG(log.error()) << __func__
728 <<
" : Wrong number of fields in Postgres "
729 "response. Expected 1, but got "
730 << res.nfields() <<
" . query = " << query;
736 JLOG(log.trace()) << __func__ <<
" : result = " << res.c_str()
737 <<
" : query = " << query;
738 for (
size_t i = 0; i < res.ntuples(); ++i)
740 char const* nodestoreHash = res.
c_str(i, 0);
742 if (!hash.
parseHex(nodestoreHash + 2))
749 return nodestoreHashes;
757 #ifdef RIPPLED_REPORTING
761 Throw<std::runtime_error>(
762 "called getTxHistory but not in reporting mode");
766 boost::format(
"SELECT nodestore_hash, ledger_seq "
768 " ORDER BY ledger_seq DESC LIMIT 20 "
777 <<
" : Postgres response is null - sql = " << sql;
781 else if (res.status() != PGRES_TUPLES_OK)
784 <<
" : Postgres response should have been "
785 "PGRES_TUPLES_OK but instead was "
786 << res.status() <<
" - msg = " << res.msg()
787 <<
" - sql = " << sql;
792 JLOG(
j_.
trace()) << __func__ <<
" Postgres result msg : " << res.msg();
794 if (res.isNull() || res.ntuples() == 0)
796 JLOG(
j_.
debug()) << __func__ <<
" : Empty postgres response";
800 else if (res.ntuples() > 0)
802 if (res.nfields() != 2)
805 <<
" : Wrong number of fields in Postgres "
806 "response. Expected 1, but got "
807 << res.nfields() <<
" . sql = " << sql;
813 JLOG(
j_.
trace()) << __func__ <<
" : Postgres result = " << res.c_str();
817 for (
size_t i = 0; i < res.ntuples(); ++i)
820 if (!hash.
parseHex(res.c_str(i, 0) + 2))
823 ledgerSequences.
push_back(res.asBigInt(i, 1));
827 for (
size_t i = 0; i < txns.size(); ++i)
829 auto const& [sttx, meta] = txns[i];
833 auto txn = std::make_shared<Transaction>(sttx, reason,
app_);
834 txn->setLedger(ledgerSequences[i]);
846 #ifdef RIPPLED_REPORTING
849 char const*& command = dbParams.first;
852 "SELECT account_tx($1::bytea, $2::bool, "
853 "$3::bigint, $4::bigint, $5::bigint, $6::bytea, "
854 "$7::bigint, $8::bool, $9::bigint, $10::bigint)";
857 values[1] = args.
forward ?
"true" :
"false";
860 if (args.
limit == 0 || args.
limit > page_length)
867 if (
auto range = std::get_if<LedgerRange>(&args.
ledger.value()))
872 else if (
auto hash = std::get_if<LedgerHash>(&args.
ledger.value()))
874 values[5] = (
"\\x" +
strHex(*hash));
877 auto sequence = std::get_if<LedgerSequence>(&args.
ledger.value()))
881 else if (std::get_if<LedgerShortcut>(&args.
ledger.value()))
888 JLOG(
j_.
error()) <<
"doAccountTxStoredProcedure - "
889 <<
"Error parsing ledger args";
899 for (
size_t i = 0; i < values.
size(); ++i)
902 << (values[i] ? values[i].value() :
"null");
905 auto res = PgQuery(
pgPool_)(dbParams);
909 <<
" : Postgres response is null - account = "
914 else if (res.status() != PGRES_TUPLES_OK)
917 <<
" : Postgres response should have been "
918 "PGRES_TUPLES_OK but instead was "
919 << res.status() <<
" - msg = " << res.msg()
925 JLOG(
j_.
trace()) << __func__ <<
" Postgres result msg : " << res.msg();
926 if (res.isNull() || res.ntuples() == 0)
929 <<
" : No data returned from Postgres : account = "
936 char const* resultStr = res.c_str();
937 JLOG(
j_.
trace()) << __func__ <<
" : "
938 <<
"postgres result = " << resultStr
943 bool success = reader.
parse(resultStr, resultStr + strlen(resultStr), v);
946 return processAccountTxStoredProcedureResult(args, v,
app_,
j_);
951 return {{}, {
rpcINTERNAL,
"Failed to deserialize Postgres result"}};
957 #ifdef RIPPLED_REPORTING
958 auto baseCmd = boost::format(R
"(SELECT tx('%s');)");
969 <<
" : Postgres response is null - tx ID = " <<
strHex(
id);
973 else if (res.status() != PGRES_TUPLES_OK)
977 <<
" : Postgres response should have been "
978 "PGRES_TUPLES_OK but instead was "
979 << res.status() <<
" - msg = " << res.msg()
980 <<
" - tx ID = " <<
strHex(
id);
986 << __func__ <<
" Postgres result msg : " << res.msg();
987 if (res.isNull() || res.ntuples() == 0)
991 <<
" : No data returned from Postgres : tx ID = " <<
strHex(
id);
997 char const* resultStr = res.c_str();
999 <<
"postgres result = " << resultStr;
1003 bool success = reader.
parse(resultStr, resultStr + strlen(resultStr), v);
1012 uint32_t ledgerSeq = v[
"ledger_seq"].
asUInt();
1019 v[
"min_seq"].asUInt(), v[
"max_seq"].asUInt())};
1026 Throw<std::runtime_error>(
1027 "Transaction::Locate - Invalid Postgres response");
1054 return std::make_unique<PostgresDatabaseImp>(app, config, jobQueue);
1060 #ifdef RIPPLED_REPORTING
1061 using namespace std::chrono_literals;
1062 auto age = PgQuery(
pgPool_)(
"SELECT age()");
1063 if (!age || age.isNull())
1065 reason =
"No ledgers in database";
1070 reason =
"No recently-published ledger";