rippled
ripple::ReportingETL Class Reference

This class is responsible for continuously extracting data from a p2p node and writing that data to the databases.


Public Member Functions

 ReportingETL (Application &app)

 ~ReportingETL ()

NetworkValidatedLedgers & getNetworkValidatedLedgers ()

bool isStopping () const

uint32_t getNumMarkers ()
 Get the number of markers to use during the initial ledger download.

Application & getApplication ()

beast::Journal & getJournal ()

Json::Value getInfo ()

void start ()
 Start all of the necessary components and begin ETL.

void stop ()

ETLLoadBalancer & getETLLoadBalancer ()
 

Private Member Functions

std::chrono::time_point< std::chrono::system_clock > getLastPublish ()

void setLastPublish ()

std::shared_ptr< Ledger > loadInitialLedger (uint32_t sequence)
 Download a ledger with the specified sequence in full, via GetLedgerData, and write the data to the databases.

std::optional< uint32_t > runETLPipeline (uint32_t startSequence)
 Run ETL.

void monitor ()
 Monitor the network for newly validated ledgers.

void monitorReadOnly ()
 Monitor the database for newly written ledgers.

std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerData (uint32_t sequence)
 Extract data for a particular ledger from an ETL source.

std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerDataAndDiff (uint32_t sequence)
 Extract data for a particular ledger from an ETL source.

std::vector< AccountTransactionsData > insertTransactions (std::shared_ptr< Ledger > &ledger, org::xrpl::rpc::v1::GetLedgerResponse &data)
 Insert all of the extracted transactions into the ledger.

std::pair< std::shared_ptr< Ledger >, std::vector< AccountTransactionsData > > buildNextLedger (std::shared_ptr< Ledger > &parent, org::xrpl::rpc::v1::GetLedgerResponse &rawData)
 Build the next ledger using the previous ledger and the extracted data.

void flushLedger (std::shared_ptr< Ledger > &ledger)
 Write all new data to the key-value store.

bool publishLedger (uint32_t ledgerSequence, uint32_t maxAttempts=10)
 Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers stream.

void publishLedger (std::shared_ptr< Ledger > &ledger)
 Publish the passed in ledger.

void consumeLedgerData (std::shared_ptr< Ledger > &ledger, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
 Consume data from a queue and insert that data into the ledger. This function will continue to pull from the queue until the queue returns nullptr.

void doWork ()
 

Private Attributes

Application & app_

beast::Journal journal_

std::thread worker_

boost::asio::io_context::strand publishStrand_
 Strand to ensure that ledgers are published in order.

ETLLoadBalancer loadBalancer_
 Mechanism for communicating with ETL sources.

NetworkValidatedLedgers networkValidatedLedgers_
 Mechanism for detecting when the network has validated a new ledger.

std::atomic_bool stopping_ = false
 Whether the software is stopping.

size_t flushInterval_ = 0
 Used to determine when to write to the database during the initial ledger download.

size_t numMarkers_ = 2
 This variable controls the number of GetLedgerData calls that will be executed in parallel during the initial ledger download.

bool readOnly_ = false
 Whether the process is in strict read-only mode.

std::atomic_bool writing_ = false
 Whether the process is writing to the database. Used by server_info.

std::optional< uint32_t > startSequence_
 Ledger sequence to start ETL from.

std::chrono::time_point< std::chrono::system_clock > lastPublish_
 The time that the most recently published ledger was published.

std::mutex publishTimeMtx_
 

Detailed Description

This class is responsible for continuously extracting data from a p2p node and writing that data to the databases.

Usually, multiple processes share access to the same network-accessible databases, in which case only one process performs ETL and writes to the database. The other processes simply monitor the database for new ledgers and publish those ledgers to the various subscription streams. If a monitoring process determines that the ETL writer has failed (no new ledgers written for some time), it attempts to become the ETL writer. If multiple monitoring processes try to become the ETL writer at the same time, one wins and the others fall back to monitoring and publishing. In this sense, this class dynamically transitions from monitoring to writing and from writing to monitoring, based on the activity of other processes running on different machines.

Definition at line 70 of file ReportingETL.h.

Constructor & Destructor Documentation

◆ ReportingETL()

ripple::ReportingETL::ReportingETL ( Application &  app)
explicit

Definition at line 836 of file ReportingETL.cpp.

◆ ~ReportingETL()

ripple::ReportingETL::~ReportingETL ( )

Definition at line 271 of file ReportingETL.h.

Member Function Documentation

◆ getLastPublish()

std::chrono::time_point<std::chrono::system_clock> ripple::ReportingETL::getLastPublish ( )
private

Definition at line 148 of file ReportingETL.h.

◆ setLastPublish()

void ripple::ReportingETL::setLastPublish ( )
private

Definition at line 155 of file ReportingETL.h.

◆ loadInitialLedger()

std::shared_ptr< Ledger > ripple::ReportingETL::loadInitialLedger ( uint32_t  sequence)
private

Download a ledger with the specified sequence in full, via GetLedgerData, and write the data to the databases.

This takes several minutes or longer.

Parameters
sequence: the sequence of the ledger to download
Returns
The ledger downloaded, with a full transaction and account state map

Definition at line 107 of file ReportingETL.cpp.

◆ runETLPipeline()

std::optional< uint32_t > ripple::ReportingETL::runETLPipeline ( uint32_t  startSequence)
private

Run ETL.

Extracts ledgers and writes them to the database, until a write conflict occurs (or the server shuts down).

Note
database must already be populated when this function is called
Parameters
startSequence: the first ledger to extract
Returns
the last ledger written to the database, if any

Definition at line 467 of file ReportingETL.cpp.
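
The control flow described for runETLPipeline() can be sketched as a loop that advances one sequence at a time and stops at the first failed write. This is a minimal sketch and not the rippled implementation: runPipelineSketch and the extractAndWrite callback are hypothetical names, and the callback is assumed to return false on a write conflict or on shutdown.

```cpp
#include <cstdint>
#include <functional>
#include <optional>

// Hypothetical stand-in for runETLPipeline(). extractAndWrite is an assumed
// callback that extracts one ledger and writes it to the database; it returns
// false when a write conflict occurs or the server is shutting down.
std::optional<std::uint32_t>
runPipelineSketch(
    std::uint32_t startSequence,
    std::function<bool(std::uint32_t)> const& extractAndWrite)
{
    std::optional<std::uint32_t> lastWritten;  // stays empty if nothing is written
    for (std::uint32_t seq = startSequence; extractAndWrite(seq); ++seq)
        lastWritten = seq;
    return lastWritten;
}
```

An empty return value corresponds to the "if any" in the documented return: the very first write failed, so no ledger was written by this call.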

◆ monitor()

void ripple::ReportingETL::monitor ( )
private

Monitor the network for newly validated ledgers.

Also monitor the database to see if any process is writing those ledgers. This function is called when the application starts, and will only return when the application is shutting down. If the software detects the database is empty, this function will call loadInitialLedger(). If the software detects ledgers are not being written, this function calls runETLPipeline(). Otherwise, this function publishes ledgers as they are written to the database.

Definition at line 687 of file ReportingETL.cpp.

◆ monitorReadOnly()

void ripple::ReportingETL::monitorReadOnly ( )
private

Monitor the database for newly written ledgers.

Similar to monitor(), except this function will never call runETLPipeline() or loadInitialLedger(). This function only publishes ledgers as they are written to the database.

Definition at line 807 of file ReportingETL.cpp.
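
The decisions described for monitor() and monitorReadOnly() can be summarized as a small pure function. This is only a sketch of the documented behavior: MonitorAction and chooseMonitorAction are hypothetical names, and the boolean inputs are assumed to come from checks that the real code performs against the database and the network.

```cpp
// Hypothetical summary of what the monitoring loop does next.
enum class MonitorAction {
    LoadInitialLedger,     // database is empty: download a full ledger first
    RunETLPipeline,        // nobody is writing: try to become the ETL writer
    PublishWrittenLedgers  // another process is writing: publish what it writes
};

// readOnly models strict read-only mode (monitorReadOnly()), which never
// loads the initial ledger and never starts the ETL pipeline.
MonitorAction
chooseMonitorAction(bool readOnly, bool databaseEmpty, bool ledgersBeingWritten)
{
    if (readOnly)
        return MonitorAction::PublishWrittenLedgers;
    if (databaseEmpty)
        return MonitorAction::LoadInitialLedger;
    if (!ledgersBeingWritten)
        return MonitorAction::RunETLPipeline;
    return MonitorAction::PublishWrittenLedgers;
}
```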

◆ fetchLedgerData()

std::optional< org::xrpl::rpc::v1::GetLedgerResponse > ripple::ReportingETL::fetchLedgerData ( uint32_t  sequence)
private

Extract data for a particular ledger from an ETL source.

This function continuously tries to extract the specified ledger (using all available ETL sources) until the extraction succeeds or the server shuts down.

Parameters
sequence: sequence of the ledger to extract
Returns
ledger header and transaction+metadata blobs. Empty optional if the server is shutting down

Definition at line 353 of file ReportingETL.cpp.
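
The retry behavior described above (keep trying every source until one succeeds or the server stops) might look roughly like the following. This is a sketch under stated assumptions: Source and fetchWithRetry are hypothetical names, a std::string stands in for the GetLedgerResponse, and no back-off between rounds is shown, although a real implementation would likely want one.

```cpp
#include <atomic>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical ETL source: returns the extracted data, or an empty optional
// if this source could not provide the requested ledger.
using Source = std::function<std::optional<std::string>(std::uint32_t)>;

// Cycle through all sources until one returns data or stopping becomes true.
// Note: with an empty source list this spins until stopping is set.
std::optional<std::string>
fetchWithRetry(
    std::vector<Source> const& sources,
    std::uint32_t sequence,
    std::atomic_bool const& stopping)
{
    while (!stopping)
    {
        for (auto const& source : sources)
        {
            if (stopping)
                break;
            if (auto data = source(sequence))
                return data;
        }
    }
    return std::nullopt;  // shutting down: caller receives an empty optional
}
```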

◆ fetchLedgerDataAndDiff()

std::optional< org::xrpl::rpc::v1::GetLedgerResponse > ripple::ReportingETL::fetchLedgerDataAndDiff ( uint32_t  sequence)
private

Extract data for a particular ledger from an ETL source.

This function continuously tries to extract the specified ledger (using all available ETL sources) until the extraction succeeds or the server shuts down.

Parameters
sequence: sequence of the ledger to extract
Returns
ledger header, transaction+metadata blobs, and all ledger objects created, modified or deleted between this ledger and the parent. Empty optional if the server is shutting down

Definition at line 367 of file ReportingETL.cpp.

◆ insertTransactions()

std::vector< AccountTransactionsData > ripple::ReportingETL::insertTransactions ( std::shared_ptr< Ledger > &  ledger,
org::xrpl::rpc::v1::GetLedgerResponse &  data 
)
private

Insert all of the extracted transactions into the ledger.

Parameters
ledger: ledger to insert transactions into
data: data extracted from an ETL source
Returns
struct that contains the necessary info to write to the transactions and account_transactions tables in Postgres (mostly transaction hashes, corresponding nodestore hashes and affected accounts)

Definition at line 76 of file ReportingETL.cpp.

◆ buildNextLedger()

std::pair< std::shared_ptr< Ledger >, std::vector< AccountTransactionsData > > ripple::ReportingETL::buildNextLedger ( std::shared_ptr< Ledger > &  parent,
org::xrpl::rpc::v1::GetLedgerResponse &  rawData 
)
private

Build the next ledger using the previous ledger and the extracted data.

This function calls insertTransactions().

Note
rawData should be data that corresponds to the ledger immediately following parent
Parameters
parent: the previous ledger
rawData: data extracted from an ETL source
Returns
the newly built ledger and data to write to Postgres

Definition at line 381 of file ReportingETL.cpp.

◆ flushLedger()

void ripple::ReportingETL::flushLedger ( std::shared_ptr< Ledger > &  ledger)
private

Write all new data to the key-value store.

Parameters
ledger: ledger with new data to write

Definition at line 182 of file ReportingETL.cpp.

◆ publishLedger() [1/2]

bool ripple::ReportingETL::publishLedger ( uint32_t  ledgerSequence,
uint32_t  maxAttempts = 10 
)
private

Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers stream.

Parameters
ledgerSequence: the sequence of the ledger to publish
maxAttempts: the number of times to attempt to read the ledger from the database, at one attempt per second
Returns
whether the ledger was found in the database and published

Definition at line 286 of file ReportingETL.cpp.
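
The bounded retry described for this overload (up to maxAttempts reads, one attempt per second) can be sketched as follows. The names publishWithRetry and tryReadAndPublish are hypothetical, and the delay parameter is an addition made here so the loop can be exercised quickly; the documented behavior corresponds to the one-second default.

```cpp
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// tryReadAndPublish is an assumed callback that returns true once the ledger
// was found in the database and published.
bool
publishWithRetry(
    std::function<bool()> const& tryReadAndPublish,
    std::uint32_t maxAttempts = 10,
    std::chrono::milliseconds delay = std::chrono::seconds(1))
{
    for (std::uint32_t attempt = 0; attempt < maxAttempts; ++attempt)
    {
        if (tryReadAndPublish())
            return true;
        // Wait before the next attempt, but not after the final one.
        if (attempt + 1 < maxAttempts)
            std::this_thread::sleep_for(delay);
    }
    return false;  // ledger never appeared within maxAttempts reads
}
```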

◆ publishLedger() [2/2]

void ripple::ReportingETL::publishLedger ( std::shared_ptr< Ledger > &  ledger)
private

Publish the passed in ledger.

Parameters
ledger: the ledger to publish

Definition at line 278 of file ReportingETL.cpp.

◆ consumeLedgerData()

void ripple::ReportingETL::consumeLedgerData ( std::shared_ptr< Ledger > &  ledger,
ThreadSafeQueue< std::shared_ptr< SLE >> &  writeQueue 
)
private

Consume data from a queue and insert that data into the ledger. This function will continue to pull from the queue until the queue returns nullptr.

This is used during the initial ledger download.

Parameters
ledger: the ledger to insert data into
writeQueue: the queue with extracted data

Definition at line 54 of file ReportingETL.cpp.
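
The "pull until nullptr" pattern can be illustrated with a minimal blocking queue in which a null pointer acts as the end-of-stream sentinel. This is a sketch only: BlockingQueue is a hypothetical stand-in and not the ThreadSafeQueue used by rippled, and strings stand in for SLE objects and for the ledger.

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal blocking queue; pop() waits until an item is available.
template <class T>
class BlockingQueue
{
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;

public:
    void
    push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cv_.notify_one();
    }

    T
    pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
};

// Drain the queue into the ledger stand-in until a nullptr sentinel arrives.
// Returns the number of objects inserted.
std::size_t
consumeAll(
    BlockingQueue<std::shared_ptr<std::string>>& writeQueue,
    std::vector<std::string>& ledger)
{
    std::size_t count = 0;
    while (auto item = writeQueue.pop())
    {
        ledger.push_back(*item);
        ++count;
    }
    return count;
}
```

In this arrangement the producer pushes nullptr once after the last object, which lets the consumer exit without a separate "done" flag.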

◆ getNetworkValidatedLedgers()

NetworkValidatedLedgers& ripple::ReportingETL::getNetworkValidatedLedgers ( )

Definition at line 276 of file ReportingETL.h.

◆ isStopping()

bool ripple::ReportingETL::isStopping ( ) const

Definition at line 282 of file ReportingETL.h.

◆ getNumMarkers()

uint32_t ripple::ReportingETL::getNumMarkers ( )

Get the number of markers to use during the initial ledger download.

This is equivalent to the degree of parallelism during the initial ledger download.

Returns
the number of markers

Definition at line 292 of file ReportingETL.h.

◆ getApplication()

Application& ripple::ReportingETL::getApplication ( )

Definition at line 298 of file ReportingETL.h.

◆ getJournal()

beast::Journal& ripple::ReportingETL::getJournal ( )

Definition at line 304 of file ReportingETL.h.

◆ getInfo()

Json::Value ripple::ReportingETL::getInfo ( )

Definition at line 310 of file ReportingETL.h.

◆ start()

void ripple::ReportingETL::start ( )

Start all of the necessary components and begin ETL.

Definition at line 326 of file ReportingETL.h.

◆ stop()

void ripple::ReportingETL::stop ( )

Definition at line 340 of file ReportingETL.h.

◆ getETLLoadBalancer()

ETLLoadBalancer& ripple::ReportingETL::getETLLoadBalancer ( )

Definition at line 356 of file ReportingETL.h.

◆ doWork()

void ripple::ReportingETL::doWork ( )
private

Definition at line 825 of file ReportingETL.cpp.

Member Data Documentation

◆ app_

Application& ripple::ReportingETL::app_
private

Definition at line 73 of file ReportingETL.h.

◆ journal_

beast::Journal ripple::ReportingETL::journal_
private

Definition at line 75 of file ReportingETL.h.

◆ worker_

std::thread ripple::ReportingETL::worker_
private

Definition at line 77 of file ReportingETL.h.

◆ publishStrand_

boost::asio::io_context::strand ripple::ReportingETL::publishStrand_
private

Strand to ensure that ledgers are published in order.

If ETL is started far behind the network, ledgers will be written and published very rapidly. Monitoring processes publish ledgers as they are written. However, to publish a ledger, a monitoring process needs to read all of the transactions for that ledger from the database, which requires network calls that can be slow. It is imperative, however, that the monitoring processes keep up with the writer; otherwise they will not be able to detect that the writer has failed. Therefore, publishing each ledger (which includes reading all of its transactions from the database) is done from the application-wide asio io_service, and a strand is used to ensure that ledgers are published in order.

Definition at line 91 of file ReportingETL.h.

◆ loadBalancer_

ETLLoadBalancer ripple::ReportingETL::loadBalancer_
private

Mechanism for communicating with ETL sources.

ETLLoadBalancer wraps an arbitrary number of ETL sources and load balances ETL requests across those sources.

Definition at line 96 of file ReportingETL.h.

◆ networkValidatedLedgers_

NetworkValidatedLedgers ripple::ReportingETL::networkValidatedLedgers_
private

Mechanism for detecting when the network has validated a new ledger.

This class provides a way to wait for a specific ledger to be validated.

Definition at line 100 of file ReportingETL.h.

◆ stopping_

std::atomic_bool ripple::ReportingETL::stopping_ = false
private

Whether the software is stopping.

Definition at line 103 of file ReportingETL.h.

◆ flushInterval_

size_t ripple::ReportingETL::flushInterval_ = 0
private

Used to determine when to write to the database during the initial ledger download.

By default, the software downloads an entire ledger and then writes to the database. If flushInterval_ is non-zero, the software will write to the database as new ledger data (SHAMap leaf nodes) arrives. It is not necessarily more efficient to write the data as it arrives, because different SHAMap leaf nodes share the same SHAMap inner nodes; flushing prematurely can result in the same SHAMap inner node being written to the database more than once. It is recommended to use the default value of 0 for this variable; however, different values can be experimented with if better performance is desired.

Definition at line 115 of file ReportingETL.h.
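
To make the effect of flushInterval_ concrete, the following sketch computes after how many downloaded objects a database write would happen. It assumes the interval is counted in ledger objects received, which the description above does not state explicitly; flushPoints is a hypothetical helper, not part of this class.

```cpp
#include <cstddef>
#include <vector>

// Returns the object counts at which a flush would occur while downloading
// numObjects objects. Assumption: a non-zero flushInterval triggers a flush
// every flushInterval objects, and one final flush always happens at the end.
std::vector<std::size_t>
flushPoints(std::size_t numObjects, std::size_t flushInterval)
{
    std::vector<std::size_t> points;
    for (std::size_t n = 1; n <= numObjects; ++n)
    {
        if (flushInterval != 0 && n % flushInterval == 0)
            points.push_back(n);
    }
    // Final flush once the whole ledger has been downloaded.
    if (points.empty() || points.back() != numObjects)
        points.push_back(numObjects);
    return points;
}
```

With the default of 0 there is a single flush at the end; each additional flush point is a chance for shared SHAMap inner nodes to be written again, which is the cost the description warns about.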

◆ numMarkers_

size_t ripple::ReportingETL::numMarkers_ = 2
private

This variable controls the number of GetLedgerData calls that will be executed in parallel during the initial ledger download.

GetLedgerData allows clients to page through a ledger over many RPC calls. GetLedgerData returns a marker that is used as an offset in a subsequent call. If numMarkers_ is greater than 1, there will be multiple chains of GetLedgerData calls iterating over different parts of the same ledger in parallel. This can dramatically reduce the time needed to download the initial ledger. However, a higher value for this member variable puts more load on the ETL source.

Definition at line 126 of file ReportingETL.h.
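
One way to obtain numMarkers_ independent starting points is to divide the key space evenly by the first byte of the key. The sketch below illustrates that idea only; it is an assumption about how the ranges could be chosen, startingMarkers is a hypothetical helper, and a single byte stands in for a full ledger key.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns the first key byte at which each parallel chain of GetLedgerData
// calls would start. Each chain would page forward from its marker until it
// reaches the next chain's starting point (or the end of the key space).
std::vector<std::uint8_t>
startingMarkers(std::size_t numMarkers)
{
    std::vector<std::uint8_t> markers;
    if (numMarkers == 0 || numMarkers > 256)
        return markers;  // cannot split one byte into this many ranges
    std::size_t const step = 256 / numMarkers;
    for (std::size_t i = 0; i < numMarkers; ++i)
        markers.push_back(static_cast<std::uint8_t>(i * step));
    return markers;
}
```

With the default of 2, this yields two chains starting at bytes 0 and 128, so each chain covers about half of the ledger.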

◆ readOnly_

bool ripple::ReportingETL::readOnly_ = false
private

Whether the process is in strict read-only mode.

In strict read-only mode, the process will never attempt to become the ETL writer, and will only publish ledgers as they are written to the database.

Definition at line 131 of file ReportingETL.h.

◆ writing_

std::atomic_bool ripple::ReportingETL::writing_ = false
private

Whether the process is writing to the database. Used by server_info.

Definition at line 134 of file ReportingETL.h.

◆ startSequence_

std::optional<uint32_t> ripple::ReportingETL::startSequence_
private

Ledger sequence to start ETL from.

If this is empty, ETL will start from the next ledger validated by the network. If this is set, and the database is already populated, an error is thrown.

Definition at line 139 of file ReportingETL.h.
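
The two rules stated for startSequence_ can be written down as a small function. This sketch follows the description literally and does not model how ETL resumes from an already populated database when no start sequence is configured; chooseInitialSequence and its parameters are hypothetical names.

```cpp
#include <cstdint>
#include <optional>
#include <stdexcept>

// nextValidatedSequence is assumed to be the next ledger validated by the
// network, as reported by whatever tracks network validations.
std::uint32_t
chooseInitialSequence(
    std::optional<std::uint32_t> startSequence,
    bool databasePopulated,
    std::uint32_t nextValidatedSequence)
{
    if (startSequence)
    {
        // A configured start sequence conflicts with existing data.
        if (databasePopulated)
            throw std::runtime_error(
                "start sequence is set but the database is already populated");
        return *startSequence;
    }
    // No start sequence configured: begin at the next validated ledger.
    return nextValidatedSequence;
}
```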

◆ lastPublish_

std::chrono::time_point<std::chrono::system_clock> ripple::ReportingETL::lastPublish_
private

The time that the most recently published ledger was published.

Used by server_info.

Definition at line 143 of file ReportingETL.h.

◆ publishTimeMtx_

std::mutex ripple::ReportingETL::publishTimeMtx_
private

Definition at line 145 of file ReportingETL.h.
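
The combination of lastPublish_, publishTimeMtx_, getLastPublish() and setLastPublish() suggests a mutex-guarded timestamp that one thread updates and another (for example the server_info handler) reads. A minimal sketch of that pattern follows; LastPublishTime is a hypothetical class written for illustration, and the bodies are assumptions rather than the code in ReportingETL.h.

```cpp
#include <chrono>
#include <mutex>

// Hypothetical wrapper around a timestamp shared between threads.
class LastPublishTime
{
    using Clock = std::chrono::system_clock;

    mutable std::mutex mutex_;         // guards lastPublish_
    Clock::time_point lastPublish_{};  // epoch until the first publish

public:
    // Record that a ledger was published just now.
    void
    set()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        lastPublish_ = Clock::now();
    }

    // Read the time of the most recent publish.
    Clock::time_point
    get() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return lastPublish_;
    }
};
```

A mutex is used here because std::chrono::time_point is not guaranteed to be usable with std::atomic in a lock-free way; copying the value out under the lock keeps the critical section short.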