20 #ifndef RIPPLE_APP_REPORTING_REPORTINGETL_H_INCLUDED
21 #define RIPPLE_APP_REPORTING_REPORTINGETL_H_INCLUDED
23 #include <ripple/app/main/Application.h>
24 #include <ripple/app/rdb/RelationalDatabase.h>
25 #include <ripple/app/reporting/ETLHelpers.h>
26 #include <ripple/app/reporting/ETLSource.h>
27 #include <ripple/core/JobQueue.h>
28 #include <ripple/net/InfoSub.h>
29 #include <ripple/protocol/ErrorCodes.h>
30 #include <ripple/resource/Charge.h>
31 #include <ripple/rpc/Context.h>
32 #include <ripple/rpc/GRPCHandlers.h>
33 #include <ripple/rpc/Role.h>
34 #include <ripple/rpc/impl/Handler.h>
35 #include <ripple/rpc/impl/RPCHelpers.h>
36 #include <ripple/rpc/impl/Tuning.h>
38 #include <boost/algorithm/string.hpp>
39 #include <boost/beast/core.hpp>
40 #include <boost/beast/core/string.hpp>
41 #include <boost/beast/websocket.hpp>
43 #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
44 #include <grpcpp/grpcpp.h>
225 org::xrpl::rpc::v1::GetLedgerResponse& data);
237 org::xrpl::rpc::v1::GetLedgerResponse& rawData);
251 publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts = 10);
317 if (last.time_since_epoch().count() != 0)
318 result[
"last_publish_time"] =
319 to_string(std::chrono::floor<std::chrono::microseconds>(
size_t flushInterval_
Used to determine when to write to the database during the initial ledger download.
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerData(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
std::shared_ptr< Ledger > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full, via GetLedgerData, and write the data to the datab...
std::optional< uint32_t > startSequence_
Ledger sequence to start ETL from.
std::mutex publishTimeMtx_
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
void start()
Setup all of the ETL sources and subscribe to the necessary streams.
std::vector< AccountTransactionsData > insertTransactions(std::shared_ptr< Ledger > &ledger, org::xrpl::rpc::v1::GetLedgerResponse &data)
Insert all of the extracted transactions into the ledger.
void flushLedger(std::shared_ptr< Ledger > &ledger)
Write all new data to the key-value store.
ETLLoadBalancer loadBalancer_
Mechanism for communicating with ETL sources.
void stop()
Puts the datastructure in the stopped state Future calls to this datastructure will not block This op...
This datastructure is used to keep track of the sequence of the most recent ledger validated by the n...
Application & getApplication()
NetworkValidatedLedgers networkValidatedLedgers_
Mechanism for detecting when the network has validated a new ledger.
bool publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts=10)
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
@ objectValue
object value (collection of name/value pairs).
beast::Journal & getJournal()
std::atomic_bool writing_
Whether the process is writing to the database. Used by server_info.
bool readOnly_
Whether the process is in strict read-only mode.
size_t numMarkers_
This variable controls the number of GetLedgerData calls that will be executed in parallel during the...
virtual Config & config()=0
ETLLoadBalancer & getETLLoadBalancer()
std::chrono::time_point< std::chrono::system_clock > getLastPublish()
This class is used to manage connections to transaction processing processes This class spawns a list...
NetworkValidatedLedgers & getNetworkValidatedLedgers()
void monitorReadOnly()
Monitor the database for newly written ledgers.
boost::asio::io_context::strand publishStrand_
Strand to ensure that ledgers are published in order.
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
uint32_t getNumMarkers()
Get the number of markers to use during the initial ledger download.
A generic endpoint for log messages.
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerDataAndDiff(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
std::chrono::time_point< std::chrono::system_clock > lastPublish_
The time that the most recently published ledger was published.
Json::Value toJson() const
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::atomic_bool stopping_
Whether the software is stopping.
void start()
start all of the necessary components and begin ETL
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
bool reportingReadOnly() const
std::pair< std::shared_ptr< Ledger >, std::vector< AccountTransactionsData > > buildNextLedger(std::shared_ptr< Ledger > &parent, org::xrpl::rpc::v1::GetLedgerResponse &rawData)
Build the next ledger using the previous ledger and the extracted data.
void monitor()
Monitor the network for newly validated ledgers.
ReportingETL(Application &app)
void consumeLedgerData(std::shared_ptr< Ledger > &ledger, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Consume data from a queue and insert that data into the ledger This function will continue to pull fr...
std::optional< uint32_t > runETLPipeline(uint32_t startSequence)
Run ETL.
Struct used to keep track of what to write to transactions and account_transactions tables in Postgre...