rippled
ReportingETL.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_APP_REPORTING_REPORTINGETL_H_INCLUDED
21 #define RIPPLE_APP_REPORTING_REPORTINGETL_H_INCLUDED
22 
23 #include <ripple/app/main/Application.h>
24 #include <ripple/app/rdb/RelationalDatabase.h>
25 #include <ripple/app/reporting/ETLHelpers.h>
26 #include <ripple/app/reporting/ETLSource.h>
27 #include <ripple/core/JobQueue.h>
28 #include <ripple/net/InfoSub.h>
29 #include <ripple/protocol/ErrorCodes.h>
30 #include <ripple/resource/Charge.h>
31 #include <ripple/rpc/Context.h>
32 #include <ripple/rpc/GRPCHandlers.h>
33 #include <ripple/rpc/Role.h>
34 #include <ripple/rpc/impl/Handler.h>
35 #include <ripple/rpc/impl/RPCHelpers.h>
36 #include <ripple/rpc/impl/Tuning.h>
37 
38 #include <boost/algorithm/string.hpp>
39 #include <boost/beast/core.hpp>
40 #include <boost/beast/core/string.hpp>
41 #include <boost/beast/websocket.hpp>
42 
43 #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
44 #include <grpcpp/grpcpp.h>
45 
46 #include <condition_variable>
47 #include <mutex>
48 #include <queue>
49 
50 #include <chrono>
51 namespace ripple {
52 
54 
71 {
72 private:
74 
76 
78 
91  boost::asio::io_context::strand publishStrand_; // Strand used to ensure that ledgers are published in order.
92 
97 
101 
104 
115  size_t flushInterval_ = 0; // Used to determine when to write to the database during the initial ledger download (default 0).
116 
126  size_t numMarkers_ = 2; // Number of GetLedgerData calls run in parallel during the initial ledger download; exposed via getNumMarkers() (default 2).
127 
131  bool readOnly_ = false; // Whether the process is in strict read-only mode; start() asserts this matches app_.config().reportingReadOnly().
132 
135 
140 
144 
146 
149  {
151  return lastPublish_;
152  }
153 
154  void
156  {
159  }
160 
168  loadInitialLedger(uint32_t sequence);
169 
176  runETLPipeline(uint32_t startSequence);
177 
186  void
187  monitor(); // Monitor the network for newly validated ledgers (defined in ReportingETL.cpp).
188 
193  void
194  monitorReadOnly(); // Monitor the database for newly written ledgers (defined in ReportingETL.cpp).
195 
203  fetchLedgerData(uint32_t sequence);
204 
213  fetchLedgerDataAndDiff(uint32_t sequence);
214 
224  std::shared_ptr<Ledger>& ledger,
225  org::xrpl::rpc::v1::GetLedgerResponse& data);
226 
236  std::shared_ptr<Ledger>& parent,
237  org::xrpl::rpc::v1::GetLedgerResponse& rawData);
238 
241  void
243 
250  bool
251  publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts = 10); // Attempt to read ledgerSequence from the database, then publish it; maxAttempts presumably bounds the read retries and the bool reports success — confirm in ReportingETL.cpp.
252 
255  void
257 
263  void
265  std::shared_ptr<Ledger>& ledger,
267 
268 public:
269  explicit ReportingETL(Application& app); // Defined out of line in ReportingETL.cpp; presumably binds app_ to app — confirm there.
270 
272  {
273  }
274 
277  {
279  }
280 
281  bool
282  isStopping() const // Reads the atomic stopping_ flag: set to true by stop(), reset to false by start().
283  {
284  return stopping_;
285  }
286 
291  uint32_t
293  {
294  return numMarkers_;
295  }
296 
297  Application&
299  {
300  return app_;
301  }
302 
305  {
306  return journal_;
307  }
308 
311  {
313 
314  result["etl_sources"] = loadBalancer_.toJson();
315  result["is_writer"] = writing_.load();
316  auto last = getLastPublish();
317  if (last.time_since_epoch().count() != 0)
318  result["last_publish_time"] =
319  to_string(std::chrono::floor<std::chrono::microseconds>(
320  getLastPublish()));
321  return result;
322  }
323 
325  void
327  {
328  JLOG(journal_.info()) << "Starting reporting etl";
329  assert(app_.config().reporting());
330  assert(app_.config().standalone());
331  assert(app_.config().reportingReadOnly() == readOnly_);
332 
333  stopping_ = false;
334 
336  doWork();
337  }
338 
339  void
341  {
342  JLOG(journal_.info()) << "onStop called";
343  JLOG(journal_.debug()) << "Stopping Reporting ETL";
344  stopping_ = true;
347 
348  JLOG(journal_.debug()) << "Stopped loadBalancer";
349  if (worker_.joinable())
350  worker_.join();
351 
352  JLOG(journal_.debug()) << "Joined worker thread";
353  }
354 
357  {
358  return loadBalancer_;
359  }
360 
361 private:
362  void
363  doWork(); // Invoked by start() after stopping_ is cleared; defined in ReportingETL.cpp.
364 };
365 
366 } // namespace ripple
367 #endif
ripple::ReportingETL::flushInterval_
size_t flushInterval_
Used to determine when to write to the database during the initial ledger download.
Definition: ReportingETL.h:115
ripple::Application
Definition: Application.h:115
ripple::ReportingETL::fetchLedgerData
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerData(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
Definition: ReportingETL.cpp:353
ripple::ReportingETL::loadInitialLedger
std::shared_ptr< Ledger > loadInitialLedger(uint32_t sequence)
Download a ledger with specified sequence in full, via GetLedgerData, and write the data to the datab...
Definition: ReportingETL.cpp:107
ripple::ReportingETL::startSequence_
std::optional< uint32_t > startSequence_
Ledger sequence to start ETL from.
Definition: ReportingETL.h:139
std::shared_ptr< Ledger >
ripple::ReportingETL::publishTimeMtx_
std::mutex publishTimeMtx_
Definition: ReportingETL.h:145
ripple::ThreadSafeQueue
Generic thread-safe queue with an optional maximum size. Note: we can't use a lock-free queue here,...
Definition: ETLHelpers.h:115
ripple::ETLLoadBalancer::start
void start()
Setup all of the ETL sources and subscribe to the necessary streams.
Definition: ETLSource.cpp:969
std::pair
ripple::ReportingETL::setLastPublish
void setLastPublish()
Definition: ReportingETL.h:155
ripple::ReportingETL::insertTransactions
std::vector< AccountTransactionsData > insertTransactions(std::shared_ptr< Ledger > &ledger, org::xrpl::rpc::v1::GetLedgerResponse &data)
Insert all of the extracted transactions into the ledger.
Definition: ReportingETL.cpp:76
ripple::ReportingETL::flushLedger
void flushLedger(std::shared_ptr< Ledger > &ledger)
Write all new data to the key-value store.
Definition: ReportingETL.cpp:182
std::vector
STL class.
ripple::ReportingETL::getInfo
Json::Value getInfo()
Definition: ReportingETL.h:310
ripple::ReportingETL::loadBalancer_
ETLLoadBalancer loadBalancer_
Mechanism for communicating with ETL sources.
Definition: ReportingETL.h:96
ripple::NetworkValidatedLedgers::stop
void stop()
Puts the data structure in the stopped state. Future calls to this data structure will not block. This op...
Definition: ETLHelpers.h:102
ripple::NetworkValidatedLedgers
This data structure is used to keep track of the sequence of the most recent ledger validated by the n...
Definition: ETLHelpers.h:39
ripple::ReportingETL::getApplication
Application & getApplication()
Definition: ReportingETL.h:298
queue
ripple::ReportingETL::networkValidatedLedgers_
NetworkValidatedLedgers networkValidatedLedgers_
Mechanism for detecting when the network has validated a new ledger.
Definition: ReportingETL.h:100
ripple::ReportingETL::stop
void stop()
Definition: ReportingETL.h:340
std::thread::joinable
T joinable(T... args)
ripple::ReportingETL::publishLedger
bool publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts=10)
Attempt to read the specified ledger from the database, and then publish that ledger to the ledgers s...
Definition: ReportingETL.cpp:286
ripple::ReportingETL::journal_
beast::Journal journal_
Definition: ReportingETL.h:75
ripple::Config::reporting
bool reporting() const
Definition: Config.h:337
std::thread
STL class.
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::ReportingETL::getJournal
beast::Journal & getJournal()
Definition: ReportingETL.h:304
std::atomic_bool::load
T load(T... args)
ripple::ReportingETL::writing_
std::atomic_bool writing_
Whether the process is writing to the database. Used by server_info.
Definition: ReportingETL.h:134
ripple::ETLLoadBalancer::stop
void stop()
Definition: ETLSource.cpp:976
chrono
ripple::ReportingETL::readOnly_
bool readOnly_
Whether the process is in strict read-only mode.
Definition: ReportingETL.h:131
ripple::ReportingETL::numMarkers_
size_t numMarkers_
This variable controls the number of GetLedgerData calls that will be executed in parallel during the...
Definition: ReportingETL.h:126
ripple::Application::config
virtual Config & config()=0
ripple::ReportingETL::getETLLoadBalancer
ETLLoadBalancer & getETLLoadBalancer()
Definition: ReportingETL.h:356
ripple::ReportingETL::getLastPublish
std::chrono::time_point< std::chrono::system_clock > getLastPublish()
Definition: ReportingETL.h:148
ripple::Config::standalone
bool standalone() const
Definition: Config.h:332
std::unique_lock
STL class.
ripple::ETLLoadBalancer
This class is used to manage connections to transaction processing processes. This class spawns a list...
Definition: ETLSource.h:315
ripple::ReportingETL::getNetworkValidatedLedgers
NetworkValidatedLedgers & getNetworkValidatedLedgers()
Definition: ReportingETL.h:276
ripple::ReportingETL::monitorReadOnly
void monitorReadOnly()
Monitor the database for newly written ledgers.
Definition: ReportingETL.cpp:807
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::chrono::time_point< std::chrono::system_clock >
ripple::ReportingETL::publishStrand_
boost::asio::io_context::strand publishStrand_
Strand to ensure that ledgers are published in order.
Definition: ReportingETL.h:91
ripple::ReportingETL
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition: ReportingETL.h:70
ripple::ReportingETL::getNumMarkers
uint32_t getNumMarkers()
Get the number of markers to use during the initial ledger download.
Definition: ReportingETL.h:292
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::atomic_bool
ripple::ReportingETL::~ReportingETL
~ReportingETL()
Definition: ReportingETL.h:271
ripple::ReportingETL::fetchLedgerDataAndDiff
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedgerDataAndDiff(uint32_t sequence)
Extract data for a particular ledger from an ETL source.
Definition: ReportingETL.cpp:367
ripple::ReportingETL::lastPublish_
std::chrono::time_point< std::chrono::system_clock > lastPublish_
The time that the most recently published ledger was published.
Definition: ReportingETL.h:143
ripple::ETLLoadBalancer::toJson
Json::Value toJson() const
Definition: ETLSource.h:397
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ReportingETL::stopping_
std::atomic_bool stopping_
Whether the software is stopping.
Definition: ReportingETL.h:103
condition_variable
ripple::ReportingETL::app_
Application & app_
Definition: ReportingETL.h:73
std::optional< uint32_t >
mutex
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::ReportingETL::start
void start()
start all of the necessary components and begin ETL
Definition: ReportingETL.h:326
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
ripple::ReportingETL::worker_
std::thread worker_
Definition: ReportingETL.h:77
ripple::Config::reportingReadOnly
bool reportingReadOnly() const
Definition: Config.h:349
ripple::ReportingETL::buildNextLedger
std::pair< std::shared_ptr< Ledger >, std::vector< AccountTransactionsData > > buildNextLedger(std::shared_ptr< Ledger > &parent, org::xrpl::rpc::v1::GetLedgerResponse &rawData)
Build the next ledger using the previous ledger and the extracted data.
Definition: ReportingETL.cpp:381
ripple::ReportingETL::monitor
void monitor()
Monitor the network for newly validated ledgers.
Definition: ReportingETL.cpp:687
ripple::ReportingETL::ReportingETL
ReportingETL(Application &app)
Definition: ReportingETL.cpp:836
ripple::ReportingETL::isStopping
bool isStopping() const
Definition: ReportingETL.h:282
ripple::ReportingETL::consumeLedgerData
void consumeLedgerData(std::shared_ptr< Ledger > &ledger, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Consume data from a queue and insert that data into the ledger. This function will continue to pull fr...
Definition: ReportingETL.cpp:54
ripple::ReportingETL::doWork
void doWork()
Definition: ReportingETL.cpp:825
ripple::ReportingETL::runETLPipeline
std::optional< uint32_t > runETLPipeline(uint32_t startSequence)
Run ETL.
Definition: ReportingETL.cpp:467
std::thread::join
T join(T... args)
ripple::RelationalDatabase::AccountTransactionsData
Struct used to keep track of what to write to transactions and account_transactions tables in Postgre...
Definition: RelationalDatabase.h:116
Json::Value
Represents a JSON value.
Definition: json_value.h:145
std::chrono::system_clock::now
T now(T... args)