#ifdef RIPPLED_REPORTING

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <ripple/basics/contract.h>
#include <ripple/core/Pg.h>
#include <boost/asio/ssl/detail/openssl_init.hpp>
#include <boost/format.hpp>
noticeReceiver(void* arg, PGresult const* res)
    JLOG(j.info()) << "server message: " << PQresultErrorMessage(res);
if (error_.has_value())
    ss << error_->first << ": " << error_->second;
if (PQstatus(conn_.get()) == CONNECTION_OK)

conn_.reset(PQconnectdbParams(
    reinterpret_cast<char const* const*>(&config_.keywordsIdx[0]),
    reinterpret_cast<char const* const*>(&config_.valuesIdx[0]),
    0));
if (!conn_)
    Throw<std::runtime_error>("No db connection struct");
if (PQstatus(conn_.get()) == CONNECTION_BAD)
    ss << "DB connection status " << PQstatus(conn_.get()) << ": "
       << PQerrorMessage(conn_.get());
    Throw<std::runtime_error>(ss.str());
Pg::query(char const* command, std::size_t nParams, char const* const* values)
    pg_result_type ret{nullptr, [](PGresult* result) { PQclear(result); }};
if (nParams)
    ret.reset(PQexecParams(
        conn_.get(), command, nParams, nullptr, values, nullptr, nullptr, 0));
else
    ret.reset(PQexec(conn_.get(), command));
if (!ret)
    Throw<std::runtime_error>("no result structure returned");
JLOG(j_.error()) << "database error, retrying: " << e.what();

switch (PQresultStatus(ret.get()))
    case PGRES_TUPLES_OK:
    case PGRES_COMMAND_OK:
    case PGRES_COPY_BOTH:

ss << "bad query result: " << PQresStatus(PQresultStatus(ret.get()))
   << " error message: " << PQerrorMessage(conn_.get())
   << ", number of tuples: " << PQntuples(ret.get())
   << ", number of fields: " << PQnfields(ret.get());

PgResult retRes(ret.get(), conn_.get());

return PgResult(std::move(ret));
static pg_formatted_params

for (auto const& value : values)
    ss << value->c_str();

JLOG(j.trace()) << "query: " << dbParams.first << ". params: " << ss.str();
Pg::query(pg_params const& dbParams)
    char const* const& command = dbParams.first;
    auto const formattedParams = formatParams(dbParams, j_);
        formattedParams.size(),
        formattedParams.size()
            ? reinterpret_cast<char const* const*>(&formattedParams[0])
            : nullptr);
Pg::bulkInsert(char const* table, std::string const& records)
    static auto copyCmd = boost::format(R"(COPY %s FROM stdin)");
    auto res = query(boost::str(copyCmd % table).c_str());
    if (!res || res.status() != PGRES_COPY_IN)
        ss << "bulkInsert to " << table
           << ". Postgres insert error: " << res.msg();
        ss << ". Query status not PGRES_COPY_IN: " << res.status();
        Throw<std::runtime_error>(ss.str());
    if (PQputCopyData(conn_.get(), records.c_str(), records.size()) == -1)
        ss << "bulkInsert to " << table
           << ". PQputCopyData error: " << PQerrorMessage(conn_.get());
        Throw<std::runtime_error>(ss.str());

    if (PQputCopyEnd(conn_.get(), nullptr) == -1)
        ss << "bulkInsert to " << table
           << ". PQputCopyEnd error: " << PQerrorMessage(conn_.get());
        Throw<std::runtime_error>(ss.str());
    pg_result_type copyEndResult{
        nullptr, [](PGresult* result) { PQclear(result); }};
    copyEndResult.reset(PQgetResult(conn_.get()));
    ExecStatusType status = PQresultStatus(copyEndResult.get());
    if (status != PGRES_COMMAND_OK)
        ss << "bulkInsert to " << table
           << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status;
        Throw<std::runtime_error>(ss.str());
pg_result_type res{nullptr, [](PGresult* result) { PQclear(result); }};

    res.reset(PQgetResult(conn_.get()));

    switch (PQresultStatus(res.get()))
        if (PQputCopyEnd(conn_.get(), nullptr) != -1)
        case PGRES_COPY_BOTH:
} while (res && conn_);

return conn_ != nullptr;
static boost::asio::ssl::detail::openssl_init<true> initSsl;

pg_connection_type conn(
    PQconnectdb(get(pgConfig, "conninfo").c_str()),
    [](PGconn* conn) { PQfinish(conn); });
if (!conn)
    Throw<std::runtime_error>("Can't create DB connection.");
if (PQstatus(conn.get()) != CONNECTION_OK)
    ss << "Initial DB connection failed: " << PQerrorMessage(conn.get());
    Throw<std::runtime_error>(ss.str());
int const sockfd = PQsocket(conn.get());
if (sockfd == -1)
    Throw<std::runtime_error>("No DB socket is open.");
struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
if (getpeername(sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
    Throw<std::system_error>(

bool const remember_ip = get(pgConfig, "remember_ip", true);
config_.keywords.push_back("port");
config_.keywords.push_back("hostaddr");

if (addr.ss_family == AF_INET)
    hostaddr.assign(INET_ADDRSTRLEN, '\0');
    struct sockaddr_in const& ainfo =
        reinterpret_cast<struct sockaddr_in&>(addr);
    if (!inet_ntop(
            AF_INET, &ainfo.sin_addr, &hostaddr[0], hostaddr.size()))
        Throw<std::system_error>(
            "Can't get IPv4 address string.");
else if (addr.ss_family == AF_INET6)
    hostaddr.assign(INET6_ADDRSTRLEN, '\0');
    struct sockaddr_in6 const& ainfo =
        reinterpret_cast<struct sockaddr_in6&>(addr);
    if (!inet_ntop(
            AF_INET6, &ainfo.sin6_addr, &hostaddr[0], hostaddr.size()))
        Throw<std::system_error>(
            "Can't get IPv6 address string.");

config_.values.push_back(port.c_str());
config_.values.push_back(hostaddr.c_str());
std::unique_ptr<PQconninfoOption, void (*)(PQconninfoOption*)> connOptions(
    PQconninfo(conn.get()),
    [](PQconninfoOption* opts) { PQconninfoFree(opts); });
if (!connOptions)
    Throw<std::runtime_error>("Can't get DB connection options.");

for (PQconninfoOption* option = connOptions.get();
     option->keyword != nullptr;
     ++option)
    if (++nfields > maxFields)
        ss << "DB returned connection options with > " << maxFields
        Throw<std::runtime_error>(ss.str());
    (!strcmp(option->keyword, "hostaddr") ||
     !strcmp(option->keyword, "port"))))

if (strlen(option->keyword) > maxFieldSize ||
    strlen(option->val) > maxFieldSize)
    ss << "DB returned a connection option name or value with\n";
    ss << "excessive size (>" << maxFieldSize << " bytes).\n";
    ss << "option (possibly truncated): "
    ss << " value (possibly truncated): "
    Throw<std::runtime_error>(ss.str());
config_.keywords.push_back(option->keyword);
config_.values.push_back(option->val);

config_.keywordsIdx.reserve(config_.keywords.size() + 1);
config_.valuesIdx.reserve(config_.values.size() + 1);
for (std::size_t n = 0; n < config_.keywords.size(); ++n)
{
    config_.keywordsIdx.push_back(config_.keywords[n].c_str());
    config_.valuesIdx.push_back(config_.values[n].c_str());
}
config_.keywordsIdx.push_back(nullptr);
config_.valuesIdx.push_back(nullptr);

get_if_exists(pgConfig, "max_connections", config_.max_connections);
ss << "max_connections: " << config_.max_connections << ", "
   << "timeout: " << config_.timeout.count() << ", "
   << "connection params: ";

for (std::size_t i = 0; i < config_.keywords.size(); ++i)
    ss << config_.keywords[i] << ": "
       << (config_.keywords[i] == "password" ? "*" : config_.values[i]);
JLOG(j_.info()) << "stopped";

PgPool::idleSweeper()
    before = idle_.size();
    auto const found =
        idle_.upper_bound(clock_type::now() - config_.timeout);
    for (auto it = idle_.begin(); it != found;)
        it = idle_.erase(it);
    after = idle_.size();

    JLOG(j_.info()) << "Idle sweeper. connections: " << connections_
                    << ". checked out: " << connections_ - after
                    << ". idle before, after sweep: " << before << ", "
                    << after;
auto entry = idle_.rbegin();
ret = std::move(entry->second);

else if (connections_ < config_.max_connections)
    ret = std::make_unique<Pg>(config_, j_, stop_, mutex_);

JLOG(j_.error()) << "No database connections available.";
} while (!ret && !stop_);

if (!stop_ && pg->clear())
    idle_.emplace(clock_type::now(), std::move(pg));
auto ret = std::make_shared<PgPool>(pgConfig, j);

#define LATEST_SCHEMA_VERSION 1

char const* version_query = R"(
CREATE TABLE IF NOT EXISTS version (version int NOT NULL,
    fresh_pending int NOT NULL);

-- Version 0 means that no schema has been fully deployed.
IF NOT EXISTS (SELECT 1 FROM version) THEN
    INSERT INTO version VALUES (0, 0);
-- Function to set the schema version. _in_pending should only be set to
-- non-zero prior to an attempt to initialize the schema from scratch.
-- After successful initialization, this should be set to 0.
-- _in_version should be set to the version of schema that has been applied
-- once successful application has occurred.
CREATE OR REPLACE FUNCTION set_schema_version (
    _current_version int;

    IF _in_version IS NULL OR _in_pending IS NULL THEN RETURN; END IF;
    IF EXISTS (SELECT 1 FROM version) THEN DELETE FROM version; END IF;
    INSERT INTO version VALUES (_in_version, _in_pending);

-- PQexec() returns the output of the last statement in its response.
SELECT * FROM version;
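-- Illustrative usage (a sketch, not part of the schema text itself): rippled first
-- marks a fresh install as pending and later records success, roughly:
--   SELECT set_schema_version(0, 1);   -- version still 0, version 1 pending
--   SELECT set_schema_version(1, 0);   -- version 1 applied, pending cleared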
"There is no such thing as schema version 0."

-- Table to store ledger headers.
CREATE TABLE IF NOT EXISTS ledgers (
    ledger_seq bigint PRIMARY KEY,
    ledger_hash bytea NOT NULL,
    prev_hash bytea NOT NULL,
    total_coins bigint NOT NULL,
    closing_time bigint NOT NULL,
    prev_closing_time bigint NOT NULL,
    close_time_res bigint NOT NULL,
    close_flags bigint NOT NULL,
    account_set_hash bytea NOT NULL,
    trans_set_hash bytea NOT NULL
);

-- Index for lookups by ledger hash.
CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers
    USING hash (ledger_hash);
-- Transactions table. Deletes from the ledger table
-- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS transactions (
    ledger_seq bigint NOT NULL,
    transaction_index bigint NOT NULL,
    trans_id bytea NOT NULL,
    nodestore_hash bytea NOT NULL,
    constraint transactions_pkey PRIMARY KEY (ledger_seq, transaction_index),
    constraint transactions_fkey FOREIGN KEY (ledger_seq)
        REFERENCES ledgers (ledger_seq) ON DELETE CASCADE
);

-- Index for lookups by transaction hash.
CREATE INDEX IF NOT EXISTS transactions_trans_id_idx ON transactions
    USING hash (trans_id);
-- Table that maps accounts to transactions affecting them. Deletes from the
-- ledger table by way of transactions table cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS account_transactions (
    account bytea NOT NULL,
    ledger_seq bigint NOT NULL,
    transaction_index bigint NOT NULL,
    constraint account_transactions_pkey PRIMARY KEY (account, ledger_seq,
        transaction_index),
    constraint account_transactions_fkey FOREIGN KEY (ledger_seq,
        transaction_index) REFERENCES transactions (
        ledger_seq, transaction_index) ON DELETE CASCADE
);

-- Index to allow for fast cascading deletions and referential integrity.
CREATE INDEX IF NOT EXISTS fki_account_transactions_idx ON
    account_transactions USING btree (ledger_seq, transaction_index);
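-- Illustrative consequence of the cascading foreign keys above (sketch):
--   DELETE FROM ledgers WHERE ledger_seq = 5;
-- also removes that ledger's rows from transactions and, by way of the
-- transactions table, from account_transactions.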
-- Avoid inadvertent administrative tampering with committed data.
CREATE OR REPLACE RULE ledgers_update_protect AS ON UPDATE TO
    ledgers DO INSTEAD NOTHING;
CREATE OR REPLACE RULE transactions_update_protect AS ON UPDATE TO
    transactions DO INSTEAD NOTHING;
CREATE OR REPLACE RULE account_transactions_update_protect AS ON UPDATE TO
    account_transactions DO INSTEAD NOTHING;
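-- Illustrative effect (sketch): with these rules in place, a statement such as
--   UPDATE ledgers SET total_coins = 0;
-- is rewritten to do nothing, so committed rows cannot be altered by an
-- inadvertent UPDATE.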
-- Stored procedure to assist with the tx() RPC call. Takes a transaction hash
-- as input. If found, returns the ledger sequence in which it was applied.
-- If not, returns the range of ledgers searched.
CREATE OR REPLACE FUNCTION tx (
    _in_trans_id bytea
) RETURNS jsonb AS $$
    _min_seq bigint := min_ledger();
    _max_seq bigint := max_ledger();
    _nodestore_hash bytea;

    IF _min_seq IS NULL THEN
        RETURN jsonb_build_object('error', 'empty database');
    IF length(_in_trans_id) != 32 THEN
        RETURN jsonb_build_object('error', '_in_trans_id size: '
            || to_char(length(_in_trans_id), '999'));

    EXECUTE 'SELECT nodestore_hash, ledger_seq
        FROM transactions
        WHERE trans_id = $1
        AND ledger_seq BETWEEN $2 AND $3
    ' INTO _nodestore_hash, _ledger_seq USING _in_trans_id, _min_seq, _max_seq;
    IF _nodestore_hash IS NULL THEN
        RETURN jsonb_build_object('min_seq', _min_seq, 'max_seq', _max_seq);
    RETURN jsonb_build_object('nodestore_hash', _nodestore_hash, 'ledger_seq',
        _ledger_seq);
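-- Illustrative call (sketch; the hash below is a placeholder): from psql, a
-- transaction can be looked up by its 32-byte hash with something like
--   SELECT tx(decode('<64 hex characters>', 'hex'));
-- which returns either the nodestore_hash/ledger_seq pair or the searched range.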
-- Return the earliest ledger sequence intended for range operations
-- that protect the bottom of the range from deletion. Return NULL if empty.
CREATE OR REPLACE FUNCTION min_ledger () RETURNS bigint AS $$
    _min_seq bigint := (SELECT ledger_seq from min_seq);

    IF _min_seq IS NULL THEN
        RETURN (SELECT ledger_seq FROM ledgers ORDER BY ledger_seq ASC LIMIT 1);

-- Return the latest ledger sequence in the database, or NULL if empty.
CREATE OR REPLACE FUNCTION max_ledger () RETURNS bigint AS $$
    RETURN (SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1);
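-- Illustrative use (sketch): the bounds for a ranged query can be read with
--   SELECT min_ledger(), max_ledger();
-- min_ledger() honors any floor recorded in min_seq by prepare_delete().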
-- account_tx() RPC helper. From the rippled reporting process, only the
-- parameters without defaults are required. For the parameters with
-- defaults, validation should be done by rippled, such as:
-- _in_account_id should be a valid xrp base58 address.
-- _in_forward either true or false according to the published api
-- _in_limit should be validated and not simply passed through from
-- For _in_ledger_index_min and _in_ledger_index_max, if passed in the
-- request, verify that their type is int and pass through as is.
-- For _ledger_hash, verify and convert from hex length 32 bytes and
-- prepend with \x (\\x C++).
--
-- For _in_ledger_index, if the input type is integer, then pass through
-- as is. If the type is string and contents = validated, then do not
-- set _in_ledger_index. Instead set _in_validated to TRUE.
--
-- There is no need for rippled to do any type of lookup on max/min
-- ledger range, lookup of hash, or the like. This function does those
-- things, including error responses if bad input. Only the above must
-- be done to set the correct search range.
--
-- If a marker is present in the request, verify the members 'ledger'
-- and 'seq' are integers and they correspond to _in_marker_seq
-- JSON input field 'ledger' corresponds to _in_marker_seq
-- JSON input field 'seq' corresponds to _in_marker_index
CREATE OR REPLACE FUNCTION account_tx (
    _in_account_id bytea,
    _in_ledger_index_min bigint = NULL,
    _in_ledger_index_max bigint = NULL,
    _in_ledger_hash bytea = NULL,
    _in_ledger_index bigint = NULL,
    _in_validated bool = NULL,
    _in_marker_seq bigint = NULL,
    _in_marker_index bigint = NULL
) RETURNS jsonb AS $$
    _sort_order text := (SELECT CASE WHEN _in_forward IS TRUE THEN
        'ASC' ELSE 'DESC' END);
    _transactions jsonb[] := '{}';

    IF _in_ledger_index_min IS NOT NULL OR
            _in_ledger_index_max IS NOT NULL THEN
        _min := (SELECT CASE WHEN _in_ledger_index_min IS NULL
            THEN min_ledger() ELSE greatest(
            _in_ledger_index_min, min_ledger()) END);
        _max := (SELECT CASE WHEN _in_ledger_index_max IS NULL OR
            _in_ledger_index_max = -1 THEN max_ledger() ELSE
            least(_in_ledger_index_max, max_ledger()) END);
            RETURN jsonb_build_object('error', 'max is less than min ledger');
    ELSIF _in_ledger_hash IS NOT NULL OR _in_ledger_index IS NOT NULL
            OR _in_validated IS TRUE THEN
        IF _in_ledger_hash IS NOT NULL THEN
            IF length(_in_ledger_hash) != 32 THEN
                RETURN jsonb_build_object('error', '_in_ledger_hash size: '
                    || to_char(length(_in_ledger_hash), '999'));
            EXECUTE 'SELECT ledger_seq
                FROM ledgers
                WHERE ledger_hash = $1'
                INTO _min USING _in_ledger_hash::bytea;
            IF _in_ledger_index IS NOT NULL AND _in_validated IS TRUE THEN
                RETURN jsonb_build_object('error',
                    '_in_ledger_index cannot be set and _in_validated true');
            IF _in_validated IS TRUE THEN
                _in_ledger_index := max_ledger();
            _min := (SELECT ledger_seq
                FROM ledgers
                WHERE ledger_seq = _in_ledger_index);
                RETURN jsonb_build_object('error', 'ledger not found');
    ELSE
        _min := min_ledger();
        _max := max_ledger();
    IF _in_marker_seq IS NOT NULL OR _in_marker_index IS NOT NULL THEN
        IF _in_marker_seq IS NULL OR _in_marker_index IS NULL THEN
            -- The rippled implementation returns no transaction results
            -- if either of these values is missing.
        IF _in_forward IS TRUE THEN
            _between_min := _in_marker_seq;
            _between_max := _max;
        ELSE
            _between_min := _min;
            _between_max := _in_marker_seq;
    ELSE
        _between_min := _min;
        _between_max := _max;
    IF _between_max < _between_min THEN
        RETURN jsonb_build_object('error', 'ledger search range is '
            || to_char(_between_min, '999') || '-'
            || to_char(_between_max, '999'));
        SELECT transactions.ledger_seq, transactions.transaction_index,
               transactions.trans_id, transactions.nodestore_hash
        FROM transactions
            INNER JOIN account_transactions
                ON transactions.ledger_seq =
                    account_transactions.ledger_seq
                AND transactions.transaction_index =
                    account_transactions.transaction_index
        WHERE account_transactions.account = $1
        AND account_transactions.ledger_seq BETWEEN $2 AND $3
        ORDER BY transactions.ledger_seq %s, transactions.transaction_index %s
        ', _sort_order, _sort_order);
    OPEN _cursor FOR EXECUTE _sql USING _in_account_id, _between_min,
        _between_max;

        FETCH _cursor INTO _record;
        IF _record IS NULL THEN EXIT; END IF;
        IF _marker IS TRUE THEN
            IF _in_marker_seq = _record.ledger_seq THEN
                IF _in_forward IS TRUE THEN
                    IF _in_marker_index > _record.transaction_index THEN
                IF _in_marker_index < _record.transaction_index THEN

        _tally := _tally + 1;
        IF _tally > _in_limit THEN
            _ret_marker := jsonb_build_object(
                'ledger', _record.ledger_seq,
                'seq', _record.transaction_index);

        -- Is the transaction index in the tx object?
        _transactions := _transactions || jsonb_build_object(
            'ledger_seq', _record.ledger_seq,
            'transaction_index', _record.transaction_index,
            'trans_id', _record.trans_id,
            'nodestore_hash', _record.nodestore_hash);
    _result := jsonb_build_object('ledger_index_min', _min,
        'ledger_index_max', _max,
        'transactions', _transactions);
    IF _ret_marker IS NOT NULL THEN
        _result := _result || jsonb_build_object('marker', _ret_marker);

$$ LANGUAGE plpgsql;
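-- Illustrative call (sketch; argument values are placeholders): the reporting
-- process passes the decoded account ID and lets the function resolve the range:
--   SELECT account_tx('\x<40 hex characters>'::bytea,
--                     _in_forward := FALSE, _in_limit := 20);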
-- Trigger prior to insert on ledgers table. Validates length of hash fields.
-- Verifies ancestry based on ledger_hash & prev_hash as follows:
-- 1) If ledgers is empty, allow the insert.
-- 2) For each new row, check for previous and later ledgers by a single
--    sequence. For each that exists, confirm ancestry based on hashes.
-- 3) Disallow inserts with no prior or next ledger by sequence if any
--    ledgers currently exist. This prevents gaps from being introduced by
--    way of inserting.
CREATE OR REPLACE FUNCTION insert_ancestry() RETURNS TRIGGER AS $$

    IF length(NEW.ledger_hash) != 32 OR length(NEW.prev_hash) != 32 THEN
        RAISE 'ledger_hash and prev_hash must each be 32 bytes: %', NEW;

    IF (SELECT ledger_hash
        FROM ledgers
        ORDER BY ledger_seq DESC
        LIMIT 1) = NEW.prev_hash THEN RETURN NEW; END IF;

    IF NOT EXISTS (SELECT 1 FROM LEDGERS) THEN RETURN NEW; END IF;

    _parent := (SELECT ledger_hash
        FROM ledgers
        WHERE ledger_seq = NEW.ledger_seq - 1);
    _child := (SELECT prev_hash
        FROM ledgers
        WHERE ledger_seq = NEW.ledger_seq + 1);
    IF _parent IS NULL AND _child IS NULL THEN
        RAISE 'Ledger Ancestry error: orphan.';
    IF _parent != NEW.prev_hash THEN
        RAISE 'Ledger Ancestry error: bad parent.';
    IF _child != NEW.ledger_hash THEN
        RAISE 'Ledger Ancestry error: bad child.';

$$ LANGUAGE plpgsql;
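-- A trigger wired to this function runs it for every insert; a sketch of such a
-- statement (the trigger name here is an assumption, not part of this excerpt):
--   CREATE TRIGGER verify_ancestry BEFORE INSERT ON ledgers
--       FOR EACH ROW EXECUTE PROCEDURE insert_ancestry();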
-- Trigger function prior to delete on the ledgers table. Disallow gaps from
-- forming. Do not allow deletions if both the previous and next ledgers
-- are present. In other words, only allow either the least or greatest
CREATE OR REPLACE FUNCTION delete_ancestry () RETURNS TRIGGER AS $$

    IF EXISTS (SELECT 1
        FROM ledgers
        WHERE ledger_seq = OLD.ledger_seq + 1)
        AND EXISTS (SELECT 1
        FROM ledgers
        WHERE ledger_seq = OLD.ledger_seq - 1) THEN
        RAISE 'Ledger Ancestry error: Can only delete the least or greatest '

$$ LANGUAGE plpgsql;
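-- Illustrative effect (sketch): with ledgers 100 through 110 present, deleting
-- ledger 105 raises the error above, while deleting ledger 100 or 110 succeeds.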
-- Track the minimum sequence that should be used for ranged queries
-- with protection against deletion during the query. This should
-- be updated before calling online_delete() to not block deleting that
CREATE TABLE IF NOT EXISTS min_seq (
    ledger_seq bigint NOT NULL
);
-- Set the minimum sequence for use in ranged queries with protection
-- against deletion greater than or equal to the input parameter. This
-- should be called prior to online_delete() with the same parameter
-- value so that online_delete() is not blocked by range queries
-- that are protected against concurrent deletion of the ledger at
-- the bottom of the range. This function needs to be called from a
-- separate transaction from that which executes online_delete().
CREATE OR REPLACE FUNCTION prepare_delete (
    _in_last_rotated bigint
) RETURNS void AS $$

    IF EXISTS (SELECT 1 FROM min_seq) THEN
        DELETE FROM min_seq;

    INSERT INTO min_seq VALUES (_in_last_rotated + 1);

$$ LANGUAGE plpgsql;
-- Function to delete old data. All data belonging to ledgers prior to and
-- equal to the _in_seq parameter will be deleted. This should be
-- called with the input parameter equivalent to the value of lastRotated
-- in rippled's online_delete routine.
CREATE OR REPLACE FUNCTION online_delete (
    _in_seq bigint
) RETURNS void AS $$
    DELETE FROM LEDGERS WHERE ledger_seq <= _in_seq;
$$ LANGUAGE plpgsql;
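-- Illustrative usage (sketch): record the deletion floor in one transaction, then
-- delete in a separate transaction, e.g.
--   SELECT prepare_delete(12345);   -- commit first
--   SELECT online_delete(12345);    -- run separately
-- so ranged readers that consulted min_seq are not blocked by the delete.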
-- Function to delete data from the top of the ledger range. Delete
-- everything greater than the input parameter.
-- It doesn't do a normal range delete because of the trigger protecting
-- deletions causing gaps. Instead, it walks back from the greatest ledger.
CREATE OR REPLACE FUNCTION delete_above (
    _in_seq bigint
) RETURNS void AS $$
    _max_seq bigint := max_ledger();
    _i bigint := _max_seq;

    IF _max_seq IS NULL THEN RETURN; END IF;
        IF _i <= _in_seq THEN RETURN; END IF;
        EXECUTE 'DELETE FROM ledgers WHERE ledger_seq = $1' USING _i;

$$ LANGUAGE plpgsql;
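-- Illustrative call (sketch): SELECT delete_above(2000000); removes ledgers one at
-- a time from the top of the range down to, but not including, sequence 2000000.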
-- Verify correct ancestry of ledgers in database:
-- Table to persist last-confirmed latest ledger with proper ancestry.
CREATE TABLE IF NOT EXISTS ancestry_verified (
    ledger_seq bigint NOT NULL
);
-- Function to verify ancestry of ledgers based on ledger_hash and prev_hash.
-- Upon failure, returns the ledger sequence failing the ancestry check.
-- Otherwise, returns NULL.
-- _in_full: If TRUE, verify the entire table. Else verify starting from the
--     value in the ancestry_verified table. If no value, then start
--     from the lowest ledger.
-- _in_persist: If TRUE, persist the latest ledger with correct ancestry.
--     If an exception was raised because of failure, persist
--     the latest ledger prior to that which failed.
-- _in_min: If set and _in_full is not true, the starting ledger from which
-- _in_max: If set and _in_full is not true, the latest ledger to verify.
CREATE OR REPLACE FUNCTION check_ancestry (
    _in_full bool = FALSE,
    _in_persist bool = TRUE,
    _in_min bigint = NULL,
    _in_max bigint = NULL
) RETURNS bigint AS $$
    _last_verified bigint;

    IF _in_full IS TRUE AND
            ((_in_min IS NOT NULL) OR (_in_max IS NOT NULL)) THEN
        RAISE 'Cannot specify manual range and do full check.';

    IF _in_min IS NOT NULL THEN
    ELSIF _in_full IS NOT TRUE THEN
        _last_verified := (SELECT ledger_seq FROM ancestry_verified);
        IF _last_verified IS NULL THEN
            _min := min_ledger();
        ELSE
            _min := _last_verified + 1;
    ELSE
        _min := min_ledger();

    EXECUTE 'SELECT * FROM ledgers WHERE ledger_seq = $1'
        INTO _parent USING _min - 1;
    IF _last_verified IS NOT NULL AND _parent IS NULL THEN
        RAISE 'Verified ledger % doesn''t exist.', _last_verified;

    IF _in_max IS NOT NULL THEN
    ELSE
        _max := max_ledger();

    OPEN _cursor FOR EXECUTE 'SELECT *
        WHERE ledger_seq BETWEEN $1 AND $2
        ORDER BY ledger_seq ASC'

        FETCH _cursor INTO _current;
        IF _current IS NULL THEN EXIT; END IF;
        IF _parent IS NOT NULL THEN
            IF _current.prev_hash != _parent.ledger_hash THEN
                RETURN _current.ledger_seq;
                RAISE 'Ledger ancestry failure current, parent: % %',
        _parent := _current;

    IF _in_persist IS TRUE AND _parent IS NOT NULL THEN
        DELETE FROM ancestry_verified;
        INSERT INTO ancestry_verified VALUES (_parent.ledger_seq);

$$ LANGUAGE plpgsql;
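-- Illustrative calls (sketch): SELECT check_ancestry(); verifies forward from the
-- last verified ledger, while SELECT check_ancestry(_in_full := TRUE); walks the
-- whole table. A non-NULL result is the first sequence that failed the check.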
-- Return the number of whole seconds since the latest ledger was inserted, based
-- on the ledger close time (not wall clock) of the insert.
-- Note that ledgers.closing_time is the number of seconds since the XRP
-- epoch, which is 01/01/2000 00:00:00. This in turn is 946684800 seconds
-- after the UNIX epoch. This conforms to the "age" field in the
-- server_info RPC call.
CREATE OR REPLACE FUNCTION age () RETURNS bigint AS $$
    RETURN (EXTRACT(EPOCH FROM (now())) -
        (946684800 + (SELECT closing_time
            FROM ledgers
            ORDER BY ledger_seq DESC
$$ LANGUAGE plpgsql;
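-- Illustrative check (sketch): SELECT age(); returns, for example, 4 when the
-- newest ledger's recorded close time was 4 seconds ago.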
-- Return the range of ledgers, or 'empty' if none. This conforms to the
-- "complete_ledgers" field of the server_info RPC call. Note
-- that ledger gaps are prevented in reporting mode, so the range
-- is simply the set between the least and greatest ledgers.
CREATE OR REPLACE FUNCTION complete_ledgers () RETURNS text AS $$
    _min bigint := min_ledger();
    _max bigint := max_ledger();

    IF _min IS NULL THEN RETURN 'empty'; END IF;
    IF _min = _max THEN RETURN _min; END IF;
    RETURN _min || '-' || _max;
$$ LANGUAGE plpgsql;
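-- Illustrative output (sketch): with ledgers 1000 through 2000 present,
-- SELECT complete_ledgers(); yields '1000-2000'; an empty table yields 'empty'.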
"There is no upgrade path from version 0. Instead, install "
"from full_schemata."
if (currentVersion != 0 && schemaVersion != currentVersion + 1)
    ss << "Schema upgrade versions past initial deployment must increase "
          "monotonically. Versions: current, target: "
       << currentVersion << ", " << schemaVersion;
    Throw<std::runtime_error>(ss.str());
auto res = PgQuery(pool)({schema, {}});
    ss << "Error applying schema from version " << currentVersion << " to "
       << schemaVersion << ": " << res.msg();
    Throw<std::runtime_error>(ss.str());

auto cmd = boost::format(R"(SELECT set_schema_version(%u, 0))");
res = PgQuery(pool)({boost::str(cmd % schemaVersion).c_str(), {}});
    ss << "Error setting schema version from " << currentVersion << " to "
       << schemaVersion << ": " << res.msg();
    Throw<std::runtime_error>(ss.str());
auto res = PgQuery(pool)({version_query, {}});
    ss << "Error getting database schema version: " << res.msg();
    Throw<std::runtime_error>(ss.str());

if (currentSchemaVersion == LATEST_SCHEMA_VERSION)

if (currentSchemaVersion == 0)
        pendingSchemaVersion ? pendingSchemaVersion : LATEST_SCHEMA_VERSION;

auto cmd = boost::format(R"(SELECT set_schema_version(0, %u))");
res = PgQuery(pool)({boost::str(cmd % freshVersion).c_str(), {}});
    ss << "Error setting schema version from " << currentSchemaVersion
       << " to " << freshVersion << ": " << res.msg();
    Throw<std::runtime_error>(ss.str());

    full_schemata[freshVersion],
    currentSchemaVersion,

currentSchemaVersion = freshVersion;

for (; currentSchemaVersion < LATEST_SCHEMA_VERSION; ++currentSchemaVersion)
        upgrade_schemata[currentSchemaVersion],
        currentSchemaVersion,
        currentSchemaVersion + 1);