20 #ifndef RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
21 #define RIPPLE_SERVER_BASEWSPEER_H_INCLUDED
23 #include <ripple/basics/safe_cast.h>
24 #include <ripple/beast/utility/rngfill.h>
25 #include <ripple/crypto/csprng.h>
26 #include <ripple/protocol/BuildInfo.h>
27 #include <ripple/server/impl/BasePeer.h>
28 #include <ripple/server/impl/LowestLayer.h>
29 #include <boost/beast/core/multi_buffer.hpp>
30 #include <boost/beast/http/message.hpp>
31 #include <boost/beast/websocket.hpp>
// Fragments of a class template parameterized on Handler and Impl.
// NOTE(review): the class head itself is not visible here; the member
// definitions further down are qualified with BaseWSPeer:: — confirm.
38 template <
class Handler,
class Impl>
// Read buffer: on_read takes rb_.data() and then consumes rb_.size() bytes.
52 boost::beast::multi_buffer
rb_;
53 boost::beast::multi_buffer
wb_;
// Close code/reason; send() fills this in when the write queue limit is hit.
56 boost::beast::websocket::close_reason
cr_;
// Callable signature (frame kind, payload) — the same parameter list as
// on_ping_pong, which is installed as the stream's control callback in run().
63 void(boost::beast::websocket::frame_type, boost::beast::string_view)>
// Member template over the HTTP request's Body and Headers types; takes an
// executor by const reference and the request by rvalue reference.
67 template <
class Body,
class Headers>
71 boost::asio::executor
const& executor,
74 boost::beast::http::request<Body, Headers>&&
request,
// NOTE(review): presumably the return type of remote_endpoint(), which
// start_timer() calls — the declarator is not visible; confirm.
96 boost::asio::ip::tcp::endpoint
const&
// close() taking an explicit close_reason; marked override, so it overrides
// a virtual declared in a base class.
109 close(boost::beast::websocket::close_reason
const& reason)
override;
// Views this object as the derived Impl type via static_cast.
118 return *
static_cast<Impl*
>(
this);
// Parameter list of the control-frame callback (see on_ping_pong).
153 boost::beast::websocket::frame_type kind,
154 boost::beast::string_view payload);
// Member template over a String type.
// NOTE(review): likely the declaration of fail(ec, what) — confirm.
159 template <
class String>
// Out-of-class constructor definition (fragmentary): a member template
// (Body, Headers) of the class template (Handler, Impl).
166 template <
class Handler,
class Impl>
167 template <
class Body,
class Headers>
171 boost::asio::executor
const& executor,
174 boost::beast::http::request<Body, Headers>&& request,
// Forwards port, handler, executor, remote_address and journal to the
// BasePeer<Handler, Impl> base, then moves the request and the timer into
// the request_ and timer_ members.
176 :
BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
177 , request_(
std::move(request))
178 , timer_(
std::move(timer))
// Initial contents of payload_, which on_ping_pong reads when a pong
// frame arrives.
179 , payload_(
"12345678")
// run() (fragmentary): hops onto strand_ if needed, configures the websocket
// stream, and starts the asynchronous accept.
183 template <
class Handler,
class Impl>
// Not on the strand yet: run() is re-bound to a shared_ptr of this object
// and handed to strand_ (the dispatching call itself is not visible here).
187 if (!strand_.running_in_this_thread())
189 strand_,
std::bind(&BaseWSPeer::run, impl().shared_from_this()));
// Apply the port's pmd_options to the stream.
// NOTE(review): presumably permessage-deflate settings — confirm.
190 impl().ws_.set_option(port().pmd_options);
// on_ping_pong is bound (frame kind, payload) as the control callback and
// installed on the stream.
194 &BaseWSPeer::on_ping_pong,
196 std::placeholders::_1,
197 std::placeholders::_2);
198 impl().ws_.control_callback(control_callback_);
// Set while the handshake is pending; on_ws_handshake resets it to false.
200 close_on_timer_ =
true;
// Response decorator referencing the HTTP Server field and the full build
// version string.
201 impl().ws_.set_option(
202 boost::beast::websocket::stream_base::decorator([](
auto& res) {
204 boost::beast::http::field::server,
205 BuildInfo::getFullVersionString());
// Begin the websocket accept; completion is delivered to on_ws_handshake.
207 impl().ws_.async_accept(
212 &BaseWSPeer::on_ws_handshake,
213 impl().shared_from_this(),
214 std::placeholders::_1)));
// send() (fragmentary): queues an outgoing message `w` on wq_, enforcing the
// port's write-queue limit.
217 template <
class Handler,
class Impl>
// Not on the strand: send() is re-bound with ownership of `w` moved into the
// bound handler.
221 if (!strand_.running_in_this_thread())
225 &BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
// Queue limit exceeded: record a policy_error close code and reason, log it
// at info level, and drop every queued element except the first.
// NOTE(review): the front element is presumably kept because it may be
// mid-write (on_write dereferences wq_.front()) — confirm.
228 if (wq_.size() > port().ws_queue_limit)
230 cr_.code =
safe_cast<decltype(cr_.code)>(
231 boost::beast::websocket::close_code::policy_error);
232 cr_.reason =
"Policy error: client is too slow.";
233 JLOG(this->j_.
info()) << cr_.reason;
234 wq_.erase(
std::next(wq_.begin()), wq_.end());
// Normal path: append the message to the write queue.
238 wq_.emplace_back(std::move(w));
// close() without arguments: delegates to close(reason) with a
// default-constructed close_reason.
243 template <
class Handler,
class Impl>
247 close(boost::beast::websocket::close_reason{});
// close(reason) (fragmentary): starts a websocket close on the strand.
250 template <
class Handler,
class Impl>
253 boost::beast::websocket::close_reason
const& reason)
// Not on the strand: post a lambda that holds a shared_ptr to this object
// and its own copy of `reason` (the lambda body is not visible here).
255 if (!strand_.running_in_this_thread())
256 return post(strand_, [
self = impl().shared_from_this(), reason] {
// Asynchronous close; the completion lambda captures a shared_ptr to this
// object and receives the resulting error_code.
262 impl().ws_.async_close(
266 [
self = impl().shared_from_this()](
267 boost::beast::error_code
const& ec) {
// complete() (fragmentary): only the strand hop is visible — when called off
// the strand, complete() is re-bound to a shared_ptr of this object.
277 template <
class Handler,
class Impl>
281 if (!strand_.running_in_this_thread())
284 std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
// on_ws_handshake (fragmentary): completion handler for the async_accept
// started in run().
288 template <
class Handler,
class Impl>
// Error path: reported through fail() tagged "on_ws_handshake" (the guarding
// condition is not visible here).
293 return fail(ec,
"on_ws_handshake");
// Clears the flag that run() set before starting the accept.
294 close_on_timer_ =
false;
// do_write() (fragmentary): only the strand hop is visible — when called off
// the strand, do_write() is re-bound to a shared_ptr of this object.
298 template <
class Handler,
class Impl>
302 if (!strand_.running_in_this_thread())
305 std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
// on_write (fragmentary): drives writing of the message at the front of wq_.
309 template <
class Handler,
class Impl>
// Error path: reported through fail() tagged "write".
314 return fail(ec,
"write");
// Ask the front message to prepare its next piece. The second argument is a
// callback bound to do_write.
// NOTE(review): 65536 is presumably the maximum number of bytes per piece,
// and the callback presumably resumes writing once data is ready — confirm
// against the message type's prepare().
315 auto& w = *wq_.front();
316 auto const result = w.prepare(
317 65536,
std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
// result.first is tested with boost::indeterminate, i.e. it is tri-state
// (the action taken for the indeterminate case is not visible here).
318 if (boost::indeterminate(result.first))
// Two write paths follow; both pass result.first, converted to bool, as the
// first argument of async_write_some (Beast's `fin` flag). This one completes
// in on_write again...
322 impl().ws_.async_write_some(
323 static_cast<bool>(result.first),
328 &BaseWSPeer::on_write,
329 impl().shared_from_this(),
330 std::placeholders::_1)));
// ...and this one completes in on_write_fin. The condition selecting between
// the two is not visible here.
332 impl().ws_.async_write_some(
333 static_cast<bool>(result.first),
338 &BaseWSPeer::on_write_fin,
339 impl().shared_from_this(),
340 std::placeholders::_1)));
// on_write_fin (fragmentary): completion handler for the second
// async_write_some call in on_write.
343 template <
class Handler,
class Impl>
// Error path: reported through fail() tagged "write_fin".
348 return fail(ec,
"write_fin");
// One branch starts an asynchronous close completing in on_close; the
// alternative branch applies when wq_ still holds messages. The first
// branch's condition and the second branch's body are not visible here.
351 impl().ws_.async_close(
356 &BaseWSPeer::on_close,
357 impl().shared_from_this(),
358 std::placeholders::_1)));
359 else if (!wq_.empty())
// do_read() (fragmentary): starts an asynchronous websocket read on the
// strand.
363 template <
class Handler,
class Impl>
// Not on the strand: do_read() is re-bound to a shared_ptr of this object.
367 if (!strand_.running_in_this_thread())
370 std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
// Completion is delivered to on_read.
371 impl().ws_.async_read(
376 &BaseWSPeer::on_read,
377 impl().shared_from_this(),
378 std::placeholders::_1)));
// on_read (fragmentary): completion handler for async_read.
381 template <
class Handler,
class Impl>
// The "closed" websocket error is tested separately from other errors (what
// happens in that case is not visible here).
385 if (ec == boost::beast::websocket::error::closed)
// Other errors are reported through fail() tagged "read".
388 return fail(ec,
"read");
// Hand the received data to the handler along with a shared_ptr to this
// object (the construction of `b` from `data` is not visible here), then
// discard everything currently in the read buffer.
389 auto const& data = rb_.data();
393 this->handler_.onWSMessage(impl().shared_from_this(), b);
394 rb_.consume(rb_.size());
// NOTE(review): only the template header of this member definition is
// visible. From its position it is presumably on_close (the handler
// on_write_fin passes to async_close) — confirm.
397 template <
class Handler,
class Impl>
// start_timer (fragmentary): arms timer_ and waits on it asynchronously.
404 template <
class Handler,
class Impl>
// Loopback peers get timeoutLocal; every other peer gets timeout. The
// error_code overload is used, so failures land in `ec` instead of throwing.
412 timer_.expires_from_now(
413 remote_endpoint().address().is_loopback() ? timeoutLocal : timeout, ec);
// Error path: reported through fail() tagged "start_timer".
415 return fail(ec,
"start_timer");
// The wait handler is bound to a shared_ptr of this object (the member it
// targets is not visible here).
416 timer_.async_wait(bind_executor(
420 impl().shared_from_this(),
421 std::placeholders::_1)));
// NOTE(review): only the template header of this member definition is
// visible. From its position between start_timer and on_ping it is
// presumably a timer-cancel helper — confirm.
425 template <
class Handler,
class Impl>
// on_ping (fragmentary): completion handler for the async_ping issued by
// on_timer.
433 template <
class Handler,
class Impl>
// operation_aborted is tested separately (its branch is not visible here).
437 if (ec == boost::asio::error::operation_aborted)
// Clears ping_active_, one of the two flags on_timer tests.
439 ping_active_ =
false;
// on_ping_pong (fragmentary): control-frame callback installed in run().
445 template <
class Handler,
class Impl>
448 boost::beast::websocket::frame_type kind,
449 boost::beast::string_view payload)
// Only pong frames are examined in the visible code.
451 if (kind == boost::beast::websocket::frame_type::pong)
// View over the stored ping payload, built from a pointer alone, so its
// length is found by scanning for a terminating NUL.
// NOTE(review): assumes payload_ is NUL-terminated — confirm against its type.
453 boost::beast::string_view p(payload_.begin());
// Matching pong: clear close_on_timer_ and log (the comparison itself is not
// visible here).
456 close_on_timer_ =
false;
457 JLOG(this->j_.
trace()) <<
"got matching pong";
// Other pong path: logged only.
461 JLOG(this->j_.
trace()) <<
"got pong";
// on_timer (fragmentary): timer expiry handler. It either sends a ping or
// records a timeout.
466 template <
class Handler,
class Impl>
// operation_aborted is tested separately (its branch is not visible here).
470 if (ec == boost::asio::error::operation_aborted)
// Entered when either flag is clear, i.e. the timer is not set to close or
// no ping is active.
474 if (!close_on_timer_ || !ping_active_)
// Set close_on_timer_ again; on_ping_pong clears it when a matching pong
// arrives.
477 close_on_timer_ =
true;
// Send a ping; completion is delivered to on_ping.
481 impl().ws_.async_ping(
486 &BaseWSPeer::on_ping,
487 impl().shared_from_this(),
488 std::placeholders::_1)));
489 JLOG(this->j_.
trace()) <<
"sent ping";
// Otherwise ec is overwritten with a timed_out error (how it is then
// reported is not visible here).
492 ec = boost::system::errc::make_error_code(
493 boost::system::errc::timed_out);
// fail(ec, what) (fragmentary): error reporting helper; `what` is a
// String-typed label such as "read" or "write".
498 template <
class Handler,
class Impl>
499 template <
class String>
// Must be called on the strand.
503 assert(strand_.running_in_this_thread());
// Acts only when ec_ is still clear and the error is not operation_aborted.
// NOTE(review): ec_ presumably stores the first error seen — the assignment
// is not visible here; confirm.
506 if (!ec_ && ec != boost::asio::error::operation_aborted)
// Trace-log "<what>: <error message>".
509 JLOG(this->j_.
trace()) << what <<
": " << ec.message();