20 #include <ripple/app/rdb/backend/PostgresDatabase.h>
21 #include <ripple/app/reporting/ReportingETL.h>
23 #include <ripple/beast/core/CurrentThreadName.h>
24 #include <ripple/json/json_reader.h>
25 #include <ripple/json/json_writer.h>
26 #include <boost/asio/connect.hpp>
27 #include <boost/asio/ip/tcp.hpp>
28 #include <boost/beast/core.hpp>
29 #include <boost/beast/websocket.hpp>
45 ss <<
"LedgerInfo { Sequence : " << info.
seq
// Drain writeQueue: keep popping while stopping_ is not set and pop()
// yields an entry that tests true; the popped entry is stored in sle.
60 while (!
stopping_ && (sle = writeQueue.pop()))
// Insert the object only when the ledger does not already hold its key.
63 if (!ledger->exists(sle->key()))
64 ledger->rawInsert(sle);
78 org::xrpl::rpc::v1::GetLedgerResponse& data)
81 for (
auto& txn : data.transactions_list().transactions())
83 auto& raw = txn.transaction_blob();
88 auto txSerializer = std::make_shared<Serializer>(sttx.getSerializer());
91 sttx.getTransactionID(), ledger->info().seq, txn.metadata_blob()};
94 std::make_shared<Serializer>(txMeta.getAsObject().getSerializer());
98 <<
"Inserting transaction = " << sttx.getTransactionID();
99 uint256 nodestoreHash = ledger->rawTxInsertWithHash(
100 sttx.getTransactionID(), txSerializer, metaSerializer);
103 return accountTxData;
110 auto ledger = std::const_pointer_cast<Ledger>(
115 <<
"Database is not empty";
132 <<
"Deserialized ledger header. "
137 ledger->stateMap().clearSynching();
138 ledger->txMap().clearSynching();
140 #ifdef RIPPLED_REPORTING
148 std::thread asyncWriter{[
this, &ledger, &writeQueue]() {
160 writeQueue.
push(
null);
169 #ifdef RIPPLED_REPORTING
171 ->writeLedgerAndTransactions(ledger->info(), accountTxData);
176 JLOG(
journal_.
debug()) <<
"Time to download and store ledger = "
177 << ((end -
start).count()) / 1000000000.0;
185 <<
"Flushing ledger. "
188 auto& accountHash = ledger->info().accountHash;
189 auto& txHash = ledger->info().txHash;
190 auto& ledgerHash = ledger->info().hash;
195 ledger->setImmutable(
false);
205 addRaw(ledger->info(), s);
218 <<
"Flushed " << numFlushed
219 <<
" nodes to nodestore from stateMap";
221 <<
"Flushed " << numTxFlushed
222 <<
" nodes to nodestore from txMap";
226 << (end -
start).count() / 1000000000.0
232 <<
"Flushed 0 nodes from state map";
235 if (numTxFlushed == 0)
238 <<
"Flushed 0 nodes from tx map";
// Sanity check: the state map's hash must equal accountHash. On mismatch
// both values are logged and a std::runtime_error is thrown.
242 if (ledger->stateMap().getHash().as_uint256() != accountHash)
246 <<
"State map hash does not match. "
247 <<
"Expected hash = " <<
strHex(accountHash) <<
// The leading ". " keeps the expected hash from running straight into the
// "Actual hash" label in the emitted log line.
". Actual hash = "
248 <<
strHex(ledger->stateMap().getHash().as_uint256());
249 Throw<std::runtime_error>(
"state map hash mismatch");
// Sanity check: the transaction map's hash must equal txHash. On mismatch
// both values are logged and a std::runtime_error is thrown.
252 if (ledger->txMap().getHash().as_uint256() != txHash)
256 <<
"Tx map hash does not match. "
257 <<
"Expected hash = " <<
strHex(txHash) <<
// The leading ". " keeps the expected hash from running straight into the
// "Actual hash" label in the emitted log line.
". Actual hash = "
258 <<
strHex(ledger->txMap().getHash().as_uint256());
259 Throw<std::runtime_error>(
"tx map hash mismatch");
// Sanity check: the ledger's hash must equal ledgerHash. On mismatch both
// values are logged and a std::runtime_error is thrown.
// NOTE(review): ledgerHash is declared earlier in this function as
// `auto& ledgerHash = ledger->info().hash;`. If info() returns a reference
// to the same stored object, this compares a value with itself and can
// never fire — confirm whether a copy was intended.
262 if (ledger->info().hash != ledgerHash)
266 <<
"Ledger hash does not match. "
267 <<
"Expected hash = " <<
strHex(ledgerHash)
268 <<
// The leading ". " keeps the expected hash from running straight into the
// "Actual hash" label in the emitted log line.
". Actual hash = " <<
strHex(ledger->info().hash);
269 Throw<std::runtime_error>(
"ledger hash mismatch");
273 <<
"Successfully flushed ledger! "
289 <<
"Attempting to publish ledger = "
291 size_t numAttempts = 0;
300 <<
"Trying to publish. Could not find ledger with sequence = "
311 if (numAttempts >= maxAttempts)
314 <<
"Failed to publish ledger after "
315 << numAttempts <<
" attempts.";
319 <<
"Attempting to become ETL writer";
326 <<
"In strict read-only mode. "
327 <<
"Skipping publishing this ledger. "
328 <<
"Beginning fast forward.";
356 <<
"Attempting to fetch ledger with sequence = "
362 <<
"GetLedger reply = " << response->DebugString();
370 <<
"Attempting to fetch ledger with sequence = "
376 <<
"GetLedger reply = " << response->DebugString();
383 org::xrpl::rpc::v1::GetLedgerResponse& rawData)
386 <<
"Beginning ledger update";
392 <<
"Deserialized ledger header. "
395 next->setLedgerInfo(lgrInfo);
397 next->stateMap().clearSynching();
398 next->txMap().clearSynching();
405 <<
"Inserted all transactions. Number of transactions = "
406 << rawData.transactions_list().transactions_size();
// Walk every object carried in rawData.ledger_objects() and apply it to
// the ledger under construction (`next`).
408 for (
auto& obj : rawData.ledger_objects().objects())
414 auto& data = obj.data();
// An object with empty data is treated as a deletion: the key is erased,
// but only if the ledger currently contains it.
417 if (data.size() == 0)
420 <<
"Erasing object = " << *key;
421 if (next->exists(*key))
422 next->rawErase(*key);
// Non-empty data: an existing key is replaced with the new entry.
// NOTE(review): the branch structure between the replace and the insert
// below is not part of this listing — presumably the insert is the
// else-branch of this exists() test; confirm.
429 if (next->exists(*key))
432 <<
"Replacing object = " << *key;
433 next->rawReplace(sle);
438 <<
"Inserting object = " << *key;
439 next->rawInsert(sle);
445 <<
"Inserted/modified/deleted all objects. Number of objects = "
446 << rawData.ledger_objects().objects_size();
448 if (!rawData.skiplist_included())
450 next->updateSkipList();
453 <<
"tx process is not sending skiplist. This indicates that the tx "
454 "process is parsing metadata instead of doing a SHAMap diff. "
455 "Make sure tx process is running the same code as reporting to "
456 "use SHAMap diff instead of parsing metadata";
460 <<
"Finished ledger update. "
462 return {std::move(next), std::move(accountTxData)};
491 <<
"Starting etl pipeline";
499 Throw<std::runtime_error>(
"runETLPipeline: parent ledger is null");
504 constexpr uint32_t maxQueueSize = 1000;
507 transformQueue{maxQueueSize};
514 uint32_t currentSequence = startSequence;
542 auto time = ((end -
start).count()) / 1000000000.0;
544 fetchResponse->transactions_list().transactions_size() / time;
547 <<
" . Extract phase tps = " << tps;
549 transformQueue.push(std::move(fetchResponse));
553 transformQueue.push({});
559 loadQueue{maxQueueSize};
569 while (!writeConflict)
572 transformQueue.pop()};
583 auto [next, accountTxData] =
587 auto duration = ((end -
start).count()) / 1000000000.0;
601 &lastPublishedSequence,
605 size_t totalTransactions = 0;
606 double totalTime = 0;
607 while (!writeConflict)
612 result{loadQueue.pop()};
620 auto& ledger = result->first;
621 auto& accountTxData = result->second;
631 #ifdef RIPPLED_REPORTING
634 ledger->info(), accountTxData))
635 writeConflict =
true;
642 lastPublishedSequence = ledger->info().seq;
645 auto kvTime = ((mid -
start).count()) / 1000000000.0;
646 auto relationalTime = ((end - mid).count()) / 1000000000.0;
648 size_t numTxns = accountTxData.size();
650 totalTransactions += numTxns;
652 <<
"Load phase of etl : "
653 <<
"Successfully published ledger! Ledger info: "
655 <<
". txn count = " << numTxns
656 <<
". key-value write time = " << kvTime
657 <<
". relational write time = " << relationalTime
658 <<
". key-value tps = " << numTxns / kvTime
659 <<
". relational tps = " << numTxns / relationalTime
660 <<
". total key-value tps = " << totalTransactions / totalTime;
671 <<
"Stopping etl pipeline";
673 return lastPublishedSequence;
689 auto ledger = std::const_pointer_cast<Ledger>(
694 <<
"Database is empty. Will download a ledger "
700 <<
"ledger sequence specified in config. "
701 <<
"Will begin ETL process starting with ledger "
709 <<
"Waiting for next ledger to be validated by network...";
712 if (mostRecentValidated)
715 <<
"Ledger " << *mostRecentValidated
716 <<
" has been validated. "
723 <<
"The wait for the next validated "
724 <<
"ledger has been aborted. "
725 <<
"Exiting monitor loop";
734 Throw<std::runtime_error>(
735 "start sequence specified but db is already populated");
739 <<
"Database already populated. Picking up from the tip of history";
745 <<
"Failed to load initial ledger. Exiting monitor loop";
752 uint32_t nextSequence = ledger->info().seq + 1;
755 <<
"Database is populated. "
756 <<
"Starting monitor loop. sequence = "
762 <<
"Ledger with sequence = " << nextSequence
763 <<
" has been validated by the network. "
764 <<
"Attempting to find in database and publish";
781 constexpr
size_t timeoutSeconds = 10;
787 <<
"Failed to publish ledger with sequence = " << nextSequence
788 <<
" . Beginning ETL";
794 <<
"Aborting ETL. Falling back to publishing";
797 nextSequence = *lastPublished + 1;
809 JLOG(
journal_.
debug()) <<
"Starting reporting in strict read only mode";
814 uint32_t sequence = *mostRecent;
838 , journal_(app.journal(
"ReportingETL"))
839 , publishStrand_(app_.getIOService())
840 , loadBalancer_(*this)
845 #ifndef RIPPLED_REPORTING
846 Throw<std::runtime_error>(
847 "Config file specifies reporting, but software was not built with "
848 "-Dreporting=1. To use reporting, configure CMake with "
852 Throw<std::runtime_error>(
853 "Reporting requires tx tables. Set use_tx_tables=1 in config "
854 "file, under [ledger_tx_tables] section");
859 auto& vals = section.
values();
865 auto optIp = source.
get(
"source_ip");
869 auto optWsPort = source.
get(
"source_ws_port");
873 auto optGrpcPort = source.
get(
"source_grpc_port");
894 auto const optRO = section.
get(
"read_only");
897 readOnly_ = (*optRO ==
"true" || *optRO ==
"1");
// Local helper lambda taking an output `dest`, the text `src`, and an
// error-message prefix `onError`.
// NOTE(review): judging by its name and the "Expected integral ..."
// messages passed to it, it presumably parses `src` as an integer into
// `dest`; the parsing call itself is not part of this listing — confirm.
903 auto asciiToIntThrows =
904 [](
auto& dest,
std::string const& src,
char const* onError) {
// One-past-the-end pointer of src's character buffer.
905 char const*
const srcEnd = src.data() + src.size();
// Loop while ptr has not reached srcEnd and still points at whitespace.
// The character is cast to unsigned char before being passed to
// std::isspace.
// NOTE(review): the loop body is not shown — presumably it advances ptr.
910 while (ptr != srcEnd &&
911 std::isspace(
static_cast<unsigned char>(*ptr)))
// Failure path: throw std::runtime_error whose message is the caller's
// prefix followed by the original text.
918 Throw<std::runtime_error>(onError + src);
927 "Expected integral START_LEDGER command line argument. Got: ");
932 auto const optStartSeq = section.
get(
"start_sequence");
940 "Expected integral start_sequence config entry. Got: ");
944 auto const optFlushInterval = section.
get(
"flush_interval");
945 if (optFlushInterval)
949 "Expected integral flush_interval config entry. Got: ");
951 auto const optNumMarkers = section.
get(
"num_markers");
956 "Expected integral num_markers config entry. Got: ");