rippled
GRPCServer.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_CORE_GRPCSERVER_H_INCLUDED
21 #define RIPPLE_CORE_GRPCSERVER_H_INCLUDED
22 
23 #include <ripple/app/main/Application.h>
24 #include <ripple/core/JobQueue.h>
25 #include <ripple/net/InfoSub.h>
26 #include <ripple/protocol/ErrorCodes.h>
27 #include <ripple/resource/Charge.h>
28 #include <ripple/rpc/Context.h>
29 #include <ripple/rpc/GRPCHandlers.h>
30 #include <ripple/rpc/Role.h>
31 #include <ripple/rpc/impl/Handler.h>
32 #include <ripple/rpc/impl/RPCHelpers.h>
33 #include <ripple/rpc/impl/Tuning.h>
34 
35 #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
36 #include <grpcpp/grpcpp.h>
37 
38 namespace ripple {
39 
40 // Interface that CallData implements
41 class Processor
42 {
43 public:
44  virtual ~Processor() = default;
45 
46  Processor() = default;
47 
48  Processor(const Processor&) = delete;
49 
50  Processor&
51  operator=(const Processor&) = delete;
52 
53  // process a request that has arrived. Can only be called once per instance
54  virtual void
55  process() = 0;
56 
57  // create a new instance of this CallData object, with the same type
58  //(same template parameters) as original. This is called when a CallData
59  // object starts processing a request. Creating a new instance allows the
60  // server to handle additional requests while the first is being processed
61  virtual std::shared_ptr<Processor>
62  clone() = 0;
63 
64  // true if this object has finished processing the request. Object will be
65  // deleted once this function returns true
66  virtual bool
67  isFinished() = 0;
68 };
69 
70 class GRPCServerImpl final
71 {
72 private:
73  // CompletionQueue returns events that have occurred, or events that have
74  // been cancelled
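75  std::unique_ptr<grpc::ServerCompletionQueue> cq_;
76 
77  std::vector<std::shared_ptr<Processor>> requests_;
78 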
79  // The gRPC service defined by the .proto files
80  org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_;
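81 
82  std::unique_ptr<grpc::Server> server_;
83 
84  Application& app_;
85 
86  std::string serverAddress_;
87 
88  std::vector<boost::asio::ip::address> secureGatewayIPs_;
89 
90  beast::Journal journal_;
91 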
92  // typedef for function to bind a listener
93  // This is always of the form:
94  // org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME]
95  template <class Request, class Response>
96  using BindListener = std::function<void(
97  org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService&,
98  grpc::ServerContext*,
99  Request*,
100  grpc::ServerAsyncResponseWriter<Response>*,
101  grpc::CompletionQueue*,
102  grpc::ServerCompletionQueue*,
103  void*)>;
104 
105  // typedef for actual handler (that populates a response)
106  // handlers are defined in rpc/GRPCHandlers.h
107  template <class Request, class Response>
110  // This implementation is currently limited to v1 of the API
111  static unsigned constexpr apiVersion = 1;
112 
113  template <class Request, class Response>
114  using Forward = std::function<grpc::Status(
115  org::xrpl::rpc::v1::XRPLedgerAPIService::Stub*,
116  grpc::ClientContext*,
117  Request,
118  Response*)>;
119 
120 public:
121  explicit GRPCServerImpl(Application& app);
122 
123  GRPCServerImpl(const GRPCServerImpl&) = delete;
124 
125  GRPCServerImpl&
126  operator=(const GRPCServerImpl&) = delete;
127 
128  void
129  shutdown();
130 
131  // setup the server and listeners
132  // returns true if server started successfully
133  bool
134  start();
135 
136  // the main event loop
137  void
138  handleRpcs();
139 
140  // Create a CallData object for each RPC. Return created objects in vector
141  std::vector<std::shared_ptr<Processor>>
142  setupListeners();
143 
144 private:
145  // Class encompassing the state and logic needed to serve a request.
146  template <class Request, class Response>
147  class CallData
148  : public Processor,
149  public std::enable_shared_from_this<CallData<Request, Response>>
150  {
151  private:
152  // The means of communication with the gRPC runtime for an asynchronous
153  // server.
154  org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service_;
155 
156  // The producer-consumer queue for asynchronous server notifications.
157  grpc::ServerCompletionQueue& cq_;
158 
159  // Context for the rpc, allowing to tweak aspects of it such as the use
160  // of compression, authentication, as well as to send metadata back to
161  // the client.
162  grpc::ServerContext ctx_;
163 
164  // true if finished processing request
165  // Note, this variable does not need to be atomic, since it is
166  // currently only accessed from one thread. However, isFinished(),
167  // which returns the value of this variable, is public facing. In the
168  // interest of avoiding future concurrency bugs, we make it atomic.
169  std::atomic_bool finished_;
170 
171  Application& app_;
172 
173  // What we get from the client.
174  Request request_;
175 
176  // The means to get back to the client.
177  grpc::ServerAsyncResponseWriter<Response> responder_;
178 
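179  // Function that creates a listener for specific request type
180  BindListener<Request, Response> bindListener_;
181 
182  // Function that processes a request
183  Handler<Request, Response> handler_;
184 
185  // Function to call to forward to another server
186  Forward<Request, Response> forward_;
187 
188  // Condition required for this RPC
189  RPC::Condition requiredCondition_;
190 
191  // Load type for this RPC
192  Resource::Charge loadType_;
193 
194  std::vector<boost::asio::ip::address> const& secureGatewayIPs_;
195 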
196  public:
197  virtual ~CallData() = default;
198 
199  // Take in the "service" instance (in this case representing an
200  // asynchronous server) and the completion queue "cq" used for
201  // asynchronous communication with the gRPC runtime.
202  explicit CallData(
203  org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service,
204  grpc::ServerCompletionQueue& cq,
205  Application& app,
206  BindListener<Request, Response> bindListener,
207  Handler<Request, Response> handler,
208  Forward<Request, Response> forward,
209  RPC::Condition requiredCondition,
210  Resource::Charge loadType,
211  std::vector<boost::asio::ip::address> const& secureGatewayIPs);
212 
213  CallData(const CallData&) = delete;
214 
215  CallData&
216  operator=(const CallData&) = delete;
217 
218  virtual void
219  process() override;
220 
221  virtual bool
222  isFinished() override;
223 
224  std::shared_ptr<Processor>
225  clone() override;
226 
227  private:
228  // process the request. Called inside the coroutine passed to JobQueue
229  void
231 
232  // return load type of this RPC
233  Resource::Charge
234  getLoadType();
235 
236  // return the Role used for this RPC
237  Role
238  getRole(bool isUnlimited);
239 
240  // register endpoint with ResourceManager and return usage
241  Resource::Consumer
242  getUsage();
243 
244  // Returns the ip of the client
245  // Empty optional if there was an error decoding the client ip
246  std::optional<boost::asio::ip::address>
247  getClientIpAddress();
248 
249  // Returns the endpoint of the client.
250  // Empty optional if there was an error decoding the client
251  // endpoint
252  std::optional<boost::asio::ip::tcp::endpoint>
253  getClientEndpoint();
254 
255  // If the request was proxied through
256  // another rippled node, returns the ip of the originating client.
257  // Empty optional if request was not proxied or there was an error
258  // decoding the client ip
259  std::optional<boost::asio::ip::address>
260  getProxiedClientIpAddress();
261 
262  // If the request was proxied through
263  // another rippled node, returns the endpoint of the originating client.
264  // Empty optional if request was not proxied or there was an error
265  // decoding the client endpoint
266  std::optional<boost::asio::ip::tcp::endpoint>
267  getProxiedClientEndpoint();
268 
269  // Returns the user specified in the request. Empty optional if no user
270  // was specified
271  std::optional<std::string>
272  getUser();
273 
274  // Sets is_unlimited in response to value of clientIsUnlimited
275  // Does nothing if is_unlimited is not a field of the response
276  void
277  setIsUnlimited(Response& response, bool isUnlimited);
278 
279  // True if the client is exempt from resource controls
280  bool
281  clientIsUnlimited();
282 
283  // True if the request was proxied through another rippled node prior
284  // to arriving here
285  bool
286  wasForwarded();
287 
288  // forward request to a p2p node
289  void
290  forwardToP2p(RPC::GRPCContext<Request>& context);
291 
292  }; // CallData
293 
294 }; // GRPCServerImpl
295 
296 class GRPCServer
297 {
298 public:
299  explicit GRPCServer(Application& app) : impl_(app)
300  {
301  }
302 
303  GRPCServer(const GRPCServer&) = delete;
304 
305  GRPCServer&
306  operator=(const GRPCServer&) = delete;
307 
308  void
309  start();
310 
311  void
312  stop();
313 
314  ~GRPCServer();
315 
316 private:
317  GRPCServerImpl impl_;
318  std::thread thread_;
319  bool running_ = false;
320 };
321 } // namespace ripple
322 #endif
ripple::Application
Definition: Application.h:115
ripple::Processor
Definition: GRPCServer.h:41
std::string
STL class.
std::shared_ptr
STL class.
ripple::GRPCServer::impl_
GRPCServerImpl impl_
Definition: GRPCServer.h:317
ripple::GRPCServerImpl::CallData::clone
std::shared_ptr< Processor > clone() override
Definition: GRPCServer.cpp:89
ripple::GRPCServerImpl::CallData::operator=
CallData & operator=(const CallData &)=delete
ripple::GRPCServerImpl::CallData::app_
Application & app_
Definition: GRPCServer.h:171
ripple::GRPCServerImpl::apiVersion
static constexpr unsigned apiVersion
Definition: GRPCServer.h:111
ripple::GRPCServerImpl::secureGatewayIPs_
std::vector< boost::asio::ip::address > secureGatewayIPs_
Definition: GRPCServer.h:88
ripple::Processor::process
virtual void process()=0
ripple::GRPCServerImpl::start
bool start()
Definition: GRPCServer.cpp:665
ripple::GRPCServerImpl::CallData::secureGatewayIPs_
std::vector< boost::asio::ip::address > const & secureGatewayIPs_
Definition: GRPCServer.h:194
std::vector
STL class.
ripple::GRPCServerImpl::shutdown
void shutdown()
Definition: GRPCServer.cpp:492
ripple::GRPCServerImpl::CallData::request_
Request request_
Definition: GRPCServer.h:174
ripple::GRPCServerImpl::service_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService service_
Definition: GRPCServer.h:80
ripple::GRPCServerImpl::requests_
std::vector< std::shared_ptr< Processor > > requests_
Definition: GRPCServer.h:77
ripple::GRPCServerImpl::CallData::cq_
grpc::ServerCompletionQueue & cq_
Definition: GRPCServer.h:157
ripple::GRPCServer::running_
bool running_
Definition: GRPCServer.h:319
ripple::GRPCServerImpl::CallData::requiredCondition_
RPC::Condition requiredCondition_
Definition: GRPCServer.h:189
ripple::GRPCServerImpl::server_
std::unique_ptr< grpc::Server > server_
Definition: GRPCServer.h:82
std::function
ripple::GRPCServerImpl::handleRpcs
void handleRpcs()
Definition: GRPCServer.cpp:514
ripple::GRPCServerImpl::CallData::isFinished
virtual bool isFinished() override
Definition: GRPCServer.cpp:273
ripple::GRPCServerImpl
Definition: GRPCServer.h:70
ripple::GRPCServerImpl::CallData::finished_
std::atomic_bool finished_
Definition: GRPCServer.h:169
ripple::GRPCServerImpl::CallData::handler_
Handler< Request, Response > handler_
Definition: GRPCServer.h:183
ripple::Processor::isFinished
virtual bool isFinished()=0
ripple::GRPCServerImpl::CallData::CallData
CallData(org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService &service, grpc::ServerCompletionQueue &cq, Application &app, BindListener< Request, Response > bindListener, Handler< Request, Response > handler, Forward< Request, Response > forward, RPC::Condition requiredCondition, Resource::Charge loadType, std::vector< boost::asio::ip::address > const &secureGatewayIPs)
Definition: GRPCServer.cpp:60
ripple::GRPCServer::thread_
std::thread thread_
Definition: GRPCServer.h:318
ripple::GRPCServerImpl::CallData::service_
org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService & service_
Definition: GRPCServer.h:154
ripple::GRPCServerImpl::CallData::forward_
Forward< Request, Response > forward_
Definition: GRPCServer.h:186
ripple::GRPCServerImpl::cq_
std::unique_ptr< grpc::ServerCompletionQueue > cq_
Definition: GRPCServer.h:75
ripple::GRPCServerImpl::CallData::getUsage
Resource::Consumer getUsage()
Definition: GRPCServer.cpp:413
std::thread
STL class.
ripple::GRPCServer::operator=
GRPCServer & operator=(const GRPCServer &)=delete
ripple::RPC::GRPCContext
Definition: Context.h:70
ripple::Processor::operator=
Processor & operator=(const Processor &)=delete
std::enable_shared_from_this
ripple::GRPCServerImpl::operator=
GRPCServerImpl & operator=(const GRPCServerImpl &)=delete
ripple::GRPCServerImpl::GRPCServerImpl
GRPCServerImpl(Application &app)
Definition: GRPCServer.cpp:426
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::RPC::Condition
Condition
Definition: Handler.h:39
std::atomic_bool
ripple::GRPCServerImpl::CallData::getLoadType
Resource::Charge getLoadType()
Definition: GRPCServer.cpp:280
ripple::GRPCServerImpl::CallData::getProxiedClientEndpoint
std::optional< boost::asio::ip::tcp::endpoint > getProxiedClientEndpoint()
Definition: GRPCServer.cpp:352
ripple::GRPCServerImpl::CallData::wasForwarded
bool wasForwarded()
Definition: GRPCServer.cpp:299
ripple::GRPCServerImpl::CallData::setIsUnlimited
void setIsUnlimited(Response &response, bool isUnlimited)
Definition: GRPCServer.cpp:397
ripple::GRPCServerImpl::CallData::bindListener_
BindListener< Request, Response > bindListener_
Definition: GRPCServer.h:180
ripple::GRPCServer::start
void start()
Definition: GRPCServer.cpp:689
ripple::isUnlimited
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition: Role.cpp:124
ripple::GRPCServerImpl::CallData::loadType_
Resource::Charge loadType_
Definition: GRPCServer.h:192
ripple::GRPCServer::~GRPCServer
~GRPCServer()
Definition: GRPCServer.cpp:714
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Processor::Processor
Processor()=default
ripple::GRPCServerImpl::setupListeners
std::vector< std::shared_ptr< Processor > > setupListeners()
Definition: GRPCServer.cpp:585
ripple::GRPCServerImpl::CallData::ctx_
grpc::ServerContext ctx_
Definition: GRPCServer.h:162
ripple::GRPCServer
Definition: GRPCServer.h:296
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:34
ripple::Resource::Charge
A consumption charge.
Definition: Charge.h:30
ripple::GRPCServerImpl::CallData
Definition: GRPCServer.h:147
ripple::Processor::clone
virtual std::shared_ptr< Processor > clone()=0
ripple::GRPCServer::GRPCServer
GRPCServer(Application &app)
Definition: GRPCServer.h:299
ripple::GRPCServer::stop
void stop()
Definition: GRPCServer.cpp:704
ripple::GRPCServerImpl::CallData::getRole
Role getRole(bool isUnlimited)
Definition: GRPCServer.cpp:287
ripple::GRPCServerImpl::CallData::getProxiedClientIpAddress
std::optional< boost::asio::ip::address > getProxiedClientIpAddress()
Definition: GRPCServer.cpp:342
ripple::GRPCServerImpl::CallData::forwardToP2p
void forwardToP2p(RPC::GRPCContext< Request > &context)
Definition: GRPCServer.cpp:233
std::optional< boost::asio::ip::address >
ripple::GRPCServerImpl::CallData::getClientEndpoint
std::optional< boost::asio::ip::tcp::endpoint > getClientEndpoint()
Definition: GRPCServer.cpp:371
ripple::GRPCServerImpl::app_
Application & app_
Definition: GRPCServer.h:84
ripple::GRPCServerImpl::CallData::~CallData
virtual ~CallData()=default
ripple::Processor::~Processor
virtual ~Processor()=default
ripple::GRPCServerImpl::CallData::clientIsUnlimited
bool clientIsUnlimited()
Definition: GRPCServer.cpp:378
std::unique_ptr< grpc::ServerCompletionQueue >
ripple::GRPCServerImpl::journal_
beast::Journal journal_
Definition: GRPCServer.h:90
ripple::GRPCServerImpl::CallData::getUser
std::optional< std::string > getUser()
Definition: GRPCServer.cpp:316
ripple::Role
Role
Indicates the level of administrative permission to grant.
Definition: Role.h:43
ripple::GRPCServerImpl::CallData::process
virtual void process() override
Definition: GRPCServer.cpp:105
ripple::GRPCServerImpl::serverAddress_
std::string serverAddress_
Definition: GRPCServer.h:86
ripple::GRPCServerImpl::CallData::responder_
grpc::ServerAsyncResponseWriter< Response > responder_
Definition: GRPCServer.h:177
ripple::GRPCServerImpl::CallData::getClientIpAddress
std::optional< boost::asio::ip::address > getClientIpAddress()
Definition: GRPCServer.cpp:332