20 #include <ripple/app/reporting/ETLSource.h>
21 #include <ripple/app/reporting/ReportingETL.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/json/json_reader.h>
24 #include <ripple/json/json_writer.h>
35 , ioc_(etl.getApplication().getIOService())
36 , ws_(
std::make_unique<
38 boost::asio::make_strand(ioc_)))
39 , resolver_(
boost::asio::make_strand(ioc_))
40 , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
41 , journal_(etl_.getApplication().journal(
"ReportingETL::ETLSource"))
42 , app_(etl_.getApplication())
56 , ioc_(etl.getApplication().getIOService())
57 , ws_(
std::make_unique<
59 boost::asio::make_strand(ioc_)))
60 , resolver_(
boost::asio::make_strand(ioc_))
61 , networkValidatedLedgers_(etl_.getNetworkValidatedLedgers())
62 , journal_(etl_.getApplication().journal(
"ReportingETL::ETLSource"))
63 , app_(etl_.getApplication())
75 <<
"Using IP to connect to ETL source: " << connectionString;
81 <<
"Using DNS to connect to ETL source: " << connectionString;
85 stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
87 connectionString, grpc::InsecureChannelCredentials()));
104 if (ec != boost::asio::error::operation_aborted &&
105 ec != boost::asio::error::connection_refused)
108 <<
"error code = " << ec <<
" - " <<
toString();
113 <<
"error code = " << ec <<
" - " <<
toString();
119 <<
" - etl is stopping. aborting reconnect";
126 timer_.expires_after(boost::asio::chrono::seconds(waitTime));
127 timer_.async_wait([
this, fname = __func__](
auto ec) {
128 bool startAgain = (ec != boost::asio::error::operation_aborted);
129 JLOG(
journal_.
trace()) << fname <<
" async_wait : ec = " << ec;
138 ioc_.post([
this, startAgain]() {
149 boost::beast::websocket::close_code::normal,
150 [this, startAgain, fname = __func__](auto ec) {
153 JLOG(journal_.error())
154 << fname <<
" async_close : "
155 <<
"error code = " << ec <<
" - " << toString();
172 JLOG(journal_.trace()) << __func__ <<
" : " << toString();
174 auto const host = ip_;
175 auto const port = wsPort_;
177 resolver_.async_resolve(
178 host, port, [
this](
auto ec,
auto results) { onResolve(ec, results); });
// Handler invoked with the outcome of the asynchronous resolve of the
// source's host/port (the resolver call passes its `ec` and `results`
// straight to this function).
//
// ec      - error code reported by the resolve operation.
// results - resolved endpoints; forwarded unchanged to async_connect.
//
// NOTE(review): the branch that reacts to a non-zero `ec` is not visible in
// this excerpt - confirm how failures are handled before relying on it.
182 ETLSource::onResolve(
183 boost::beast::error_code ec,
184 boost::asio::ip::tcp::resolver::results_type results)
// Trace the error code for every invocation.
186 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
// Arm a timeout on the lowest (transport) layer of the websocket stream
// before starting the connect; the duration is on a line not shown here.
195 boost::beast::get_lowest_layer(*ws_).expires_after(
// Try the resolved endpoints; completion is chained to onConnect. The lambda
// captures raw `this`, so this object must outlive the pending operation.
197 boost::beast::get_lowest_layer(*ws_).async_connect(
198 results, [
this](
auto ec,
auto ep) { onConnect(ec, ep); });
// Handler invoked when the asynchronous TCP connect started in onResolve
// finishes.
//
// ec       - error code reported by the connect operation.
// endpoint - the endpoint reported by the connect operation.
//
// NOTE(review): the error branch is not visible in this excerpt.
203 ETLSource::onConnect(
204 boost::beast::error_code ec,
205 boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
// Trace the error code for every invocation.
207 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
// Disable the transport-layer timer that onResolve armed.
219 boost::beast::get_lowest_layer(*ws_).expires_never();
// Beast's suggested timeout settings for the client role (presumably passed
// to ws_->set_option on the preceding, non-visible line - confirm).
223 boost::beast::websocket::stream_base::timeout::suggested(
224 boost::beast::role_type::client));
// Install a decorator that touches the User-Agent field of the handshake
// request; the value ends with " websocket-client-async".
227 ws_->set_option(boost::beast::websocket::stream_base::decorator(
228 [](boost::beast::websocket::request_type& req) {
230 boost::beast::http::field::user_agent,
232 " websocket-client-async");
// Start the websocket handshake against target "/"; completion is chained to
// onHandshake. The lambda captures raw `this`.
240 ws_->async_handshake(host,
"/", [
this](
auto ec) { onHandshake(ec); });
// Handler invoked when the websocket handshake started in onConnect finishes.
// On the visible path it builds a JSON "subscribe" command listing four
// streams and sends it, chaining completion to onWrite.
//
// ec - error code reported by the handshake operation.
//
// NOTE(review): the error branch is not visible in this excerpt.
245 ETLSource::onHandshake(boost::beast::error_code ec)
// Trace the error code for every invocation.
247 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
// Build the subscription request: command "subscribe" plus the "streams"
// array filled, in order, with the ledger, transaction, validation and
// manifest stream values (defined on lines not shown here).
257 jv[
"command"] =
"subscribe";
261 jv[
"streams"].
append(ledgerStream);
263 jv[
"streams"].
append(txnStream);
265 jv[
"streams"].
append(validationStream);
267 jv[
"streams"].
append(manifestStream);
270 JLOG(journal_.trace()) <<
"Sending subscribe stream message";
// NOTE(review): the buffer wraps the return value of fastWriter.write(jv).
// If write() returns a std::string by value, that temporary is destroyed at
// the end of this statement, while an asynchronous write requires the buffer
// to stay valid until its completion handler runs - confirm, and keep the
// serialized string in a member if so.
273 boost::asio::buffer(fastWriter.
write(jv)),
// Completion is chained to onWrite; the lambda captures raw `this`.
274 [
this](
auto ec,
size_t size) { onWrite(ec, size); });
// Handler invoked when the subscribe message written in onHandshake has been
// sent. On the visible path it starts a read into readBuffer_ whose
// completion is delivered to onRead.
//
// ec           - error code reported by the write operation.
// bytesWritten - byte count reported by the write operation.
//
// NOTE(review): the error branch is not visible in this excerpt.
279 ETLSource::onWrite(boost::beast::error_code ec,
size_t bytesWritten)
// Trace the error code for every invocation.
281 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
// Arguments of the read call (its head is on a non-visible line): the member
// read buffer and a handler that forwards to onRead. Captures raw `this`.
291 readBuffer_, [
this](
auto ec,
size_t size) { onRead(ec, size); });
// Handler invoked when a read into readBuffer_ finishes. On the visible path
// it empties readBuffer_ and starts another read with itself as the handler,
// forming the receive loop.
//
// ec   - error code reported by the read operation.
// size - byte count reported by the read operation.
//
// NOTE(review): the error branch and the point where the message is consumed
// are not visible in this excerpt.
296 ETLSource::onRead(boost::beast::error_code ec,
size_t size)
// Trace the error code for every invocation.
298 JLOG(journal_.trace()) << __func__ <<
" : ec = " << ec <<
" - "
// Swap readBuffer_ with a freshly constructed flat_buffer so the member is
// empty for the next read; the old contents are dropped when `buffer` goes
// out of scope.
308 boost::beast::flat_buffer buffer;
309 swap(readBuffer_, buffer);
311 JLOG(journal_.trace())
312 << __func__ <<
" : calling async_read - " << toString();
// Arguments of the next read call (its head is on a non-visible line); the
// handler re-enters onRead. Captures raw `this`.
314 readBuffer_, [
this](
auto ec,
size_t size) { onRead(ec, size); });
// Parses the message currently held in readBuffer_ as JSON and acts on it:
//   - extracts a ledger index and a validated-ledgers string, either from the
//     "result" object or from a top-level "ledgerClosed" message;
//   - when the load balancer says this source should propagate streams,
//     forwards transaction / validation / manifest messages to getOPs();
//   - pushes a non-zero ledger index to networkValidatedLedgers_.
// An exception `e` is logged at error level on the last visible line.
//
// NOTE(review): several conditions, braces and the return statements are not
// visible in this excerpt, so the exact nesting of the branches below should
// be confirmed against the full file.
319 ETLSource::handleMessage()
321 JLOG(journal_.trace()) << __func__ <<
" : " << toString();
// The raw buffer bytes are reinterpreted as characters for the JSON parser
// (the parse call's head is on a non-visible line).
330 static_cast<char const*
>(readBuffer_.data().data()), response))
// NOTE(review): readBuffer_.data().data() is streamed without a cast here; if
// it is a void pointer the log will show an address rather than the message
// text - confirm.
332 JLOG(journal_.error())
334 <<
"Error parsing stream message."
335 <<
" Message = " << readBuffer_.data().data();
// 0 means "no ledger index found in this message"; see the check before the
// push below.
339 uint32_t ledgerIndex = 0;
// Values carried inside the "result" object.
// NOTE(review): this branch mixes the literal "result" with jss::result;
// presumably they name the same key - confirm and use one spelling.
342 if (response[
"result"].isMember(jss::ledger_index))
344 ledgerIndex = response[
"result"][jss::ledger_index].
asUInt();
346 if (response[jss::result].isMember(jss::validated_ledgers))
349 response[jss::result][jss::validated_ledgers].asString());
351 JLOG(journal_.debug())
353 <<
"Received a message on ledger "
354 <<
" subscription stream. Message : "
// Forward stream messages only when the load balancer selects this source.
359 if (etl_.getETLLoadBalancer().shouldPropagateStream(
this))
361 if (response.
isMember(jss::transaction))
363 etl_.getApplication().getOPs().forwardProposedTransaction(
368 response[
"type"] ==
"validationReceived")
370 etl_.getApplication().getOPs().forwardValidation(response);
374 response[
"type"] ==
"manifestReceived")
376 etl_.getApplication().getOPs().forwardManifest(response);
// "ledgerClosed" messages carry the same two fields at the top level.
380 if (response.
isMember(
"type") && response[
"type"] ==
"ledgerClosed")
382 JLOG(journal_.debug())
384 <<
"Received a message on ledger "
385 <<
" subscription stream. Message : "
387 if (response.
isMember(jss::ledger_index))
389 ledgerIndex = response[jss::ledger_index].
asUInt();
391 if (response.
isMember(jss::validated_ledgers))
394 response[jss::validated_ledgers].asString());
// Only publish when a ledger index was actually extracted above.
399 if (ledgerIndex != 0)
401 JLOG(journal_.trace())
403 <<
"Pushing ledger sequence = " << ledgerIndex <<
" - "
405 networkValidatedLedgers_.push(ledgerIndex);
// Exception path (the catch clause itself is on a non-visible line).
411 JLOG(journal_.error()) <<
"Exception in handleMessage : " << e.
what();
438 request_.mutable_ledger()->set_sequence(seq);
441 request_.set_marker(marker.
data(), marker.
size());
443 request_.set_user(
"ETL");
446 nextPrefix_ = nextMarker->data()[0];
448 unsigned char prefix = marker.
data()[0];
450 JLOG(journal_.
debug())
451 <<
"Setting up AsyncCallData. marker = " <<
strHex(marker)
455 assert(nextPrefix_ > prefix || nextPrefix_ == 0x00);
457 cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
459 next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
461 context_ = std::make_unique<grpc::ClientContext>();
468 grpc::CompletionQueue& cq,
472 JLOG(journal_.
debug()) <<
"Processing calldata";
475 JLOG(journal_.
error()) <<
"AsyncCallData aborted";
476 return CallStatus::ERRORED;
480 JLOG(journal_.
debug()) <<
"AsyncCallData status_ not ok: "
481 <<
" code = " << status_.error_code()
482 <<
" message = " << status_.error_message();
483 return CallStatus::ERRORED;
485 if (!next_->is_unlimited())
487 JLOG(journal_.
warn())
488 <<
"AsyncCallData is_unlimited is false. Make sure "
489 "secure_gateway is set correctly at the ETL source";
498 if (cur_->marker().size() == 0)
502 unsigned char prefix = cur_->marker()[0];
503 if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
509 request_.set_marker(std::move(cur_->marker()));
513 for (
auto& obj : cur_->ledger_objects().objects())
515 auto key = uint256::fromVoidChecked(obj.key());
519 auto& data = obj.data();
527 return more ? CallStatus::MORE : CallStatus::DONE;
533 grpc::CompletionQueue& cq)
535 context_ = std::make_unique<grpc::ClientContext>();
538 org::xrpl::rpc::v1::GetLedgerDataResponse>>
539 rpc(stub->PrepareAsyncGetLedgerData(context_.
get(), request_, &cq));
543 rpc->Finish(next_.
get(), &status_,
this);
549 if (next_->marker().size() == 0)
// Downloads the data of ledger `sequence` from this source by issuing one
// AsyncCallData per marker and processing their completions from a local
// gRPC completion queue until every call has finished, the ETL is stopping,
// or a further condition on a non-visible line fails.
//
// NOTE(review): the parameter list, the construction of `markers`, the
// completion-queue wait and the return statements are not visible in this
// excerpt - confirm details against the full file.
557 ETLSource::loadInitialLedger(
564 grpc::CompletionQueue cq;
// Create one call per marker. Each call is also given the following marker;
// for the last marker `nextMarker` keeps whatever value it was declared with
// on a non-visible line.
573 for (
size_t i = 0; i < markers.size(); ++i)
576 if (i + 1 < markers.size())
577 nextMarker = markers[i + 1];
578 calls.
emplace_back(markers[i], nextMarker, sequence, journal_);
581 JLOG(journal_.debug()) <<
"Starting data download for ledger " << sequence
582 <<
". Using source = " << toString();
// Per-call start-up; the loop body is on a non-visible line.
584 for (
auto& c : calls)
// Counts calls whose process() result was anything other than MORE.
587 size_t numFinished = 0;
589 while (numFinished < calls.size() && !etl_.isStopping() &&
598 JLOG(journal_.error()) <<
"loadInitialLedger - ok is false";
604 JLOG(journal_.debug())
605 <<
"Marker prefix = " << ptr->getMarkerPrefix();
// Let the completed call handle its response; MORE means that call still has
// further data to fetch.
606 auto result = ptr->process(stub_, cq, writeQueue, abort);
607 if (result != AsyncCallData::CallStatus::MORE)
610 JLOG(journal_.debug())
611 <<
"Finished a marker. "
612 <<
"Current number of finished = " << numFinished;
// An errored call is handled specially; the body is on a non-visible line
// (presumably it sets `abort` - confirm).
614 if (result == AsyncCallData::CallStatus::ERRORED)
// Fetches a single ledger from this source with a blocking gRPC GetLedger
// call.
//
// ledgerSequence - sequence number placed in the request's ledger specifier.
// getObjects     - forwarded to the request's get_objects flag.
//
// Returns a {status, response} pair. An INTERNAL status with message
// "No Stub" is returned early under a condition on a non-visible line
// (presumably when stub_ is null - confirm).
624 ETLSource::fetchLedger(uint32_t ledgerSequence,
bool getObjects)
626 org::xrpl::rpc::v1::GetLedgerResponse response;
628 return {{grpc::StatusCode::INTERNAL,
"No Stub"}, response};
// Build the request: transactions included and expanded, caller-selected
// object inclusion, and the user tag "ETL".
631 org::xrpl::rpc::v1::GetLedgerRequest request;
632 grpc::ClientContext context;
633 request.mutable_ledger()->set_sequence(ledgerSequence);
634 request.set_transactions(
true);
635 request.set_expand(
true);
636 request.set_get_objects(getObjects);
637 request.set_user(
"ETL");
638 grpc::Status status = stub_->GetLedger(&context, request, &response);
// A successful reply that is not marked unlimited only produces a warning;
// the result is still returned to the caller below.
639 if (status.ok() && !response.is_unlimited())
641 JLOG(journal_.warn()) <<
"ETLSource::fetchLedger - is_unlimited is "
642 "false. Make sure secure_gateway is set "
643 "correctly on the ETL source. source = "
647 return {status, std::move(response)};
652 , journal_(etl_.getApplication().journal(
"ReportingETL::LoadBalancer"))
663 std::make_unique<ETLSource>(host, websocketPort, grpcPort,
etl_);
665 JLOG(
journal_.
info()) << __func__ <<
" : added etl source - "
673 std::make_unique<ETLSource>(host, websocketPort,
etl_);
675 JLOG(
journal_.
info()) << __func__ <<
" : added etl source - "
685 [
this, &sequence, &writeQueue](
auto& source) {
686 bool res = source->loadInitialLedger(sequence, writeQueue);
689 JLOG(
journal_.
error()) <<
"Failed to download initial ledger. "
690 <<
" Sequence = " << sequence
691 <<
" source = " << source->toString();
701 org::xrpl::rpc::v1::GetLedgerResponse response;
703 [&response, ledgerSequence, getObjects,
this](
auto& source) {
704 auto [status, data] =
705 source->fetchLedger(ledgerSequence, getObjects);
706 response = std::move(data);
707 if (status.ok() && response.validated())
710 <<
"Successfully fetched ledger = " << ledgerSequence
711 <<
" from source = " << source->toString();
717 <<
"Error getting ledger = " << ledgerSequence
718 <<
" Reply : " << response.DebugString()
719 <<
" error_code : " << status.error_code()
720 <<
" error_msg : " << status.error_message()
721 <<
" source = " << source->toString();
737 srand((
unsigned)time(0));
738 auto sourceIdx = rand() %
sources_.size();
739 auto numAttempts = 0;
740 while (numAttempts <
sources_.size())
742 auto stub =
sources_[sourceIdx]->getP2pForwardingStub();
745 sourceIdx = (sourceIdx + 1) %
sources_.size();
760 srand((
unsigned)time(0));
761 auto sourceIdx = rand() %
sources_.size();
762 auto numAttempts = 0;
765 while (numAttempts <
sources_.size())
767 auto increment = [&]() {
768 sourceIdx = (sourceIdx + 1) %
sources_.size();
772 if (mostRecent && !src->hasLedger(*mostRecent))
777 res = src->forwardToP2p(context);
778 if (!res.
isMember(
"forwarded") || res[
"forwarded"] !=
true)
797 return org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
802 grpc::InsecureChannelCredentials()));
814 JLOG(
journal_.
debug()) <<
"Attempting to forward request to tx. "
821 <<
"Attempted to proxy but failed to connect to tx";
824 namespace beast = boost::beast;
825 namespace http = beast::http;
826 namespace websocket = beast::websocket;
828 using tcp = boost::asio::ip::tcp;
836 tcp::resolver resolver{ioc};
839 auto ws = std::make_unique<websocket::stream<tcp::socket>>(ioc);
842 auto const results = resolver.resolve(
ip_,
wsPort_);
846 net::connect(ws->next_layer(), results.begin(), results.end());
852 ws->set_option(websocket::stream_base::decorator(
853 [&context](websocket::request_type& req) {
855 http::field::user_agent,
857 " websocket-client-coro");
859 http::field::forwarded,
866 ws->handshake(
ip_,
"/");
872 ws->write(net::buffer(fastWriter.
write(request)));
874 beast::flat_buffer buffer;
879 static_cast<char const*
>(buffer.data().data()), response))
882 response[jss::error] =
"Error parsing response from tx";
886 response[
"forwarded"] =
true;
896 template <
class Func>
900 srand((
unsigned)time(0));
901 auto sourceIdx = rand() %
sources_.size();
902 auto numAttempts = 0;
910 <<
"Attempting to execute func. ledger sequence = "
911 << ledgerSequence <<
" - source = " << source->toString();
912 if (source->hasLedger(ledgerSequence))
914 bool res = f(source);
919 <<
"Successfully executed func at source = "
920 << source->toString()
921 <<
" - ledger sequence = " << ledgerSequence;
928 <<
"Failed to execute func at source = "
929 << source->toString()
930 <<
" - ledger sequence = " << ledgerSequence;
937 <<
"Ledger not present at source = " << source->toString()
938 <<
" - ledger sequence = " << ledgerSequence;
940 sourceIdx = (sourceIdx + 1) %
sources_.size();
942 if (numAttempts %
sources_.size() == 0)
952 <<
"Error executing function. "
953 <<
" Tried all sources, but ledger was found in db."
954 <<
" Sequence = " << ledgerSequence;
959 <<
"Error executing function "
960 <<
" - ledger sequence = " << ledgerSequence
961 <<
" - Tried all sources. Sleeping and trying again";