20 #ifndef RIPPLE_APP_REPORTING_ETLSOURCE_H_INCLUDED
21 #define RIPPLE_APP_REPORTING_ETLSOURCE_H_INCLUDED
22 #include <ripple/app/main/Application.h>
23 #include <ripple/app/reporting/ETLHelpers.h>
24 #include <ripple/protocol/STLedgerEntry.h>
25 #include <ripple/rpc/Context.h>
27 #include <boost/algorithm/string.hpp>
28 #include <boost/beast/core.hpp>
29 #include <boost/beast/core/string.hpp>
30 #include <boost/beast/websocket.hpp>
32 #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
33 #include <grpcpp/grpcpp.h>
57 boost::asio::io_context&
ioc_;
105 std::chrono::system_clock::time_point
139 if (sequence >= pair.first && sequence <= pair.second)
143 else if (sequence < pair.first)
162 boost::split(ranges,
range, boost::is_any_of(
","));
163 for (
auto& pair : ranges)
167 boost::split(minAndMax, pair, boost::is_any_of(
"-"));
169 if (minAndMax.
size() == 1)
176 assert(minAndMax.
size() == 2);
183 return left.first < right.first;
207 <<
"Closing websocket";
219 fetchLedger(uint32_t ledgerSequence,
bool getObjects =
true);
225 " , ip : " +
ip_ +
" , web socket port : " +
wsPort_ +
236 result[
"websocket_port"] =
wsPort_;
239 if (last.time_since_epoch().count() != 0)
240 result[
"last_message_arrival_time"] =
241 to_string(std::chrono::floor<std::chrono::microseconds>(last));
251 uint32_t ledgerSequence,
266 boost::beast::error_code ec,
267 boost::asio::ip::tcp::resolver::results_type results);
272 boost::beast::error_code ec,
273 boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
281 onWrite(boost::beast::error_code ec,
size_t size);
285 onRead(boost::beast::error_code ec,
size_t size);
295 close(
bool startAgain);
360 fetchLedger(uint32_t ledgerSequence,
bool getObjects);
383 if (src->isConnected())
402 ret.
append(src->toJson());
429 template <
class Func>
431 execute(Func f, uint32_t ledgerSequence);
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
Json::Value toJson() const
std::vector< std::pair< uint32_t, uint32_t > > validatedLedgers_
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a randomly selected p2p node.
void start()
Setup all of the ETL sources and subscribe to the necessary streams.
bool loadInitialLedger(uint32_t ledgerSequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Download a ledger in full.
@ arrayValue
array value (ordered list)
void onRead(boost::beast::error_code ec, size_t size)
Callback.
bool shouldPropagateStream(ETLSource *in) const
Determine whether messages received on the transactions_proposed stream should be forwarded to subscr...
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects)
Fetch data for a specific ledger.
std::chrono::system_clock::time_point getLastMsgTime() const
This datastructure is used to keep track of the sequence of the most recent ledger validated by the n...
ETLSource(std::string ip, std::string wsPort, ReportingETL &etl)
Create ETL source without gRPC endpoint Fetch ledger and load initial ledger will fail for this sourc...
void onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
Callback.
void stop()
Close the underlying websocket.
std::atomic_bool forwardingStream_
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Randomly select a p2p node to forward a gRPC request to.
std::string validatedLedgersRaw_
void onHandshake(boost::beast::error_code ec)
Callback.
boost::beast::flat_buffer readBuffer_
This class manages a connection to a single ETL source.
ETLLoadBalancer(ReportingETL &etl)
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects=true)
Fetch the specified ledger.
Value & append(const Value &value)
Append value to array at the end.
std::atomic_bool connected_
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > stub_
@ objectValue
object value (collection of name/value pairs).
bool hasLedger(uint32_t sequence) const
void loadInitialLedger(uint32_t sequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Load the initial ledger, writing data to the queue.
void onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
Callback.
std::chrono::system_clock::time_point lastMsgTime_
void reconnect(boost::beast::error_code ec)
Attempt to reconnect to the ETL source.
This class is used to manage connections to transaction processing processes This class spawns a list...
bool execute(Func f, uint32_t ledgerSequence)
f is a function that takes an ETLSource as an argument and returns a bool.
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
A generic endpoint for log messages.
ClosedInterval< T > range(T low, T high)
Create a closed range interval.
std::vector< std::unique_ptr< ETLSource > > sources_
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Get grpc stub to forward requests to p2p node.
Json::Value toJson() const
boost::asio::io_context & ioc_
std::string getValidatedRange() const
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
void start()
Begin sequence of operations to connect to the ETL source and subscribe to ledgers and transactions_p...
bool handleMessage()
Handle the most recently received message.
NetworkValidatedLedgers & networkValidatedLedgers_
void setValidatedRange(std::string const &range)
process the validated range received on the ledgers stream.
std::unique_ptr< boost::beast::websocket::stream< boost::beast::tcp_stream > > ws_
std::atomic_bool closing_
void onWrite(boost::beast::error_code ec, size_t size)
Callback.
std::string toString() const
boost::asio::ip::tcp::resolver resolver_
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
void close(bool startAgain)
Close the websocket.
std::mutex lastMsgTimeMtx_
boost::asio::steady_timer timer_
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a p2p node.
void add(std::string &host, std::string &websocketPort, std::string &grpcPort)
Add an ETL source.