rippled
ETLSource.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_APP_REPORTING_ETLSOURCE_H_INCLUDED
21 #define RIPPLE_APP_REPORTING_ETLSOURCE_H_INCLUDED
22 #include <ripple/app/main/Application.h>
23 #include <ripple/app/reporting/ETLHelpers.h>
24 #include <ripple/protocol/STLedgerEntry.h>
25 #include <ripple/rpc/Context.h>
26 
27 #include <boost/algorithm/string.hpp>
28 #include <boost/beast/core.hpp>
29 #include <boost/beast/core/string.hpp>
30 #include <boost/beast/websocket.hpp>
31 
32 #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
33 #include <grpcpp/grpcpp.h>
34 
35 namespace ripple {
36 
37 class ReportingETL;
38 
46 class ETLSource
47 {
49 
51 
53 
55 
56  // a reference to the applications io_service
57  boost::asio::io_context& ioc_;
58 
60 
62  ws_;
63  boost::asio::ip::tcp::resolver resolver_;
64 
65  boost::beast::flat_buffer readBuffer_;
66 
68 
70 
72 
74 
76 
77  mutable std::mutex mtx_;
78 
79  size_t numFailures_ = 0;
80 
82 
84 
85  // true if this ETL source is forwarding transactions received on the
86  // transactions_proposed stream. There are usually multiple ETL sources,
87  // so to avoid forwarding the same transaction multiple times, we only
88  // forward from one particular ETL source at a time.
90 
91  // The last time a message was received on the ledgers stream
92  std::chrono::system_clock::time_point lastMsgTime_;
94 
95  // used for retrying connections
96  boost::asio::steady_timer timer_;
97 
98 public:
99  bool
100  isConnected() const
101  {
102  return connected_;
103  }
104 
105  std::chrono::system_clock::time_point
107  {
109  return lastMsgTime_;
110  }
111 
112  void
114  {
117  }
118 
122  ETLSource(std::string ip, std::string wsPort, ReportingETL& etl);
123 
125  ETLSource(
126  std::string ip,
127  std::string wsPort,
128  std::string grpcPort,
129  ReportingETL& etl);
130 
133  bool
134  hasLedger(uint32_t sequence) const
135  {
136  std::lock_guard lck(mtx_);
137  for (auto& pair : validatedLedgers_)
138  {
139  if (sequence >= pair.first && sequence <= pair.second)
140  {
141  return true;
142  }
143  else if (sequence < pair.first)
144  {
145  // validatedLedgers_ is a sorted list of disjoint ranges
146  // if the sequence comes before this range, the sequence will
147  // come before all subsequent ranges
148  return false;
149  }
150  }
151  return false;
152  }
153 
157  void
159  {
162  boost::split(ranges, range, boost::is_any_of(","));
163  for (auto& pair : ranges)
164  {
165  std::vector<std::string> minAndMax;
166 
167  boost::split(minAndMax, pair, boost::is_any_of("-"));
168 
169  if (minAndMax.size() == 1)
170  {
171  uint32_t sequence = std::stoll(minAndMax[0]);
172  pairs.push_back(std::make_pair(sequence, sequence));
173  }
174  else
175  {
176  assert(minAndMax.size() == 2);
177  uint32_t min = std::stoll(minAndMax[0]);
178  uint32_t max = std::stoll(minAndMax[1]);
179  pairs.push_back(std::make_pair(min, max));
180  }
181  }
182  std::sort(pairs.begin(), pairs.end(), [](auto left, auto right) {
183  return left.first < right.first;
184  });
185 
186  // we only hold the lock here, to avoid blocking while string processing
187  std::lock_guard lck(mtx_);
188  validatedLedgers_ = std::move(pairs);
190  }
191 
196  {
197  std::lock_guard lck(mtx_);
198 
199  return validatedLedgersRaw_;
200  }
201 
203  void
205  {
206  JLOG(journal_.debug()) << __func__ << " : "
207  << "Closing websocket";
208 
209  assert(ws_);
210  close(false);
211  }
212 
219  fetchLedger(uint32_t ledgerSequence, bool getObjects = true);
220 
222  toString() const
223  {
224  return "{ validated_ledger : " + getValidatedRange() +
225  " , ip : " + ip_ + " , web socket port : " + wsPort_ +
226  ", grpc port : " + grpcPort_ + " }";
227  }
228 
230  toJson() const
231  {
233  result["connected"] = connected_.load();
234  result["validated_ledgers_range"] = getValidatedRange();
235  result["ip"] = ip_;
236  result["websocket_port"] = wsPort_;
237  result["grpc_port"] = grpcPort_;
238  auto last = getLastMsgTime();
239  if (last.time_since_epoch().count() != 0)
240  result["last_message_arrival_time"] =
241  to_string(std::chrono::floor<std::chrono::microseconds>(last));
242  return result;
243  }
244 
249  bool
251  uint32_t ledgerSequence,
253 
256  void
257  start();
258 
260  void
261  reconnect(boost::beast::error_code ec);
262 
264  void
265  onResolve(
266  boost::beast::error_code ec,
267  boost::asio::ip::tcp::resolver::results_type results);
268 
270  void
271  onConnect(
272  boost::beast::error_code ec,
273  boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint);
274 
276  void
277  onHandshake(boost::beast::error_code ec);
278 
280  void
281  onWrite(boost::beast::error_code ec, size_t size);
282 
284  void
285  onRead(boost::beast::error_code ec, size_t size);
286 
289  bool
290  handleMessage();
291 
294  void
295  close(bool startAgain);
296 
300  getP2pForwardingStub() const;
301 
306  forwardToP2p(RPC::JsonContext& context) const;
307 };
308 
316 {
317 private:
319 
321 
323 
324 public:
326 
331  void
332  add(std::string& host, std::string& websocketPort, std::string& grpcPort);
333 
339  void
340  add(std::string& host, std::string& websocketPort);
341 
345  void
347  uint32_t sequence,
349 
360  fetchLedger(uint32_t ledgerSequence, bool getObjects);
361 
363  void
364  start();
365 
366  void
367  stop();
368 
376  bool
378  {
379  for (auto& src : sources_)
380  {
381  assert(src);
382  // We pick the first ETLSource encountered that is connected
383  if (src->isConnected())
384  {
385  if (src.get() == in)
386  return true;
387  else
388  return false;
389  }
390  }
391 
392  // If no sources connected, then this stream has not been forwarded.
393  return true;
394  }
395 
397  toJson() const
398  {
400  for (auto& src : sources_)
401  {
402  ret.append(src->toJson());
403  }
404  return ret;
405  }
406 
410  getP2pForwardingStub() const;
411 
416  forwardToP2p(RPC::JsonContext& context) const;
417 
418 private:
429  template <class Func>
430  bool
431  execute(Func f, uint32_t ledgerSequence);
432 };
433 
434 } // namespace ripple
435 #endif
ripple::Application
Definition: Application.h:115
ripple::RPC::JsonContext
Definition: Context.h:53
std::string
STL class.
std::shared_ptr
STL class.
ripple::ThreadSafeQueue
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
Definition: ETLHelpers.h:115
ripple::ETLSource::toJson
Json::Value toJson() const
Definition: ETLSource.h:230
ripple::ETLSource::validatedLedgers_
std::vector< std::pair< uint32_t, uint32_t > > validatedLedgers_
Definition: ETLSource.h:67
ripple::ETLLoadBalancer::forwardToP2p
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a randomly selected p2p node.
Definition: ETLSource.cpp:755
ripple::ETLLoadBalancer::start
void start()
Setup all of the ETL sources and subscribe to the necessary streams.
Definition: ETLSource.cpp:969
ripple::ETLSource::loadInitialLedger
bool loadInitialLedger(uint32_t ledgerSequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Download a ledger in full.
Definition: ETLSource.cpp:557
Json::arrayValue
@ arrayValue
array value (ordered list)
Definition: json_value.h:42
std::pair
ripple::ETLSource::onRead
void onRead(boost::beast::error_code ec, size_t size)
Callback.
Definition: ETLSource.cpp:296
ripple::ETLLoadBalancer::shouldPropagateStream
bool shouldPropagateStream(ETLSource *in) const
Determine whether messages received on the transactions_proposed stream should be forwarded to subscr...
Definition: ETLSource.h:377
std::vector
STL class.
std::vector::size
T size(T... args)
ripple::ETLSource::mtx_
std::mutex mtx_
Definition: ETLSource.h:77
ripple::ETLSource::etl_
ReportingETL & etl_
Definition: ETLSource.h:54
ripple::ETLLoadBalancer::fetchLedger
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects)
Fetch data for a specific ledger.
Definition: ETLSource.cpp:699
std::lock_guard
STL class.
ripple::ETLSource::getLastMsgTime
std::chrono::system_clock::time_point getLastMsgTime() const
Definition: ETLSource.h:106
ripple::ETLSource::ip_
std::string ip_
Definition: ETLSource.h:48
ripple::NetworkValidatedLedgers
This datastructure is used to keep track of the sequence of the most recent ledger validated by the n...
Definition: ETLHelpers.h:39
ripple::ETLSource::ETLSource
ETLSource(std::string ip, std::string wsPort, ReportingETL &etl)
Create ETL source without gRPC endpoint Fetch ledger and load initial ledger will fail for this sourc...
Definition: ETLSource.cpp:31
ripple::QualityDirection::in
@ in
ripple::ETLSource::onResolve
void onResolve(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results)
Callback.
Definition: ETLSource.cpp:182
ripple::ETLSource::stop
void stop()
Close the underlying websocket.
Definition: ETLSource.h:204
ripple::ETLSource::forwardingStream_
std::atomic_bool forwardingStream_
Definition: ETLSource.h:89
ripple::ETLLoadBalancer::getP2pForwardingStub
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Randomly select a p2p node to forward a gRPC request to.
Definition: ETLSource.cpp:733
ripple::ETLSource::validatedLedgersRaw_
std::string validatedLedgersRaw_
Definition: ETLSource.h:69
ripple::ETLSource::onHandshake
void onHandshake(boost::beast::error_code ec)
Callback.
Definition: ETLSource.cpp:245
ripple::ETLSource::readBuffer_
boost::beast::flat_buffer readBuffer_
Definition: ETLSource.h:65
ripple::ETLSource
This class manages a connection to a single ETL source.
Definition: ETLSource.h:46
ripple::ETLLoadBalancer::ETLLoadBalancer
ETLLoadBalancer(ReportingETL &etl)
Definition: ETLSource.cpp:650
std::sort
T sort(T... args)
std::vector::push_back
T push_back(T... args)
ripple::ETLSource::fetchLedger
std::pair< grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects=true)
Fetch the specified ledger.
Definition: ETLSource.cpp:624
ripple::ETLSource::setLastMsgTime
void setLastMsgTime()
Definition: ETLSource.h:113
std::stoll
T stoll(T... args)
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
Definition: json_value.cpp:882
ripple::ETLLoadBalancer::journal_
beast::Journal journal_
Definition: ETLSource.h:320
ripple::ETLSource::connected_
std::atomic_bool connected_
Definition: ETLSource.h:83
ripple::ETLSource::stub_
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > stub_
Definition: ETLSource.h:59
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
std::atomic_bool::load
T load(T... args)
ripple::ETLSource::hasLedger
bool hasLedger(uint32_t sequence) const
Definition: ETLSource.h:134
ripple::ETLLoadBalancer::stop
void stop()
Definition: ETLSource.cpp:976
ripple::ETLLoadBalancer::loadInitialLedger
void loadInitialLedger(uint32_t sequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Load the initial ledger, writing data to the queue.
Definition: ETLSource.cpp:680
ripple::ETLSource::onConnect
void onConnect(boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
Callback.
Definition: ETLSource.cpp:203
ripple::ETLSource::lastMsgTime_
std::chrono::system_clock::time_point lastMsgTime_
Definition: ETLSource.h:92
ripple::ETLSource::reconnect
void reconnect(boost::beast::error_code ec)
Attempt to reconnect to the ETL source.
Definition: ETLSource.cpp:98
ripple::ETLLoadBalancer
This class is used to manage connections to transaction processing processes This class spawns a list...
Definition: ETLSource.h:315
ripple::ETLLoadBalancer::execute
bool execute(Func f, uint32_t ledgerSequence)
f is a function that takes an ETLSource as an argument and returns a bool.
Definition: ETLSource.cpp:898
ripple::ETLSource::isConnected
bool isConnected() const
Definition: ETLSource.h:100
ripple::ReportingETL
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition: ReportingETL.h:70
ripple::ETLSource::grpcPort_
std::string grpcPort_
Definition: ETLSource.h:52
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::atomic_bool
ripple::ETLSource::app_
Application & app_
Definition: ETLSource.h:75
ripple::range
ClosedInterval< T > range(T low, T high)
Create a closed range interval.
Definition: RangeSet.h:53
ripple::ETLLoadBalancer::sources_
std::vector< std::unique_ptr< ETLSource > > sources_
Definition: ETLSource.h:322
ripple::ETLSource::journal_
beast::Journal journal_
Definition: ETLSource.h:73
ripple::ETLSource::getP2pForwardingStub
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Get grpc stub to forward requests to p2p node.
Definition: ETLSource.cpp:791
ripple::ETLLoadBalancer::toJson
Json::Value toJson() const
Definition: ETLSource.h:397
ripple::ETLSource::numFailures_
size_t numFailures_
Definition: ETLSource.h:79
ripple::ETLSource::ioc_
boost::asio::io_context & ioc_
Definition: ETLSource.h:57
ripple::ETLSource::getValidatedRange
std::string getValidatedRange() const
Definition: ETLSource.h:195
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ETLSource::start
void start()
Begin sequence of operations to connect to the ETL source and subscribe to ledgers and transactions_p...
Definition: ETLSource.cpp:170
ripple::ETLSource::handleMessage
bool handleMessage()
Handle the most recently received message.
Definition: ETLSource.cpp:319
ripple::ETLSource::networkValidatedLedgers_
NetworkValidatedLedgers & networkValidatedLedgers_
Definition: ETLSource.h:71
ripple::ETLSource::setValidatedRange
void setValidatedRange(std::string const &range)
process the validated range received on the ledgers stream.
Definition: ETLSource.h:158
std::vector::begin
T begin(T... args)
ripple::ETLSource::ws_
std::unique_ptr< boost::beast::websocket::stream< boost::beast::tcp_stream > > ws_
Definition: ETLSource.h:62
ripple::ETLSource::closing_
std::atomic_bool closing_
Definition: ETLSource.h:81
ripple::ETLLoadBalancer::etl_
ReportingETL & etl_
Definition: ETLSource.h:318
ripple::ETLSource::onWrite
void onWrite(boost::beast::error_code ec, size_t size)
Callback.
Definition: ETLSource.cpp:279
ripple::ETLSource::toString
std::string toString() const
Definition: ETLSource.h:222
ripple::ETLSource::resolver_
boost::asio::ip::tcp::resolver resolver_
Definition: ETLSource.h:63
std::optional
std::mutex
STL class.
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
std::make_pair
T make_pair(T... args)
std::vector::end
T end(T... args)
ripple::ETLSource::close
void close(bool startAgain)
Close the websocket.
Definition: ETLSource.cpp:135
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub >
ripple::ETLSource::lastMsgTimeMtx_
std::mutex lastMsgTimeMtx_
Definition: ETLSource.h:93
ripple::ETLSource::timer_
boost::asio::steady_timer timer_
Definition: ETLSource.h:96
ripple::ETLSource::forwardToP2p
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a p2p node.
Definition: ETLSource.cpp:812
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::ETLSource::wsPort_
std::string wsPort_
Definition: ETLSource.h:50
ripple::ETLLoadBalancer::add
void add(std::string &host, std::string &websocketPort, std::string &grpcPort)
Add an ETL source.
Definition: ETLSource.cpp:657
std::chrono::system_clock::now
T now(T... args)