rippled
BaseWSPeer.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright(c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
21 #define RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
22 
#include <ripple/basics/safe_cast.h>
#include <ripple/beast/utility/rngfill.h>
#include <ripple/crypto/csprng.h>
#include <ripple/protocol/BuildInfo.h>
#include <ripple/server/impl/BasePeer.h>
#include <ripple/server/impl/LowestLayer.h>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/websocket.hpp>
#include <algorithm>
#include <cassert>
#include <chrono>
#include <functional>
#include <iterator>
#include <list>
#include <memory>
#include <vector>
34 
35 namespace ripple {
36 
38 template <class Handler, class Impl>
39 class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
40 {
41 protected:
43  using error_code = boost::system::error_code;
44  using endpoint_type = boost::asio::ip::tcp::endpoint;
45  using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
47 
48 private:
49  friend class BasePeer<Handler, Impl>;
50 
52  boost::beast::multi_buffer rb_;
53  boost::beast::multi_buffer wb_;
55  bool do_close_ = false;
56  boost::beast::websocket::close_reason cr_;
58  bool close_on_timer_ = false;
59  bool ping_active_ = false;
60  boost::beast::websocket::ping_data payload_;
63  void(boost::beast::websocket::frame_type, boost::beast::string_view)>
65 
66 public:
67  template <class Body, class Headers>
68  BaseWSPeer(
69  Port const& port,
70  Handler& handler,
71  boost::asio::executor const& executor,
72  waitable_timer timer,
73  endpoint_type remote_address,
74  boost::beast::http::request<Body, Headers>&& request,
75  beast::Journal journal);
76 
77  void
78  run() override;
79 
80  //
81  // WSSession
82  //
83 
84  Port const&
85  port() const override
86  {
87  return this->port_;
88  }
89 
90  http_request_type const&
91  request() const override
92  {
93  return this->request_;
94  }
95 
96  boost::asio::ip::tcp::endpoint const&
97  remote_endpoint() const override
98  {
99  return this->remote_address_;
100  }
101 
102  void
103  send(std::shared_ptr<WSMsg> w) override;
104 
105  void
106  close() override;
107 
108  void
109  close(boost::beast::websocket::close_reason const& reason) override;
110 
111  void
112  complete() override;
113 
114 protected:
115  Impl&
117  {
118  return *static_cast<Impl*>(this);
119  }
120 
121  void
122  on_ws_handshake(error_code const& ec);
123 
124  void
125  do_write();
126 
127  void
128  on_write(error_code const& ec);
129 
130  void
131  on_write_fin(error_code const& ec);
132 
133  void
134  do_read();
135 
136  void
137  on_read(error_code const& ec);
138 
139  void
140  on_close(error_code const& ec);
141 
142  void
143  start_timer();
144 
145  void
146  cancel_timer();
147 
148  void
149  on_ping(error_code const& ec);
150 
151  void
152  on_ping_pong(
153  boost::beast::websocket::frame_type kind,
154  boost::beast::string_view payload);
155 
156  void
157  on_timer(error_code ec);
158 
159  template <class String>
160  void
161  fail(error_code ec, String const& what);
162 };
163 
164 //------------------------------------------------------------------------------
165 
166 template <class Handler, class Impl>
167 template <class Body, class Headers>
169  Port const& port,
170  Handler& handler,
171  boost::asio::executor const& executor,
172  waitable_timer timer,
173  endpoint_type remote_address,
174  boost::beast::http::request<Body, Headers>&& request,
175  beast::Journal journal)
176  : BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
177  , request_(std::move(request))
178  , timer_(std::move(timer))
179  , payload_("12345678") // ensures size is 8 bytes
180 {
181 }
182 
183 template <class Handler, class Impl>
184 void
186 {
187  if (!strand_.running_in_this_thread())
188  return post(
189  strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
190  impl().ws_.set_option(port().pmd_options);
191  // Must manage the control callback memory outside of the `control_callback`
192  // function
193  control_callback_ = std::bind(
194  &BaseWSPeer::on_ping_pong,
195  this,
196  std::placeholders::_1,
197  std::placeholders::_2);
198  impl().ws_.control_callback(control_callback_);
199  start_timer();
200  close_on_timer_ = true;
201  impl().ws_.set_option(
202  boost::beast::websocket::stream_base::decorator([](auto& res) {
203  res.set(
204  boost::beast::http::field::server,
205  BuildInfo::getFullVersionString());
206  }));
207  impl().ws_.async_accept(
208  request_,
209  bind_executor(
210  strand_,
211  std::bind(
212  &BaseWSPeer::on_ws_handshake,
213  impl().shared_from_this(),
214  std::placeholders::_1)));
215 }
216 
217 template <class Handler, class Impl>
218 void
220 {
221  if (!strand_.running_in_this_thread())
222  return post(
223  strand_,
224  std::bind(
225  &BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
226  if (do_close_)
227  return;
228  if (wq_.size() > port().ws_queue_limit)
229  {
230  cr_.code = safe_cast<decltype(cr_.code)>(
231  boost::beast::websocket::close_code::policy_error);
232  cr_.reason = "Policy error: client is too slow.";
233  JLOG(this->j_.info()) << cr_.reason;
234  wq_.erase(std::next(wq_.begin()), wq_.end());
235  close(cr_);
236  return;
237  }
238  wq_.emplace_back(std::move(w));
239  if (wq_.size() == 1)
240  on_write({});
241 }
242 
243 template <class Handler, class Impl>
244 void
246 {
247  close(boost::beast::websocket::close_reason{});
248 }
249 
250 template <class Handler, class Impl>
251 void
253  boost::beast::websocket::close_reason const& reason)
254 {
255  if (!strand_.running_in_this_thread())
256  return post(strand_, [self = impl().shared_from_this(), reason] {
257  self->close(reason);
258  });
259  do_close_ = true;
260  if (wq_.empty())
261  {
262  impl().ws_.async_close(
263  reason,
264  bind_executor(
265  strand_,
266  [self = impl().shared_from_this()](
267  boost::beast::error_code const& ec) {
268  self->on_close(ec);
269  }));
270  }
271  else
272  {
273  cr_ = reason;
274  }
275 }
276 
277 template <class Handler, class Impl>
278 void
280 {
281  if (!strand_.running_in_this_thread())
282  return post(
283  strand_,
284  std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
285  do_read();
286 }
287 
288 template <class Handler, class Impl>
289 void
291 {
292  if (ec)
293  return fail(ec, "on_ws_handshake");
294  close_on_timer_ = false;
295  do_read();
296 }
297 
298 template <class Handler, class Impl>
299 void
301 {
302  if (!strand_.running_in_this_thread())
303  return post(
304  strand_,
305  std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
306  on_write({});
307 }
308 
309 template <class Handler, class Impl>
310 void
312 {
313  if (ec)
314  return fail(ec, "write");
315  auto& w = *wq_.front();
316  auto const result = w.prepare(
317  65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
318  if (boost::indeterminate(result.first))
319  return;
320  start_timer();
321  if (!result.first)
322  impl().ws_.async_write_some(
323  static_cast<bool>(result.first),
324  result.second,
325  bind_executor(
326  strand_,
327  std::bind(
328  &BaseWSPeer::on_write,
329  impl().shared_from_this(),
330  std::placeholders::_1)));
331  else
332  impl().ws_.async_write_some(
333  static_cast<bool>(result.first),
334  result.second,
335  bind_executor(
336  strand_,
337  std::bind(
338  &BaseWSPeer::on_write_fin,
339  impl().shared_from_this(),
340  std::placeholders::_1)));
341 }
342 
343 template <class Handler, class Impl>
344 void
346 {
347  if (ec)
348  return fail(ec, "write_fin");
349  wq_.pop_front();
350  if (do_close_)
351  impl().ws_.async_close(
352  cr_,
353  bind_executor(
354  strand_,
355  std::bind(
356  &BaseWSPeer::on_close,
357  impl().shared_from_this(),
358  std::placeholders::_1)));
359  else if (!wq_.empty())
360  on_write({});
361 }
362 
363 template <class Handler, class Impl>
364 void
366 {
367  if (!strand_.running_in_this_thread())
368  return post(
369  strand_,
370  std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
371  impl().ws_.async_read(
372  rb_,
373  bind_executor(
374  strand_,
375  std::bind(
376  &BaseWSPeer::on_read,
377  impl().shared_from_this(),
378  std::placeholders::_1)));
379 }
380 
381 template <class Handler, class Impl>
382 void
384 {
385  if (ec == boost::beast::websocket::error::closed)
386  return on_close({});
387  if (ec)
388  return fail(ec, "read");
389  auto const& data = rb_.data();
391  b.reserve(std::distance(data.begin(), data.end()));
392  std::copy(data.begin(), data.end(), std::back_inserter(b));
393  this->handler_.onWSMessage(impl().shared_from_this(), b);
394  rb_.consume(rb_.size());
395 }
396 
397 template <class Handler, class Impl>
398 void
400 {
401  cancel_timer();
402 }
403 
404 template <class Handler, class Impl>
405 void
407 {
408  // Max seconds without completing a message
409  static constexpr std::chrono::seconds timeout{30};
410  static constexpr std::chrono::seconds timeoutLocal{3};
411  error_code ec;
412  timer_.expires_from_now(
413  remote_endpoint().address().is_loopback() ? timeoutLocal : timeout, ec);
414  if (ec)
415  return fail(ec, "start_timer");
416  timer_.async_wait(bind_executor(
417  strand_,
418  std::bind(
420  impl().shared_from_this(),
421  std::placeholders::_1)));
422 }
423 
424 // Convenience for discarding the error code
425 template <class Handler, class Impl>
426 void
428 {
429  error_code ec;
430  timer_.cancel(ec);
431 }
432 
433 template <class Handler, class Impl>
434 void
436 {
437  if (ec == boost::asio::error::operation_aborted)
438  return;
439  ping_active_ = false;
440  if (!ec)
441  return;
442  fail(ec, "on_ping");
443 }
444 
445 template <class Handler, class Impl>
446 void
448  boost::beast::websocket::frame_type kind,
449  boost::beast::string_view payload)
450 {
451  if (kind == boost::beast::websocket::frame_type::pong)
452  {
453  boost::beast::string_view p(payload_.begin());
454  if (payload == p)
455  {
456  close_on_timer_ = false;
457  JLOG(this->j_.trace()) << "got matching pong";
458  }
459  else
460  {
461  JLOG(this->j_.trace()) << "got pong";
462  }
463  }
464 }
465 
466 template <class Handler, class Impl>
467 void
469 {
470  if (ec == boost::asio::error::operation_aborted)
471  return;
472  if (!ec)
473  {
474  if (!close_on_timer_ || !ping_active_)
475  {
476  start_timer();
477  close_on_timer_ = true;
478  ping_active_ = true;
479  // cryptographic is probably overkill..
480  beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());
481  impl().ws_.async_ping(
482  payload_,
483  bind_executor(
484  strand_,
485  std::bind(
486  &BaseWSPeer::on_ping,
487  impl().shared_from_this(),
488  std::placeholders::_1)));
489  JLOG(this->j_.trace()) << "sent ping";
490  return;
491  }
492  ec = boost::system::errc::make_error_code(
493  boost::system::errc::timed_out);
494  }
495  fail(ec, "timer");
496 }
497 
498 template <class Handler, class Impl>
499 template <class String>
500 void
502 {
503  assert(strand_.running_in_this_thread());
504 
505  cancel_timer();
506  if (!ec_ && ec != boost::asio::error::operation_aborted)
507  {
508  ec_ = ec;
509  JLOG(this->j_.trace()) << what << ": " << ec.message();
510  ripple::get_lowest_layer(impl().ws_).socket().close(ec);
511  }
512 }
513 
514 } // namespace ripple
515 
516 #endif
ripple::BaseWSPeer::do_read
void do_read()
Definition: BaseWSPeer.h:365
ripple::BaseWSPeer::on_ws_handshake
void on_ws_handshake(error_code const &ec)
Definition: BaseWSPeer.h:290
std::chrono::system_clock
ripple::BaseWSPeer::close
void close() override
Definition: BaseWSPeer.h:245
ripple::BaseWSPeer::on_timer
void on_timer(error_code ec)
Definition: BaseWSPeer.h:468
ripple::BaseWSPeer::error_code
boost::system::error_code error_code
Definition: BaseWSPeer.h:43
std::bind
T bind(T... args)
ripple::BaseWSPeer::close_on_timer_
bool close_on_timer_
Definition: BaseWSPeer.h:58
std::shared_ptr
STL class.
std::list
STL class.
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::BaseWSPeer::impl
Impl & impl()
Definition: BaseWSPeer.h:116
ripple::BaseWSPeer::on_ping_pong
void on_ping_pong(boost::beast::websocket::frame_type kind, boost::beast::string_view payload)
Definition: BaseWSPeer.h:447
functional
ripple::BasePeer
Definition: BasePeer.h:37
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
ripple::BaseWSPeer::wb_
boost::beast::multi_buffer wb_
Definition: BaseWSPeer.h:53
ripple::BaseWSPeer::timer_
waitable_timer timer_
Definition: BaseWSPeer.h:57
std::back_inserter
T back_inserter(T... args)
ripple::BaseWSPeer::ping_active_
bool ping_active_
Definition: BaseWSPeer.h:59
std::chrono::seconds
ripple::crypto_prng
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
Definition: csprng.cpp:99
std::distance
T distance(T... args)
ripple::BaseWSPeer::cr_
boost::beast::websocket::close_reason cr_
Definition: BaseWSPeer.h:56
std::function
ripple::BaseWSPeer::on_write
void on_write(error_code const &ec)
Definition: BaseWSPeer.h:311
ripple::BaseWSPeer::wq_
std::list< std::shared_ptr< WSMsg > > wq_
Definition: BaseWSPeer.h:54
ripple::BaseWSPeer::run
void run() override
Definition: BaseWSPeer.h:185
ripple::BaseWSPeer::control_callback_
std::function< void(boost::beast::websocket::frame_type, boost::beast::string_view)> control_callback_
Definition: BaseWSPeer.h:64
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::waitable_timer
boost::asio::basic_waitable_timer< clock_type > waitable_timer
Definition: BasePeer.h:43
ripple::BaseWSPeer::ec_
error_code ec_
Definition: BaseWSPeer.h:61
ripple::BaseWSPeer::start_timer
void start_timer()
Definition: BaseWSPeer.h:406
ripple::BaseWSPeer::on_ping
void on_ping(error_code const &ec)
Definition: BaseWSPeer.h:435
ripple::BaseWSPeer::do_close_
bool do_close_
Definition: BaseWSPeer.h:55
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::error_code
boost::system::error_code error_code
Definition: BasePeer.h:41
ripple::BaseWSPeer::send
void send(std::shared_ptr< WSMsg > w) override
Send a WebSockets message.
Definition: BaseWSPeer.h:219
ripple::BaseWSPeer::payload_
boost::beast::websocket::ping_data payload_
Definition: BaseWSPeer.h:60
ripple::BaseWSPeer::remote_endpoint
boost::asio::ip::tcp::endpoint const & remote_endpoint() const override
Definition: BaseWSPeer.h:97
ripple::BaseWSPeer::on_write_fin
void on_write_fin(error_code const &ec)
Definition: BaseWSPeer.h:345
ripple::safe_cast
constexpr std::enable_if_t< std::is_same_v< typename Dest::unit_type, typename Src::unit_type > &&std::is_integral_v< typename Dest::value_type > &&std::is_integral_v< typename Src::value_type >, Dest > safe_cast(Src s) noexcept
Definition: FeeUnits.h:532
ripple::BaseWSPeer::on_close
void on_close(error_code const &ec)
Definition: BaseWSPeer.h:399
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::BasePeer::port_
Port const & port_
Definition: BasePeer.h:45
std::copy
T copy(T... args)
ripple::BaseWSPeer::do_write
void do_write()
Definition: BaseWSPeer.h:300
ripple::BasePeer< Handler, PlainWSPeer< Handler > >::endpoint_type
boost::asio::ip::tcp::endpoint endpoint_type
Definition: BasePeer.h:42
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::Port
Configuration information for a Server listening port.
Definition: Port.h:48
ripple::BaseWSPeer::complete
void complete() override
Indicate that the response is complete.
Definition: BaseWSPeer.h:279
ripple::BaseWSPeer::fail
void fail(error_code ec, String const &what)
Definition: BaseWSPeer.h:501
ripple::WSSession
Definition: WSSession.h:107
ripple::BaseWSPeer::cancel_timer
void cancel_timer()
Definition: BaseWSPeer.h:427
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
beast::rngfill
void rngfill(void *buffer, std::size_t bytes, Generator &g)
Definition: rngfill.h:33
std
STL namespace.
cassert
ripple::BaseWSPeer::request
http_request_type const & request() const override
Definition: BaseWSPeer.h:91
ripple::BaseWSPeer::on_read
void on_read(error_code const &ec)
Definition: BaseWSPeer.h:383
ripple::BasePeer::remote_address_
endpoint_type remote_address_
Definition: BasePeer.h:47
ripple::BaseWSPeer::rb_
boost::beast::multi_buffer rb_
Definition: BaseWSPeer.h:52
ripple::get_lowest_layer
decltype(auto) get_lowest_layer(T &t) noexcept
Definition: LowestLayer.h:35
ripple::http_request_type
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition: Handshake.h:47
ripple::BaseWSPeer::BaseWSPeer
BaseWSPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, waitable_timer timer, endpoint_type remote_address, boost::beast::http::request< Body, Headers > &&request, beast::Journal journal)
Definition: BaseWSPeer.h:168
ripple::BaseWSPeer::request_
http_request_type request_
Definition: BaseWSPeer.h:51
ripple::BaseWSPeer::port
Port const & port() const override
Definition: BaseWSPeer.h:85
ripple::BaseWSPeer
Represents an active WebSocket connection.
Definition: BaseWSPeer.h:39
std::next
T next(T... args)