20 #include <ripple/app/main/GRPCServer.h>
21 #include <ripple/app/reporting/P2pProxy.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/resource/Fees.h>
25 #include <ripple/beast/net/IPAddressConversion.h>
43 peerClean = peer.
substr(first + 1);
59 template <
class Request,
class Response>
61 org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
62 grpc::ServerCompletionQueue& cq,
75 , bindListener_(
std::move(bindListener))
76 , handler_(
std::move(handler))
77 , forward_(
std::move(forward))
78 , requiredCondition_(
std::move(requiredCondition))
79 , loadType_(
std::move(loadType))
80 , secureGatewayIPs_(secureGatewayIPs)
87 template <
class Request,
class Response>
91 return std::make_shared<CallData<Request, Response>>(
103 template <
class Request,
class Response>
108 BOOST_ASSERT(!finished_);
111 this->shared_from_this();
126 thisShared->process(coro);
133 grpc::StatusCode::INTERNAL,
"Job Queue is already stopped"};
134 responder_.FinishWithError(status,
this);
138 template <
class Request,
class Response>
145 auto usage = getUsage();
150 grpc::StatusCode::RESOURCE_EXHAUSTED,
151 "usage balance exceeds threshold"};
152 responder_.FinishWithError(status,
this);
156 auto loadType = getLoadType();
157 usage.charge(loadType);
162 toLog <<
"role = " << (int)role;
164 toLog <<
" address = ";
165 if (
auto clientIp = getClientIpAddress())
166 toLog << clientIp.value();
169 if (
auto user = getUser())
170 toLog << user.value();
203 grpc::StatusCode::FAILED_PRECONDITION,
205 responder_.FinishWithError(status,
this);
214 responder_.Finish(result.
first, result.
second,
this);
226 grpc::Status status{grpc::StatusCode::INTERNAL, ex.
what()};
227 responder_.FinishWithError(status,
this);
231 template <
class Request,
class Response>
236 if (
auto descriptor =
237 Request::GetDescriptor()->FindFieldByName(
"client_ip"))
239 Request::GetReflection()->SetString(&request_, descriptor, ctx_.peer());
241 <<
"Set client_ip to " << ctx_.peer();
246 Throw<std::runtime_error>(
247 "Attempting to forward but no client_ip field in "
253 grpc::ClientContext clientContext;
255 auto status = forward_(stub.get(), &clientContext, request_, &response);
256 responder_.Finish(response, status,
this);
262 <<
"Failed to forward request to tx";
264 grpc::StatusCode::INTERNAL,
265 "Attempted to act as proxy but failed "
266 "to create forwarding stub"};
267 responder_.FinishWithError(status,
this);
271 template <
class Request,
class Response>
278 template <
class Request,
class Response>
285 template <
class Request,
class Response>
291 else if (wasForwarded())
297 template <
class Request,
class Response>
301 if (
auto descriptor =
302 Request::GetDescriptor()->FindFieldByName(
"client_ip"))
305 Request::GetReflection()->GetString(request_, descriptor);
306 if (!clientIp.
empty())
314 template <
class Request,
class Response>
318 if (
auto descriptor = Request::GetDescriptor()->FindFieldByName(
"user"))
321 Request::GetReflection()->GetString(request_, descriptor);
330 template <
class Request,
class Response>
334 auto endpoint = getClientEndpoint();
336 return endpoint->address();
340 template <
class Request,
class Response>
344 auto endpoint = getProxiedClientEndpoint();
346 return endpoint->address();
350 template <
class Request,
class Response>
354 auto descriptor = Request::GetDescriptor()->FindFieldByName(
"client_ip");
358 Request::GetReflection()->GetString(request_, descriptor);
359 if (!clientIp.
empty())
362 <<
"Got client_ip from request : " << clientIp;
363 return getEndpoint(clientIp);
369 template <
class Request,
class Response>
373 return getEndpoint(ctx_.peer());
376 template <
class Request,
class Response>
382 auto clientIp = getClientIpAddress();
383 auto proxiedIp = getProxiedClientIpAddress();
384 if (clientIp && !proxiedIp)
395 template <
class Request,
class Response>
403 if (
auto descriptor =
404 Response::GetDescriptor()->FindFieldByName(
"is_unlimited"))
406 Response::GetReflection()->SetBool(&response, descriptor,
true);
411 template <
class Request,
class Response>
415 auto endpoint = getClientEndpoint();
416 auto proxiedEndpoint = getProxiedClientEndpoint();
423 Throw<std::runtime_error>(
"Failed to get client endpoint");
434 auto const optIp = section.
get(
"ip");
438 auto const optPort = section.
get(
"port");
443 boost::asio::ip::tcp::endpoint endpoint(
444 boost::asio::ip::make_address(*optIp),
std::stoi(*optPort));
452 JLOG(
journal_.
error()) <<
"Error setting grpc server address";
453 Throw<std::runtime_error>(
"Error setting grpc server address");
456 auto const optSecureGateway = section.
get(
"secure_gateway");
457 if (optSecureGateway)
465 boost::algorithm::trim(ip);
466 auto const addr = boost::asio::ip::make_address(ip);
468 if (addr.is_unspecified())
471 <<
"Can't pass unspecified IP in "
472 <<
"secure_gateway section of port_grpc";
473 Throw<std::runtime_error>(
474 "Unspecified IP in secure_gateway section");
483 <<
"Error parsing secure gateway IPs for grpc server";
484 Throw<std::runtime_error>(
485 "Error parsing secure_gateway section");
510 JLOG(
journal_.
debug()) <<
"Completion Queue has been shutdown";
526 return sPtr.get() == ptr;
528 BOOST_ASSERT(it != requests.
end());
529 it->swap(requests.
back());
549 while (
cq_->Next(&tag, &ok))
553 <<
" ptr = " << ptr <<
" ok = " << ok;
558 <<
"Destroying object";
563 if (!ptr->isFinished())
565 JLOG(
journal_.
debug()) <<
"Received new request. Processing";
568 auto cloned = ptr->clone();
575 JLOG(
journal_.
debug()) <<
"Sent response. Destroying object";
589 auto addToRequests = [&requests](
auto callData) {
595 org::xrpl::rpc::v1::GetLedgerRequest,
596 org::xrpl::rpc::v1::GetLedgerResponse>;
598 addToRequests(std::make_shared<cd>(
602 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
605 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger,
612 org::xrpl::rpc::v1::GetLedgerDataRequest,
613 org::xrpl::rpc::v1::GetLedgerDataResponse>;
615 addToRequests(std::make_shared<cd>(
619 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
620 RequestGetLedgerData,
622 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData,
629 org::xrpl::rpc::v1::GetLedgerDiffRequest,
630 org::xrpl::rpc::v1::GetLedgerDiffResponse>;
632 addToRequests(std::make_shared<cd>(
636 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
637 RequestGetLedgerDiff,
639 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff,
646 org::xrpl::rpc::v1::GetLedgerEntryRequest,
647 org::xrpl::rpc::v1::GetLedgerEntryResponse>;
649 addToRequests(std::make_shared<cd>(
653 &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::
654 RequestGetLedgerEntry,
656 &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry,
673 grpc::ServerBuilder builder;
675 builder.AddListeningPort(
serverAddress_, grpc::InsecureServerCredentials());
681 cq_ = builder.AddCompletionQueue();
683 server_ = builder.BuildAndStart();