rippled
WorkBase.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2016 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_APP_MISC_DETAIL_WORKBASE_H_INCLUDED
21 #define RIPPLE_APP_MISC_DETAIL_WORKBASE_H_INCLUDED
22 
23 #include <ripple/app/misc/detail/Work.h>
24 #include <ripple/basics/random.h>
25 #include <ripple/protocol/BuildInfo.h>
26 
27 #include <boost/asio.hpp>
28 #include <boost/beast/core/multi_buffer.hpp>
29 #include <boost/beast/http/empty_body.hpp>
30 #include <boost/beast/http/read.hpp>
31 #include <boost/beast/http/write.hpp>
32 
33 #include <vector>
34 
35 namespace ripple {
36 
37 namespace detail {
38 
39 template <class Impl>
40 class WorkBase : public Work
41 {
42 protected:
43  using error_code = boost::system::error_code;
44  using endpoint_type = boost::asio::ip::tcp::endpoint;
45 
46 public:
48  void(error_code const&, endpoint_type const&, response_type&&)>;
49 
50 protected:
51  using socket_type = boost::asio::ip::tcp::socket;
52  using resolver_type = boost::asio::ip::tcp::resolver;
53  using results_type = boost::asio::ip::tcp::resolver::results_type;
54  using request_type =
55  boost::beast::http::request<boost::beast::http::empty_body>;
56 
61  boost::asio::io_service& ios_;
62  boost::asio::io_service::strand strand_;
67  boost::beast::multi_buffer readBuf_;
70 
71 public:
72  WorkBase(
73  std::string const& host,
74  std::string const& path,
75  std::string const& port,
76  boost::asio::io_service& ios,
77  endpoint_type const& lastEndpoint,
78  bool lastStatus,
79  callback_type cb);
80  ~WorkBase();
81 
82  Impl&
83  impl()
84  {
85  return *static_cast<Impl*>(this);
86  }
87 
88  void
89  run() override;
90 
91  void
92  cancel() override;
93 
94  void
95  fail(error_code const& ec);
96 
97  void
98  onResolve(error_code const& ec, results_type results);
99 
100  void
101  onStart();
102 
103  void
104  onRequest(error_code const& ec);
105 
106  void
107  onResponse(error_code const& ec);
108 
109 private:
110  void
111  close();
112 };
113 
114 //------------------------------------------------------------------------------
115 
116 template <class Impl>
118  std::string const& host,
119  std::string const& path,
120  std::string const& port,
121  boost::asio::io_service& ios,
122  endpoint_type const& lastEndpoint,
123  bool lastStatus,
124  callback_type cb)
125  : host_(host)
126  , path_(path)
127  , port_(port)
128  , cb_(std::move(cb))
129  , ios_(ios)
130  , strand_(ios)
131  , resolver_(ios)
132  , socket_(ios)
133  , lastEndpoint_{lastEndpoint}
134  , lastStatus_(lastStatus)
135 {
136 }
137 
138 template <class Impl>
140 {
141  if (cb_)
142  cb_(make_error_code(boost::system::errc::not_a_socket),
143  lastEndpoint_,
144  std::move(res_));
145  close();
146 }
147 
148 template <class Impl>
149 void
151 {
152  if (!strand_.running_in_this_thread())
153  return ios_.post(
154  strand_.wrap(std::bind(&WorkBase::run, impl().shared_from_this())));
155 
156  resolver_.async_resolve(
157  host_,
158  port_,
159  strand_.wrap(std::bind(
160  &WorkBase::onResolve,
161  impl().shared_from_this(),
162  std::placeholders::_1,
163  std::placeholders::_2)));
164 }
165 
166 template <class Impl>
167 void
169 {
170  if (!strand_.running_in_this_thread())
171  {
172  return ios_.post(strand_.wrap(
173  std::bind(&WorkBase::cancel, impl().shared_from_this())));
174  }
175 
176  error_code ec;
177  resolver_.cancel();
178  socket_.cancel(ec);
179 }
180 
181 template <class Impl>
182 void
184 {
185  if (cb_)
186  {
187  cb_(ec, lastEndpoint_, std::move(res_));
188  cb_ = nullptr;
189  }
190 }
191 
192 template <class Impl>
193 void
195 {
196  if (ec)
197  return fail(ec);
198 
199  // Use last endpoint if it is successfully connected
200  // and is in the list, otherwise pick a random endpoint
201  // from the list (excluding last endpoint). If there is
202  // only one endpoint and it is the last endpoint then
203  // use the last endpoint.
204  lastEndpoint_ = [&]() -> endpoint_type {
205  int foundIndex = 0;
206  auto const foundIt = std::find_if(
207  results.begin(), results.end(), [&](endpoint_type const& e) {
208  if (e == lastEndpoint_)
209  return true;
210  foundIndex++;
211  return false;
212  });
213  if (foundIt != results.end() && lastStatus_)
214  return lastEndpoint_;
215  else if (results.size() == 1)
216  return *results.begin();
217  else if (foundIt == results.end())
218  return *std::next(results.begin(), rand_int(results.size() - 1));
219 
220  // lastEndpoint_ is part of the collection
221  // Pick a random number from the n-1 valid choices, if we use
222  // this as an index, note the last element will never be chosen
223  // and the `lastEndpoint_` index may be chosen. So when the
224  // `lastEndpoint_` index is chosen, that is treated as if the
225  // last element was chosen.
226  auto randIndex =
227  (results.size() > 2) ? rand_int(results.size() - 2) : 0;
228  if (randIndex == foundIndex)
229  randIndex = results.size() - 1;
230  return *std::next(results.begin(), randIndex);
231  }();
232 
233  socket_.async_connect(
234  lastEndpoint_,
235  strand_.wrap(std::bind(
236  &Impl::onConnect,
237  impl().shared_from_this(),
238  std::placeholders::_1)));
239 }
240 
241 template <class Impl>
242 void
244 {
245  req_.method(boost::beast::http::verb::get);
246  req_.target(path_.empty() ? "/" : path_);
247  req_.version(11);
248  req_.set("Host", host_ + ":" + port_);
249  req_.set("User-Agent", BuildInfo::getFullVersionString());
250  req_.prepare_payload();
251  boost::beast::http::async_write(
252  impl().stream(),
253  req_,
254  strand_.wrap(std::bind(
255  &WorkBase::onRequest,
256  impl().shared_from_this(),
257  std::placeholders::_1)));
258 }
259 
260 template <class Impl>
261 void
263 {
264  if (ec)
265  return fail(ec);
266 
267  boost::beast::http::async_read(
268  impl().stream(),
269  readBuf_,
270  res_,
271  strand_.wrap(std::bind(
272  &WorkBase::onResponse,
273  impl().shared_from_this(),
274  std::placeholders::_1)));
275 }
276 
277 template <class Impl>
278 void
280 {
281  if (ec)
282  return fail(ec);
283 
284  close();
285  assert(cb_);
286  cb_(ec, lastEndpoint_, std::move(res_));
287  cb_ = nullptr;
288 }
289 
290 template <class Impl>
291 void
293 {
294  if (socket_.is_open())
295  {
296  error_code ec;
297  socket_.shutdown(boost::asio::socket_base::shutdown_send, ec);
298  if (ec)
299  return;
300  socket_.close(ec);
301  }
302 }
303 
304 } // namespace detail
305 
306 } // namespace ripple
307 
308 #endif
ripple::detail::WorkBase::close
void close()
Definition: WorkBase.h:292
ripple::detail::WorkBase::resolver_
resolver_type resolver_
Definition: WorkBase.h:63
ripple::detail::WorkBase< WorkPlain >::endpoint_type
boost::asio::ip::tcp::endpoint endpoint_type
Definition: WorkBase.h:44
std::bind
T bind(T... args)
std::string
STL class.
ripple::detail::WorkBase::impl
Impl & impl()
Definition: WorkBase.h:83
ripple::detail::WorkBase< WorkPlain >::request_type
boost::beast::http::request< boost::beast::http::empty_body > request_type
Definition: WorkBase.h:55
ripple::detail::WorkBase::~WorkBase
~WorkBase()
Definition: WorkBase.h:139
ripple::detail::WorkBase::fail
void fail(error_code const &ec)
Definition: WorkBase.h:183
vector
std::find_if
T find_if(T... args)
ripple::detail::WorkBase::lastEndpoint_
endpoint_type lastEndpoint_
Definition: WorkBase.h:68
ripple::detail::WorkBase::ios_
boost::asio::io_service & ios_
Definition: WorkBase.h:61
ripple::detail::WorkBase< WorkPlain >::resolver_type
boost::asio::ip::tcp::resolver resolver_type
Definition: WorkBase.h:52
ripple::detail::WorkBase::readBuf_
boost::beast::multi_buffer readBuf_
Definition: WorkBase.h:67
ripple::detail::WorkBase::host_
std::string host_
Definition: WorkBase.h:57
ripple::detail::WorkBase::socket_
socket_type socket_
Definition: WorkBase.h:64
std::function< void(error_code const &, endpoint_type const &, response_type &&)>
ripple::detail::response_type
boost::beast::http::response< boost::beast::http::string_body > response_type
Definition: Work.h:31
ripple::detail::WorkBase::req_
request_type req_
Definition: WorkBase.h:65
ripple::rand_int
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
Definition: ripple/basics/random.h:115
ripple::detail::WorkBase< WorkPlain >::results_type
boost::asio::ip::tcp::resolver::results_type results_type
Definition: WorkBase.h:53
ripple::detail::WorkBase::onStart
void onStart()
Definition: WorkBase.h:243
ripple::detail::WorkBase::onResolve
void onResolve(error_code const &ec, results_type results)
Definition: WorkBase.h:194
ripple::detail::WorkBase::port_
std::string port_
Definition: WorkBase.h:59
ripple::detail::WorkBase
Definition: WorkBase.h:40
ripple::detail::WorkBase::cb_
callback_type cb_
Definition: WorkBase.h:60
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::detail::WorkBase::WorkBase
WorkBase(std::string const &host, std::string const &path, std::string const &port, boost::asio::io_service &ios, endpoint_type const &lastEndpoint, bool lastStatus, callback_type cb)
Definition: WorkBase.h:117
ripple::detail::WorkBase::cancel
void cancel() override
Definition: WorkBase.h:168
std
STL namespace.
ripple::detail::WorkBase::run
void run() override
Definition: WorkBase.h:150
ripple::detail::WorkBase< WorkPlain >::error_code
boost::system::error_code error_code
Definition: WorkBase.h:43
ripple::detail::WorkBase::onRequest
void onRequest(error_code const &ec)
Definition: WorkBase.h:262
ripple::detail::WorkBase::strand_
boost::asio::io_service::strand strand_
Definition: WorkBase.h:62
ripple::detail::WorkBase< WorkPlain >::socket_type
boost::asio::ip::tcp::socket socket_type
Definition: WorkBase.h:51
ripple::detail::WorkBase::onResponse
void onResponse(error_code const &ec)
Definition: WorkBase.h:279
ripple::detail::WorkBase::lastStatus_
bool lastStatus_
Definition: WorkBase.h:69
ripple::detail::WorkBase::path_
std::string path_
Definition: WorkBase.h:58
ripple::detail::WorkBase::res_
response_type res_
Definition: WorkBase.h:66
ripple::detail::Work
Definition: Work.h:33
std::next
T next(T... args)