rippled
ETLSource.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/reporting/ETLSource.h>
21 #include <ripple/app/reporting/ReportingETL.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_reader.h>
24 #include <ripple/json/json_writer.h>
25 
26 namespace ripple {
27 
28 // Create ETL source without grpc endpoint
29 // Fetch ledger and load initial ledger will fail for this source
30 // Primarly used in read-only mode, to monitor when ledgers are validated
32  : ip_(ip)
33  , wsPort_(wsPort)
34  , etl_(etl)
35  , ioc_(etl.getApplication().getIOService())
36  , ws_(std::make_unique<
37  boost::beast::websocket::stream<boost::beast::tcp_stream>>(
38  boost::asio::make_strand(ioc_)))
39  , resolver_(boost::asio::make_strand(ioc_))
40  , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
41  , journal_(etl_.getApplication().journal("ReportingETL::ETLSource"))
42  , app_(etl_.getApplication())
43  , timer_(ioc_)
44 {
45 }
46 
48  std::string ip,
49  std::string wsPort,
50  std::string grpcPort,
51  ReportingETL& etl)
52  : ip_(ip)
53  , wsPort_(wsPort)
54  , grpcPort_(grpcPort)
55  , etl_(etl)
56  , ioc_(etl.getApplication().getIOService())
57  , ws_(std::make_unique<
58  boost::beast::websocket::stream<boost::beast::tcp_stream>>(
59  boost::asio::make_strand(ioc_)))
60  , resolver_(boost::asio::make_strand(ioc_))
61  , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
62  , journal_(etl_.getApplication().journal("ReportingETL::ETLSource"))
63  , app_(etl_.getApplication())
64  , timer_(ioc_)
65 {
66  std::string connectionString;
67  try
68  {
69  connectionString =
71  boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
72  .to_string();
73 
74  JLOG(journal_.info())
75  << "Using IP to connect to ETL source: " << connectionString;
76  }
77  catch (std::exception const&)
78  {
79  connectionString = "dns:" + ip_ + ":" + grpcPort_;
80  JLOG(journal_.info())
81  << "Using DNS to connect to ETL source: " << connectionString;
82  }
83  try
84  {
85  stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
86  grpc::CreateChannel(
87  connectionString, grpc::InsecureChannelCredentials()));
88  JLOG(journal_.info()) << "Made stub for remote = " << toString();
89  }
90  catch (std::exception const& e)
91  {
92  JLOG(journal_.error()) << "Exception while creating stub = " << e.what()
93  << " . Remote = " << toString();
94  }
95 }
96 
97 void
98 ETLSource::reconnect(boost::beast::error_code ec)
99 {
100  connected_ = false;
101  // These are somewhat normal errors. operation_aborted occurs on shutdown,
102  // when the timer is cancelled. connection_refused will occur repeatedly
103  // if we cannot connect to the transaction processing process
104  if (ec != boost::asio::error::operation_aborted &&
105  ec != boost::asio::error::connection_refused)
106  {
107  JLOG(journal_.error()) << __func__ << " : "
108  << "error code = " << ec << " - " << toString();
109  }
110  else
111  {
112  JLOG(journal_.warn()) << __func__ << " : "
113  << "error code = " << ec << " - " << toString();
114  }
115 
116  if (etl_.isStopping())
117  {
118  JLOG(journal_.debug()) << __func__ << " : " << toString()
119  << " - etl is stopping. aborting reconnect";
120  return;
121  }
122 
123  // exponentially increasing timeouts, with a max of 30 seconds
124  size_t waitTime = std::min(pow(2, numFailures_), 30.0);
125  numFailures_++;
126  timer_.expires_after(boost::asio::chrono::seconds(waitTime));
127  timer_.async_wait([this, fname = __func__](auto ec) {
128  bool startAgain = (ec != boost::asio::error::operation_aborted);
129  JLOG(journal_.trace()) << fname << " async_wait : ec = " << ec;
130  close(startAgain);
131  });
132 }
133 
134 void
135 ETLSource::close(bool startAgain)
136 {
137  timer_.cancel();
138  ioc_.post([this, startAgain]() {
139  if (closing_)
140  return;
141 
142  if (ws_->is_open())
143  {
144  // onStop() also calls close(). If the async_close is called twice,
145  // an assertion fails. Using closing_ makes sure async_close is only
146  // called once
147  closing_ = true;
148  ws_->async_close(
149  boost::beast::websocket::close_code::normal,
150  [this, startAgain, fname = __func__](auto ec) {
151  if (ec)
152  {
153  JLOG(journal_.error())
154  << fname << " async_close : "
155  << "error code = " << ec << " - " << toString();
156  }
157  closing_ = false;
158  if (startAgain)
159  start();
160  });
161  }
162  else if (startAgain)
163  {
164  start();
165  }
166  });
167 }
168 
169 void
170 ETLSource::start()
171 {
172  JLOG(journal_.trace()) << __func__ << " : " << toString();
173 
174  auto const host = ip_;
175  auto const port = wsPort_;
176 
177  resolver_.async_resolve(
178  host, port, [this](auto ec, auto results) { onResolve(ec, results); });
179 }
180 
181 void
182 ETLSource::onResolve(
183  boost::beast::error_code ec,
184  boost::asio::ip::tcp::resolver::results_type results)
185 {
186  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
187  << toString();
188  if (ec)
189  {
190  // try again
191  reconnect(ec);
192  }
193  else
194  {
195  boost::beast::get_lowest_layer(*ws_).expires_after(
197  boost::beast::get_lowest_layer(*ws_).async_connect(
198  results, [this](auto ec, auto ep) { onConnect(ec, ep); });
199  }
200 }
201 
202 void
203 ETLSource::onConnect(
204  boost::beast::error_code ec,
205  boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
206 {
207  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
208  << toString();
209  if (ec)
210  {
211  // start over
212  reconnect(ec);
213  }
214  else
215  {
216  numFailures_ = 0;
217  // Turn off timeout on the tcp stream, because websocket stream has it's
218  // own timeout system
219  boost::beast::get_lowest_layer(*ws_).expires_never();
220 
221  // Set suggested timeout settings for the websocket
222  ws_->set_option(
223  boost::beast::websocket::stream_base::timeout::suggested(
224  boost::beast::role_type::client));
225 
226  // Set a decorator to change the User-Agent of the handshake
227  ws_->set_option(boost::beast::websocket::stream_base::decorator(
228  [](boost::beast::websocket::request_type& req) {
229  req.set(
230  boost::beast::http::field::user_agent,
231  std::string(BOOST_BEAST_VERSION_STRING) +
232  " websocket-client-async");
233  }));
234 
235  // Update the host_ string. This will provide the value of the
236  // Host HTTP header during the WebSocket handshake.
237  // See https://tools.ietf.org/html/rfc7230#section-5.4
238  auto host = ip_ + ':' + std::to_string(endpoint.port());
239  // Perform the websocket handshake
240  ws_->async_handshake(host, "/", [this](auto ec) { onHandshake(ec); });
241  }
242 }
243 
244 void
245 ETLSource::onHandshake(boost::beast::error_code ec)
246 {
247  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
248  << toString();
249  if (ec)
250  {
251  // start over
252  reconnect(ec);
253  }
254  else
255  {
256  Json::Value jv;
257  jv["command"] = "subscribe";
258 
259  jv["streams"] = Json::arrayValue;
260  Json::Value ledgerStream("ledger");
261  jv["streams"].append(ledgerStream);
262  Json::Value txnStream("transactions_proposed");
263  jv["streams"].append(txnStream);
264  Json::Value validationStream("validations");
265  jv["streams"].append(validationStream);
266  Json::Value manifestStream("manifests");
267  jv["streams"].append(manifestStream);
268  Json::FastWriter fastWriter;
269 
270  JLOG(journal_.trace()) << "Sending subscribe stream message";
271  // Send the message
272  ws_->async_write(
273  boost::asio::buffer(fastWriter.write(jv)),
274  [this](auto ec, size_t size) { onWrite(ec, size); });
275  }
276 }
277 
278 void
279 ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten)
280 {
281  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
282  << toString();
283  if (ec)
284  {
285  // start over
286  reconnect(ec);
287  }
288  else
289  {
290  ws_->async_read(
291  readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
292  }
293 }
294 
295 void
296 ETLSource::onRead(boost::beast::error_code ec, size_t size)
297 {
298  JLOG(journal_.trace()) << __func__ << " : ec = " << ec << " - "
299  << toString();
300  // if error or error reading message, start over
301  if (ec)
302  {
303  reconnect(ec);
304  }
305  else
306  {
307  handleMessage();
308  boost::beast::flat_buffer buffer;
309  swap(readBuffer_, buffer);
310 
311  JLOG(journal_.trace())
312  << __func__ << " : calling async_read - " << toString();
313  ws_->async_read(
314  readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
315  }
316 }
317 
318 bool
319 ETLSource::handleMessage()
320 {
321  JLOG(journal_.trace()) << __func__ << " : " << toString();
322 
323  setLastMsgTime();
324  connected_ = true;
325  try
326  {
327  Json::Value response;
328  Json::Reader reader;
329  if (!reader.parse(
330  static_cast<char const*>(readBuffer_.data().data()), response))
331  {
332  JLOG(journal_.error())
333  << __func__ << " : "
334  << "Error parsing stream message."
335  << " Message = " << readBuffer_.data().data();
336  return false;
337  }
338 
339  uint32_t ledgerIndex = 0;
340  if (response.isMember("result"))
341  {
342  if (response["result"].isMember(jss::ledger_index))
343  {
344  ledgerIndex = response["result"][jss::ledger_index].asUInt();
345  }
346  if (response[jss::result].isMember(jss::validated_ledgers))
347  {
348  setValidatedRange(
349  response[jss::result][jss::validated_ledgers].asString());
350  }
351  JLOG(journal_.debug())
352  << __func__ << " : "
353  << "Received a message on ledger "
354  << " subscription stream. Message : "
355  << response.toStyledString() << " - " << toString();
356  }
357  else
358  {
359  if (etl_.getETLLoadBalancer().shouldPropagateStream(this))
360  {
361  if (response.isMember(jss::transaction))
362  {
363  etl_.getApplication().getOPs().forwardProposedTransaction(
364  response);
365  }
366  else if (
367  response.isMember("type") &&
368  response["type"] == "validationReceived")
369  {
370  etl_.getApplication().getOPs().forwardValidation(response);
371  }
372  else if (
373  response.isMember("type") &&
374  response["type"] == "manifestReceived")
375  {
376  etl_.getApplication().getOPs().forwardManifest(response);
377  }
378  }
379 
380  if (response.isMember("type") && response["type"] == "ledgerClosed")
381  {
382  JLOG(journal_.debug())
383  << __func__ << " : "
384  << "Received a message on ledger "
385  << " subscription stream. Message : "
386  << response.toStyledString() << " - " << toString();
387  if (response.isMember(jss::ledger_index))
388  {
389  ledgerIndex = response[jss::ledger_index].asUInt();
390  }
391  if (response.isMember(jss::validated_ledgers))
392  {
393  setValidatedRange(
394  response[jss::validated_ledgers].asString());
395  }
396  }
397  }
398 
399  if (ledgerIndex != 0)
400  {
401  JLOG(journal_.trace())
402  << __func__ << " : "
403  << "Pushing ledger sequence = " << ledgerIndex << " - "
404  << toString();
405  networkValidatedLedgers_.push(ledgerIndex);
406  }
407  return true;
408  }
409  catch (std::exception const& e)
410  {
411  JLOG(journal_.error()) << "Exception in handleMessage : " << e.what();
412  return false;
413  }
414 }
415 
417 {
420 
421  org::xrpl::rpc::v1::GetLedgerDataRequest request_;
423 
424  grpc::Status status_;
425 
426  unsigned char nextPrefix_;
427 
429 
430 public:
432  uint256& marker,
433  std::optional<uint256> nextMarker,
434  uint32_t seq,
435  beast::Journal& j)
436  : journal_(j)
437  {
438  request_.mutable_ledger()->set_sequence(seq);
439  if (marker.isNonZero())
440  {
441  request_.set_marker(marker.data(), marker.size());
442  }
443  request_.set_user("ETL");
444  nextPrefix_ = 0x00;
445  if (nextMarker)
446  nextPrefix_ = nextMarker->data()[0];
447 
448  unsigned char prefix = marker.data()[0];
449 
450  JLOG(journal_.debug())
451  << "Setting up AsyncCallData. marker = " << strHex(marker)
452  << " . prefix = " << strHex(std::string(1, prefix))
453  << " . nextPrefix_ = " << strHex(std::string(1, nextPrefix_));
454 
455  assert(nextPrefix_ > prefix || nextPrefix_ == 0x00);
456 
457  cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
458 
459  next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
460 
461  context_ = std::make_unique<grpc::ClientContext>();
462  }
463 
464  enum class CallStatus { MORE, DONE, ERRORED };
465  CallStatus
468  grpc::CompletionQueue& cq,
470  bool abort = false)
471  {
472  JLOG(journal_.debug()) << "Processing calldata";
473  if (abort)
474  {
475  JLOG(journal_.error()) << "AsyncCallData aborted";
476  return CallStatus::ERRORED;
477  }
478  if (!status_.ok())
479  {
480  JLOG(journal_.debug()) << "AsyncCallData status_ not ok: "
481  << " code = " << status_.error_code()
482  << " message = " << status_.error_message();
483  return CallStatus::ERRORED;
484  }
485  if (!next_->is_unlimited())
486  {
487  JLOG(journal_.warn())
488  << "AsyncCallData is_unlimited is false. Make sure "
489  "secure_gateway is set correctly at the ETL source";
490  assert(false);
491  }
492 
493  std::swap(cur_, next_);
494 
495  bool more = true;
496 
497  // if no marker returned, we are done
498  if (cur_->marker().size() == 0)
499  more = false;
500 
501  // if returned marker is greater than our end, we are done
502  unsigned char prefix = cur_->marker()[0];
503  if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
504  more = false;
505 
506  // if we are not done, make the next async call
507  if (more)
508  {
509  request_.set_marker(std::move(cur_->marker()));
510  call(stub, cq);
511  }
512 
513  for (auto& obj : cur_->ledger_objects().objects())
514  {
515  auto key = uint256::fromVoidChecked(obj.key());
516  if (!key)
517  throw std::runtime_error("Received malformed object ID");
518 
519  auto& data = obj.data();
520 
521  SerialIter it{data.data(), data.size()};
522  std::shared_ptr<SLE> sle = std::make_shared<SLE>(it, *key);
523 
524  queue.push(sle);
525  }
526 
527  return more ? CallStatus::MORE : CallStatus::DONE;
528  }
529 
530  void
533  grpc::CompletionQueue& cq)
534  {
535  context_ = std::make_unique<grpc::ClientContext>();
536 
537  std::unique_ptr<grpc::ClientAsyncResponseReader<
538  org::xrpl::rpc::v1::GetLedgerDataResponse>>
539  rpc(stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq));
540 
541  rpc->StartCall();
542 
543  rpc->Finish(next_.get(), &status_, this);
544  }
545 
548  {
549  if (next_->marker().size() == 0)
550  return "";
551  else
552  return strHex(std::string{next_->marker().data()[0]});
553  }
554 };
555 
556 bool
557 ETLSource::loadInitialLedger(
558  uint32_t sequence,
560 {
561  if (!stub_)
562  return false;
563 
564  grpc::CompletionQueue cq;
565 
566  void* tag;
567 
568  bool ok = false;
569 
571  std::vector<uint256> markers{getMarkers(etl_.getNumMarkers())};
572 
573  for (size_t i = 0; i < markers.size(); ++i)
574  {
575  std::optional<uint256> nextMarker;
576  if (i + 1 < markers.size())
577  nextMarker = markers[i + 1];
578  calls.emplace_back(markers[i], nextMarker, sequence, journal_);
579  }
580 
581  JLOG(journal_.debug()) << "Starting data download for ledger " << sequence
582  << ". Using source = " << toString();
583 
584  for (auto& c : calls)
585  c.call(stub_, cq);
586 
587  size_t numFinished = 0;
588  bool abort = false;
589  while (numFinished < calls.size() && !etl_.isStopping() &&
590  cq.Next(&tag, &ok))
591  {
592  assert(tag);
593 
594  auto ptr = static_cast<AsyncCallData*>(tag);
595 
596  if (!ok)
597  {
598  JLOG(journal_.error()) << "loadInitialLedger - ok is false";
599  return false;
600  // handle cancelled
601  }
602  else
603  {
604  JLOG(journal_.debug())
605  << "Marker prefix = " << ptr->getMarkerPrefix();
606  auto result = ptr->process(stub_, cq, writeQueue, abort);
607  if (result != AsyncCallData::CallStatus::MORE)
608  {
609  numFinished++;
610  JLOG(journal_.debug())
611  << "Finished a marker. "
612  << "Current number of finished = " << numFinished;
613  }
614  if (result == AsyncCallData::CallStatus::ERRORED)
615  {
616  abort = true;
617  }
618  }
619  }
620  return !abort;
621 }
622 
624 ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
625 {
626  org::xrpl::rpc::v1::GetLedgerResponse response;
627  if (!stub_)
628  return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
629 
630  // ledger header with txns and metadata
631  org::xrpl::rpc::v1::GetLedgerRequest request;
632  grpc::ClientContext context;
633  request.mutable_ledger()->set_sequence(ledgerSequence);
634  request.set_transactions(true);
635  request.set_expand(true);
636  request.set_get_objects(getObjects);
637  request.set_user("ETL");
638  grpc::Status status = stub_->GetLedger(&context, request, &response);
639  if (status.ok() && !response.is_unlimited())
640  {
641  JLOG(journal_.warn()) << "ETLSource::fetchLedger - is_unlimited is "
642  "false. Make sure secure_gateway is set "
643  "correctly on the ETL source. source = "
644  << toString();
645  assert(false);
646  }
647  return {status, std::move(response)};
648 }
649 
650 ETLLoadBalancer::ETLLoadBalancer(ReportingETL& etl)
651  : etl_(etl)
652  , journal_(etl_.getApplication().journal("ReportingETL::LoadBalancer"))
653 {
654 }
655 
656 void
658  std::string& host,
659  std::string& websocketPort,
660  std::string& grpcPort)
661 {
663  std::make_unique<ETLSource>(host, websocketPort, grpcPort, etl_);
664  sources_.push_back(std::move(ptr));
665  JLOG(journal_.info()) << __func__ << " : added etl source - "
666  << sources_.back()->toString();
667 }
668 
669 void
671 {
673  std::make_unique<ETLSource>(host, websocketPort, etl_);
674  sources_.push_back(std::move(ptr));
675  JLOG(journal_.info()) << __func__ << " : added etl source - "
676  << sources_.back()->toString();
677 }
678 
679 void
681  uint32_t sequence,
683 {
684  execute(
685  [this, &sequence, &writeQueue](auto& source) {
686  bool res = source->loadInitialLedger(sequence, writeQueue);
687  if (!res)
688  {
689  JLOG(journal_.error()) << "Failed to download initial ledger. "
690  << " Sequence = " << sequence
691  << " source = " << source->toString();
692  }
693  return res;
694  },
695  sequence);
696 }
697 
699 ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects)
700 {
701  org::xrpl::rpc::v1::GetLedgerResponse response;
702  bool success = execute(
703  [&response, ledgerSequence, getObjects, this](auto& source) {
704  auto [status, data] =
705  source->fetchLedger(ledgerSequence, getObjects);
706  response = std::move(data);
707  if (status.ok() && response.validated())
708  {
709  JLOG(journal_.info())
710  << "Successfully fetched ledger = " << ledgerSequence
711  << " from source = " << source->toString();
712  return true;
713  }
714  else
715  {
716  JLOG(journal_.warn())
717  << "Error getting ledger = " << ledgerSequence
718  << " Reply : " << response.DebugString()
719  << " error_code : " << status.error_code()
720  << " error_msg : " << status.error_message()
721  << " source = " << source->toString();
722  return false;
723  }
724  },
725  ledgerSequence);
726  if (success)
727  return response;
728  else
729  return {};
730 }
731 
734 {
735  if (sources_.size() == 0)
736  return nullptr;
737  srand((unsigned)time(0));
738  auto sourceIdx = rand() % sources_.size();
739  auto numAttempts = 0;
740  while (numAttempts < sources_.size())
741  {
742  auto stub = sources_[sourceIdx]->getP2pForwardingStub();
743  if (!stub)
744  {
745  sourceIdx = (sourceIdx + 1) % sources_.size();
746  ++numAttempts;
747  continue;
748  }
749  return stub;
750  }
751  return nullptr;
752 }
753 
756 {
757  Json::Value res;
758  if (sources_.size() == 0)
759  return res;
760  srand((unsigned)time(0));
761  auto sourceIdx = rand() % sources_.size();
762  auto numAttempts = 0;
763 
764  auto mostRecent = etl_.getNetworkValidatedLedgers().tryGetMostRecent();
765  while (numAttempts < sources_.size())
766  {
767  auto increment = [&]() {
768  sourceIdx = (sourceIdx + 1) % sources_.size();
769  ++numAttempts;
770  };
771  auto& src = sources_[sourceIdx];
772  if (mostRecent && !src->hasLedger(*mostRecent))
773  {
774  increment();
775  continue;
776  }
777  res = src->forwardToP2p(context);
778  if (!res.isMember("forwarded") || res["forwarded"] != true)
779  {
780  increment();
781  continue;
782  }
783  return res;
784  }
786  err.inject(res);
787  return res;
788 }
789 
792 {
793  if (!connected_)
794  return nullptr;
795  try
796  {
797  return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
798  grpc::CreateChannel(
800  boost::asio::ip::make_address(ip_), std::stoi(grpcPort_))
801  .to_string(),
802  grpc::InsecureChannelCredentials()));
803  }
804  catch (std::exception const&)
805  {
806  JLOG(journal_.error()) << "Failed to create grpc stub";
807  return nullptr;
808  }
809 }
810 
813 {
814  JLOG(journal_.debug()) << "Attempting to forward request to tx. "
815  << "request = " << context.params.toStyledString();
816 
817  Json::Value response;
818  if (!connected_)
819  {
820  JLOG(journal_.error())
821  << "Attempted to proxy but failed to connect to tx";
822  return response;
823  }
824  namespace beast = boost::beast; // from <boost/beast.hpp>
825  namespace http = beast::http; // from <boost/beast/http.hpp>
826  namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
827  namespace net = boost::asio; // from <boost/asio.hpp>
828  using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
829  Json::Value& request = context.params;
830  try
831  {
832  // The io_context is required for all I/O
833  net::io_context ioc;
834 
835  // These objects perform our I/O
836  tcp::resolver resolver{ioc};
837 
838  JLOG(journal_.debug()) << "Creating websocket";
839  auto ws = std::make_unique<websocket::stream<tcp::socket>>(ioc);
840 
841  // Look up the domain name
842  auto const results = resolver.resolve(ip_, wsPort_);
843 
844  JLOG(journal_.debug()) << "Connecting websocket";
845  // Make the connection on the IP address we get from a lookup
846  net::connect(ws->next_layer(), results.begin(), results.end());
847 
848  // Set a decorator to change the User-Agent of the handshake
849  // and to tell rippled to charge the client IP for RPC
850  // resources. See "secure_gateway" in
851  // https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
852  ws->set_option(websocket::stream_base::decorator(
853  [&context](websocket::request_type& req) {
854  req.set(
855  http::field::user_agent,
856  std::string(BOOST_BEAST_VERSION_STRING) +
857  " websocket-client-coro");
858  req.set(
859  http::field::forwarded,
860  "for=" + context.consumer.to_string());
861  }));
862  JLOG(journal_.debug()) << "client ip: " << context.consumer.to_string();
863 
864  JLOG(journal_.debug()) << "Performing websocket handshake";
865  // Perform the websocket handshake
866  ws->handshake(ip_, "/");
867 
868  Json::FastWriter fastWriter;
869 
870  JLOG(journal_.debug()) << "Sending request";
871  // Send the message
872  ws->write(net::buffer(fastWriter.write(request)));
873 
874  beast::flat_buffer buffer;
875  ws->read(buffer);
876 
877  Json::Reader reader;
878  if (!reader.parse(
879  static_cast<char const*>(buffer.data().data()), response))
880  {
881  JLOG(journal_.error()) << "Error parsing response";
882  response[jss::error] = "Error parsing response from tx";
883  }
884  JLOG(journal_.debug()) << "Successfully forward request";
885 
886  response["forwarded"] = true;
887  return response;
888  }
889  catch (std::exception const& e)
890  {
891  JLOG(journal_.error()) << "Encountered exception : " << e.what();
892  return response;
893  }
894 }
895 
896 template <class Func>
897 bool
898 ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)
899 {
900  srand((unsigned)time(0));
901  auto sourceIdx = rand() % sources_.size();
902  auto numAttempts = 0;
903 
904  while (!etl_.isStopping())
905  {
906  auto& source = sources_[sourceIdx];
907 
908  JLOG(journal_.debug())
909  << __func__ << " : "
910  << "Attempting to execute func. ledger sequence = "
911  << ledgerSequence << " - source = " << source->toString();
912  if (source->hasLedger(ledgerSequence))
913  {
914  bool res = f(source);
915  if (res)
916  {
917  JLOG(journal_.debug())
918  << __func__ << " : "
919  << "Successfully executed func at source = "
920  << source->toString()
921  << " - ledger sequence = " << ledgerSequence;
922  break;
923  }
924  else
925  {
926  JLOG(journal_.warn())
927  << __func__ << " : "
928  << "Failed to execute func at source = "
929  << source->toString()
930  << " - ledger sequence = " << ledgerSequence;
931  }
932  }
933  else
934  {
935  JLOG(journal_.warn())
936  << __func__ << " : "
937  << "Ledger not present at source = " << source->toString()
938  << " - ledger sequence = " << ledgerSequence;
939  }
940  sourceIdx = (sourceIdx + 1) % sources_.size();
941  numAttempts++;
942  if (numAttempts % sources_.size() == 0)
943  {
944  // If another process loaded the ledger into the database, we can
945  // abort trying to fetch the ledger from a transaction processing
946  // process
948  ledgerSequence))
949  {
950  JLOG(journal_.warn())
951  << __func__ << " : "
952  << "Error executing function. "
953  << " Tried all sources, but ledger was found in db."
954  << " Sequence = " << ledgerSequence;
955  return false;
956  }
957  JLOG(journal_.error())
958  << __func__ << " : "
959  << "Error executing function "
960  << " - ledger sequence = " << ledgerSequence
961  << " - Tried all sources. Sleeping and trying again";
963  }
964  }
965  return !etl_.isStopping();
966 }
967 
968 void
970 {
971  for (auto& source : sources_)
972  source->start();
973 }
974 
975 void
977 {
978  for (auto& source : sources_)
979  source->stop();
980 }
981 
982 } // namespace ripple
ripple::AsyncCallData::context_
std::unique_ptr< grpc::ClientContext > context_
Definition: ETLSource.cpp:422
ripple::RPC::JsonContext
Definition: Context.h:53
std::this_thread::sleep_for
T sleep_for(T... args)
std::string
STL class.
std::shared_ptr
STL class.
ripple::AsyncCallData::CallStatus
CallStatus
Definition: ETLSource.cpp:464
ripple::ThreadSafeQueue
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
Definition: ETLHelpers.h:115
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:537
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::ETLLoadBalancer::forwardToP2p
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a randomly selected p2p node.
Definition: ETLSource.cpp:755
ripple::ETLLoadBalancer::start
void start()
Setup all of the ETL sources and subscribe to the necessary streams.
Definition: ETLSource.cpp:969
Json::arrayValue
@ arrayValue
array value (ordered list)
Definition: json_value.h:42
beast::IP::Endpoint::to_string
std::string to_string() const
Returns a string representing the endpoint.
Definition: IPEndpoint.cpp:57
std::pair
std::vector
STL class.
ripple::ETLSource::etl_
ReportingETL & etl_
Definition: ETLSource.h:54
std::chrono::seconds
ripple::ETLLoadBalancer::fetchLedger
std::optional< org::xrpl::rpc::v1::GetLedgerResponse > fetchLedger(uint32_t ledgerSequence, bool getObjects)
Fetch data for a specific ledger.
Definition: ETLSource.cpp:699
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::unique_ptr::get
T get(T... args)
ripple::ETLSource::ip_
std::string ip_
Definition: ETLSource.h:48
ripple::AsyncCallData::status_
grpc::Status status_
Definition: ETLSource.cpp:424
ripple::ETLSource::ETLSource
ETLSource(std::string ip, std::string wsPort, ReportingETL &etl)
Create ETL source without gRPC endpoint Fetch ledger and load initial ledger will fail for this sourc...
Definition: ETLSource.cpp:31
ripple::AsyncCallData::journal_
beast::Journal journal_
Definition: ETLSource.cpp:428
boost
Definition: IPAddress.h:103
Json::Value::toStyledString
std::string toStyledString() const
Definition: json_value.cpp:1039
ripple::ReportingETL::getApplication
Application & getApplication()
Definition: ReportingETL.h:298
Json::Reader
Unserialize a JSON document into a Value.
Definition: json_reader.h:36
ripple::ETLLoadBalancer::getP2pForwardingStub
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Randomly select a p2p node to forward a gRPC request to.
Definition: ETLSource.cpp:733
ripple::base_uint::data
pointer data()
Definition: base_uint.h:122
ripple::base_uint::size
constexpr static std::size_t size()
Definition: base_uint.h:519
ripple::AsyncCallData::getMarkerPrefix
std::string getMarkerPrefix()
Definition: ETLSource.cpp:547
ripple::base_uint< 256 >
ripple::Resource::Consumer::to_string
std::string to_string() const
Return a human readable string uniquely identifying this consumer.
Definition: Consumer.cpp:71
std::stoi
T stoi(T... args)
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
Definition: json_value.cpp:882
ripple::ETLLoadBalancer::journal_
beast::Journal journal_
Definition: ETLSource.h:320
ripple::RPC::Context::consumer
Resource::Consumer & consumer
Definition: Context.h:46
ripple::ETLSource::connected_
std::atomic_bool connected_
Definition: ETLSource.h:83
ripple::ETLSource::stub_
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > stub_
Definition: ETLSource.h:59
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::ETLLoadBalancer::stop
void stop()
Definition: ETLSource.cpp:976
ripple::ETLLoadBalancer::loadInitialLedger
void loadInitialLedger(uint32_t sequence, ThreadSafeQueue< std::shared_ptr< SLE >> &writeQueue)
Load the initial ledger, writing data to the queue.
Definition: ETLSource.cpp:680
ripple::AsyncCallData::nextPrefix_
unsigned char nextPrefix_
Definition: ETLSource.cpp:426
ripple::ETLSource::reconnect
void reconnect(boost::beast::error_code ec)
Attempt to reconnect to the ETL source.
Definition: ETLSource.cpp:98
ripple::ReportingETL::getNetworkValidatedLedgers
NetworkValidatedLedgers & getNetworkValidatedLedgers()
Definition: ReportingETL.h:276
ripple::AsyncCallData::AsyncCallData
AsyncCallData(uint256 &marker, std::optional< uint256 > nextMarker, uint32_t seq, beast::Journal &j)
Definition: ETLSource.cpp:431
ripple::ETLLoadBalancer::execute
bool execute(Func f, uint32_t ledgerSequence)
f is a function that takes an ETLSource as an argument and returns a bool.
Definition: ETLSource.cpp:898
std::to_string
T to_string(T... args)
ripple::AsyncCallData::cur_
std::unique_ptr< org::xrpl::rpc::v1::GetLedgerDataResponse > cur_
Definition: ETLSource.cpp:418
boost::asio
Definition: Overlay.h:41
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::AsyncCallData::process
CallStatus process(std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > &stub, grpc::CompletionQueue &cq, ThreadSafeQueue< std::shared_ptr< SLE >> &queue, bool abort=false)
Definition: ETLSource.cpp:466
ripple::LedgerMaster::getLedgerBySeq
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
Definition: LedgerMaster.cpp:1818
ripple::ReportingETL
This class is responsible for continuously extracting data from a p2p node, and writing that data to ...
Definition: ReportingETL.h:70
ripple::ETLSource::grpcPort_
std::string grpcPort_
Definition: ETLSource.h:52
std::runtime_error
STL class.
ripple::SerialIter
Definition: Serializer.h:310
Json::Value::isMember
bool isMember(const char *key) const
Return true if the object has a member named key.
Definition: json_value.cpp:932
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::AsyncCallData
Definition: ETLSource.cpp:416
ripple::NetworkValidatedLedgers::tryGetMostRecent
std::optional< uint32_t > tryGetMostRecent() const
Get most recently validated sequence.
Definition: ETLHelpers.h:78
Json::FastWriter::write
std::string write(const Value &root) override
Definition: json_writer.cpp:193
ripple::AsyncCallData::call
void call(std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > &stub, grpc::CompletionQueue &cq)
Definition: ETLSource.cpp:531
ripple::RPC::Status::inject
void inject(Object &object) const
Apply the Status to a JsonObject.
Definition: Status.h:115
ripple::RPC::Status
Status represents the results of an operation that might fail.
Definition: Status.h:39
ripple::ETLLoadBalancer::sources_
std::vector< std::unique_ptr< ETLSource > > sources_
Definition: ETLSource.h:322
ripple::ETLSource::journal_
beast::Journal journal_
Definition: ETLSource.h:73
ripple::ETLSource::getP2pForwardingStub
std::unique_ptr< org::xrpl::rpc::v1::XRPLedgerAPIService::Stub > getP2pForwardingStub() const
Get grpc stub to forward requests to p2p node.
Definition: ETLSource.cpp:791
std::swap
T swap(T... args)
std::min
T min(T... args)
ripple::ETLSource::numFailures_
size_t numFailures_
Definition: ETLSource.h:79
ripple::ETLSource::ioc_
boost::asio::io_context & ioc_
Definition: ETLSource.h:57
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ETLSource::start
void start()
Begin sequence of operations to connect to the ETL source and subscribe to ledgers and transactions_p...
Definition: ETLSource.cpp:170
ripple::ETLSource::ws_
std::unique_ptr< boost::beast::websocket::stream< boost::beast::tcp_stream > > ws_
Definition: ETLSource.h:62
std
STL namespace.
ripple::ETLSource::closing_
std::atomic_bool closing_
Definition: ETLSource.h:81
ripple::ETLLoadBalancer::etl_
ReportingETL & etl_
Definition: ETLSource.h:318
Json::Value::asUInt
UInt asUInt() const
Definition: json_value.cpp:545
Json::FastWriter
Outputs a Value in JSON format without formatting (not human friendly).
Definition: json_writer.h:52
Json::Reader::parse
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Definition: json_reader.cpp:74
ripple::rpcFAILED_TO_FORWARD
@ rpcFAILED_TO_FORWARD
Definition: ErrorCodes.h:140
ripple::AsyncCallData::next_
std::unique_ptr< org::xrpl::rpc::v1::GetLedgerDataResponse > next_
Definition: ETLSource.cpp:419
ripple::ETLSource::toString
std::string toString() const
Definition: ETLSource.h:222
std::optional
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
beast::IP::Endpoint
A version-independent IP address and port combination.
Definition: IPEndpoint.h:38
ripple::strHex
std::string strHex(FwdIt begin, FwdIt end)
Definition: strHex.h:30
ripple::ETLSource::close
void close(bool startAgain)
Close the websocket.
Definition: ETLSource.cpp:135
ripple::AsyncCallData::request_
org::xrpl::rpc::v1::GetLedgerDataRequest request_
Definition: ETLSource.cpp:421
std::unique_ptr< org::xrpl::rpc::v1::GetLedgerDataResponse >
ripple::ETLSource::timer_
boost::asio::steady_timer timer_
Definition: ETLSource.h:96
ripple::RPC::JsonContext::params
Json::Value params
Definition: Context.h:64
ripple::ReportingETL::isStopping
bool isStopping() const
Definition: ReportingETL.h:282
std::exception::what
T what(T... args)
ripple::ETLSource::forwardToP2p
Json::Value forwardToP2p(RPC::JsonContext &context) const
Forward a JSON RPC request to a p2p node.
Definition: ETLSource.cpp:812
ripple::getMarkers
std::vector< uint256 > getMarkers(size_t numMarkers)
Parititions the uint256 keyspace into numMarkers partitions, each of equal size.
Definition: ETLHelpers.h:177
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::ETLSource::wsPort_
std::string wsPort_
Definition: ETLSource.h:50
beast
Definition: base_uint.h:641
ripple::ETLLoadBalancer::add
void add(std::string &host, std::string &websocketPort, std::string &grpcPort)
Add an ETL source.
Definition: ETLSource.cpp:657