rippled
BaseHTTPPeer.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright(c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED
21 #define RIPPLE_SERVER_BASEHTTPPEER_H_INCLUDED
22 
23 #include <ripple/basics/Log.h>
24 #include <ripple/beast/net/IPAddressConversion.h>
25 #include <ripple/server/Session.h>
26 #include <ripple/server/impl/io_list.h>
27 #include <boost/asio/ip/tcp.hpp>
28 #include <boost/asio/spawn.hpp>
29 #include <boost/asio/ssl/stream.hpp>
30 #include <boost/asio/streambuf.hpp>
31 #include <boost/beast/core/stream_traits.hpp>
32 #include <boost/beast/http/dynamic_body.hpp>
33 #include <boost/beast/http/message.hpp>
34 #include <boost/beast/http/parser.hpp>
35 #include <boost/beast/http/read.hpp>
36 #include <atomic>
37 #include <cassert>
38 #include <chrono>
39 #include <functional>
40 #include <memory>
41 #include <mutex>
42 #include <type_traits>
43 #include <vector>
44 
45 namespace ripple {
46 
48 template <class Handler, class Impl>
49 class BaseHTTPPeer : public io_list::work, public Session
50 {
51 protected:
53  using error_code = boost::system::error_code;
54  using endpoint_type = boost::asio::ip::tcp::endpoint;
55  using yield_context = boost::asio::yield_context;
56 
57  enum {
58  // Size of our read/write buffer
59  bufferSize = 4 * 1024,
60 
61  // Max seconds without completing a message
63  timeoutSecondsLocal = 3 // used for localhost clients
64  };
65 
66  struct buffer
67  {
68  buffer(void const* ptr, std::size_t len)
69  : data(new char[len]), bytes(len), used(0)
70  {
71  memcpy(data.get(), ptr, len);
72  }
73 
77  };
78 
79  Port const& port_;
80  Handler& handler_;
81  boost::asio::executor_work_guard<boost::asio::executor> work_;
82  boost::asio::strand<boost::asio::executor> strand_;
85 
88 
89  boost::asio::streambuf read_buf_;
94  bool graceful_ = false;
95  bool complete_ = false;
96  boost::system::error_code ec_;
97 
98  int request_count_ = 0;
101 
102  //--------------------------------------------------------------------------
103 
104 public:
105  template <class ConstBufferSequence>
106  BaseHTTPPeer(
107  Port const& port,
108  Handler& handler,
109  boost::asio::executor const& executor,
111  endpoint_type remote_address,
112  ConstBufferSequence const& buffers);
113 
114  virtual ~BaseHTTPPeer();
115 
116  Session&
118  {
119  return *this;
120  }
121 
122  void
123  close() override;
124 
125 protected:
126  Impl&
128  {
129  return *static_cast<Impl*>(this);
130  }
131 
132  void
133  fail(error_code ec, char const* what);
134 
135  void
136  start_timer();
137 
138  void
139  cancel_timer();
140 
141  void
142  on_timer();
143 
144  void
145  do_read(yield_context do_yield);
146 
147  void
148  on_write(error_code const& ec, std::size_t bytes_transferred);
149 
150  void
151  do_writer(
152  std::shared_ptr<Writer> const& writer,
153  bool keep_alive,
154  yield_context do_yield);
155 
156  virtual void
157  do_request() = 0;
158 
159  virtual void
160  do_close() = 0;
161 
162  // Session
163 
165  journal() override
166  {
167  return journal_;
168  }
169 
170  Port const&
171  port() override
172  {
173  return port_;
174  }
175 
177  remoteAddress() override
178  {
180  }
181 
183  request() override
184  {
185  return message_;
186  }
187 
188  void
189  write(void const* buffer, std::size_t bytes) override;
190 
191  void
192  write(std::shared_ptr<Writer> const& writer, bool keep_alive) override;
193 
195  detach() override;
196 
197  void
198  complete() override;
199 
200  void
201  close(bool graceful) override;
202 };
203 
204 //------------------------------------------------------------------------------
205 
206 template <class Handler, class Impl>
207 template <class ConstBufferSequence>
209  Port const& port,
210  Handler& handler,
211  boost::asio::executor const& executor,
212  beast::Journal journal,
213  endpoint_type remote_address,
214  ConstBufferSequence const& buffers)
215  : port_(port)
216  , handler_(handler)
217  , work_(executor)
218  , strand_(executor)
219  , remote_address_(remote_address)
220  , journal_(journal)
221 {
222  read_buf_.commit(boost::asio::buffer_copy(
223  read_buf_.prepare(boost::asio::buffer_size(buffers)), buffers));
224  static std::atomic<int> sid;
225  nid_ = ++sid;
226  id_ = std::string("#") + std::to_string(nid_) + " ";
227  JLOG(journal_.trace()) << id_ << "accept: " << remote_address_.address();
228 }
229 
230 template <class Handler, class Impl>
232 {
233  handler_.onClose(session(), ec_);
234  JLOG(journal_.trace()) << id_ << "destroyed: " << request_count_
235  << ((request_count_ == 1) ? " request"
236  : " requests");
237 }
238 
239 template <class Handler, class Impl>
240 void
242 {
243  if (!strand_.running_in_this_thread())
244  return post(
245  strand_,
246  std::bind(
247  (void (BaseHTTPPeer::*)(void)) & BaseHTTPPeer::close,
248  impl().shared_from_this()));
249  boost::beast::get_lowest_layer(impl().stream_).close();
250 }
251 
252 //------------------------------------------------------------------------------
253 
254 template <class Handler, class Impl>
255 void
257 {
258  if (!ec_ && ec != boost::asio::error::operation_aborted)
259  {
260  ec_ = ec;
261  JLOG(journal_.trace())
262  << id_ << std::string(what) << ": " << ec.message();
263  boost::beast::get_lowest_layer(impl().stream_).close();
264  }
265 }
266 
267 template <class Handler, class Impl>
268 void
270 {
271  boost::beast::get_lowest_layer(impl().stream_)
272  .expires_after(std::chrono::seconds(
273  remote_address_.address().is_loopback() ? timeoutSecondsLocal
274  : timeoutSeconds));
275 }
276 
277 // Convenience for discarding the error code
278 template <class Handler, class Impl>
279 void
281 {
282  boost::beast::get_lowest_layer(impl().stream_).expires_never();
283 }
284 
285 // Called when session times out
286 template <class Handler, class Impl>
287 void
289 {
290  auto ec =
291  boost::system::errc::make_error_code(boost::system::errc::timed_out);
292  fail(ec, "timer");
293 }
294 
295 //------------------------------------------------------------------------------
296 
297 template <class Handler, class Impl>
298 void
300 {
301  complete_ = false;
302  error_code ec;
303  start_timer();
304  boost::beast::http::async_read(
305  impl().stream_, read_buf_, message_, do_yield[ec]);
306  cancel_timer();
307  if (ec == boost::beast::http::error::end_of_stream)
308  return do_close();
309  if (ec == boost::beast::error::timeout)
310  return on_timer();
311  if (ec)
312  return fail(ec, "http::read");
313  do_request();
314 }
315 
316 // Send everything in the write queue.
317 // The write queue must not be empty upon entry.
318 template <class Handler, class Impl>
319 void
321  error_code const& ec,
322  std::size_t bytes_transferred)
323 {
324  cancel_timer();
325  if (ec == boost::beast::error::timeout)
326  return on_timer();
327  if (ec)
328  return fail(ec, "write");
329  bytes_out_ += bytes_transferred;
330  {
331  std::lock_guard lock(mutex_);
332  wq2_.clear();
333  wq2_.reserve(wq_.size());
334  std::swap(wq2_, wq_);
335  }
336  if (!wq2_.empty())
337  {
339  v.reserve(wq2_.size());
340  for (auto const& b : wq2_)
341  v.emplace_back(b.data.get(), b.bytes);
342  start_timer();
343  return boost::asio::async_write(
344  impl().stream_,
345  v,
346  bind_executor(
347  strand_,
348  std::bind(
349  &BaseHTTPPeer::on_write,
350  impl().shared_from_this(),
351  std::placeholders::_1,
352  std::placeholders::_2)));
353  }
354  if (!complete_)
355  return;
356  if (graceful_)
357  return do_close();
358  boost::asio::spawn(
359  strand_,
360  std::bind(
362  impl().shared_from_this(),
363  std::placeholders::_1));
364 }
365 
366 template <class Handler, class Impl>
367 void
369  std::shared_ptr<Writer> const& writer,
370  bool keep_alive,
371  yield_context do_yield)
372 {
373  std::function<void(void)> resume;
374  {
375  auto const p = impl().shared_from_this();
376  resume = std::function<void(void)>([this, p, writer, keep_alive]() {
377  boost::asio::spawn(
378  strand_,
379  std::bind(
381  p,
382  writer,
383  keep_alive,
384  std::placeholders::_1));
385  });
386  }
387 
388  for (;;)
389  {
390  if (!writer->prepare(bufferSize, resume))
391  return;
392  error_code ec;
393  auto const bytes_transferred = boost::asio::async_write(
394  impl().stream_,
395  writer->data(),
396  boost::asio::transfer_at_least(1),
397  do_yield[ec]);
398  if (ec)
399  return fail(ec, "writer");
400  writer->consume(bytes_transferred);
401  if (writer->complete())
402  break;
403  }
404 
405  if (!keep_alive)
406  return do_close();
407 
408  boost::asio::spawn(
409  strand_,
410  std::bind(
412  impl().shared_from_this(),
413  std::placeholders::_1));
414 }
415 
416 //------------------------------------------------------------------------------
417 
418 // Send a copy of the data.
419 template <class Handler, class Impl>
420 void
422 {
423  if (bytes == 0)
424  return;
425  if ([&] {
426  std::lock_guard lock(mutex_);
427  wq_.emplace_back(buf, bytes);
428  return wq_.size() == 1 && wq2_.size() == 0;
429  }())
430  {
431  if (!strand_.running_in_this_thread())
432  return post(
433  strand_,
434  std::bind(
435  &BaseHTTPPeer::on_write,
436  impl().shared_from_this(),
437  error_code{},
438  0));
439  else
440  return on_write(error_code{}, 0);
441  }
442 }
443 
444 template <class Handler, class Impl>
445 void
447  std::shared_ptr<Writer> const& writer,
448  bool keep_alive)
449 {
450  boost::asio::spawn(bind_executor(
451  strand_,
452  std::bind(
454  impl().shared_from_this(),
455  writer,
456  keep_alive,
457  std::placeholders::_1)));
458 }
459 
460 // DEPRECATED
461 // Make the Session asynchronous
462 template <class Handler, class Impl>
465 {
466  return impl().shared_from_this();
467 }
468 
469 // DEPRECATED
470 // Called to indicate the response has been written(but not sent)
471 template <class Handler, class Impl>
472 void
474 {
475  if (!strand_.running_in_this_thread())
476  return post(
477  strand_,
478  std::bind(
480  impl().shared_from_this()));
481 
482  message_ = {};
483  complete_ = true;
484 
485  {
486  std::lock_guard lock(mutex_);
487  if (!wq_.empty() && !wq2_.empty())
488  return;
489  }
490 
491  // keep-alive
492  boost::asio::spawn(bind_executor(
493  strand_,
494  std::bind(
496  impl().shared_from_this(),
497  std::placeholders::_1)));
498 }
499 
500 // DEPRECATED
501 // Called from the Handler to close the session.
502 template <class Handler, class Impl>
503 void
505 {
506  if (!strand_.running_in_this_thread())
507  return post(
508  strand_,
509  std::bind(
510  (void (BaseHTTPPeer::*)(bool)) &
512  impl().shared_from_this(),
513  graceful));
514 
515  complete_ = true;
516  if (graceful)
517  {
518  graceful_ = true;
519  {
520  std::lock_guard lock(mutex_);
521  if (!wq_.empty() || !wq2_.empty())
522  return;
523  }
524  return do_close();
525  }
526 
527  boost::beast::get_lowest_layer(impl().stream_).close();
528 }
529 
530 } // namespace ripple
531 
532 #endif
std::chrono::system_clock
ripple::BaseHTTPPeer::nid_
std::size_t nid_
Definition: BaseHTTPPeer.h:87
std::bind
T bind(T... args)
std::string
STL class.
std::shared_ptr
STL class.
ripple::BaseHTTPPeer::request
http_request_type & request() override
Returns the current HTTP request.
Definition: BaseHTTPPeer.h:183
ripple::BaseHTTPPeer::start_timer
void start_timer()
Definition: BaseHTTPPeer.h:269
ripple::BaseHTTPPeer
Represents an active connection.
Definition: BaseHTTPPeer.h:49
functional
ripple::BaseHTTPPeer::~BaseHTTPPeer
virtual ~BaseHTTPPeer()
Definition: BaseHTTPPeer.h:231
std::vector::reserve
T reserve(T... args)
ripple::BaseHTTPPeer::handler_
Handler & handler_
Definition: BaseHTTPPeer.h:80
vector
ripple::BaseHTTPPeer::complete_
bool complete_
Definition: BaseHTTPPeer.h:95
std::chrono::seconds
ripple::BaseHTTPPeer::impl
Impl & impl()
Definition: BaseHTTPPeer.h:127
std::unique_ptr::get
T get(T... args)
std::lock_guard
STL class.
ripple::BaseHTTPPeer::close
void close() override
Definition: BaseHTTPPeer.h:241
ripple::BaseHTTPPeer::buffer::bytes
std::size_t bytes
Definition: BaseHTTPPeer.h:75
ripple::BaseHTTPPeer::remote_address_
endpoint_type remote_address_
Definition: BaseHTTPPeer.h:83
std::function
ripple::BaseHTTPPeer::fail
void fail(error_code ec, char const *what)
Definition: BaseHTTPPeer.h:256
ripple::Session
Persistent state information for a connection session.
Definition: Session.h:40
beast::IPAddressConversion::from_asio
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Definition: IPAddressConversion.h:63
ripple::BaseHTTPPeer::message_
http_request_type message_
Definition: BaseHTTPPeer.h:90
ripple::BaseHTTPPeer::cancel_timer
void cancel_timer()
Definition: BaseHTTPPeer.h:280
ripple::BaseHTTPPeer::ec_
boost::system::error_code ec_
Definition: BaseHTTPPeer.h:96
ripple::BaseHTTPPeer::detach
std::shared_ptr< Session > detach() override
Detach the session.
Definition: BaseHTTPPeer.h:464
ripple::BaseHTTPPeer::request_count_
int request_count_
Definition: BaseHTTPPeer.h:98
ripple::BaseHTTPPeer::BaseHTTPPeer
BaseHTTPPeer(Port const &port, Handler &handler, boost::asio::executor const &executor, beast::Journal journal, endpoint_type remote_address, ConstBufferSequence const &buffers)
Definition: BaseHTTPPeer.h:208
ripple::BaseHTTPPeer::do_request
virtual void do_request()=0
ripple::BaseHTTPPeer::buffer::data
std::unique_ptr< char[]> data
Definition: BaseHTTPPeer.h:74
chrono
ripple::BaseHTTPPeer::timeoutSeconds
@ timeoutSeconds
Definition: BaseHTTPPeer.h:62
ripple::BaseHTTPPeer::mutex_
std::mutex mutex_
Definition: BaseHTTPPeer.h:93
ripple::BaseHTTPPeer::bufferSize
@ bufferSize
Definition: BaseHTTPPeer.h:59
ripple::BaseHTTPPeer::buffer::buffer
buffer(void const *ptr, std::size_t len)
Definition: BaseHTTPPeer.h:68
std::to_string
T to_string(T... args)
ripple::BaseHTTPPeer::session
Session & session()
Definition: BaseHTTPPeer.h:117
ripple::BaseHTTPPeer::remoteAddress
beast::IP::Endpoint remoteAddress() override
Returns the remote address of the connection.
Definition: BaseHTTPPeer.h:177
ripple::BaseHTTPPeer< Handler, PlainHTTPPeer< Handler > >::error_code
boost::system::error_code error_code
Definition: BaseHTTPPeer.h:53
ripple::BaseHTTPPeer::on_write
void on_write(error_code const &ec, std::size_t bytes_transferred)
Definition: BaseHTTPPeer.h:320
ripple::io_list::work
Definition: io_list.h:38
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::BaseHTTPPeer::graceful_
bool graceful_
Definition: BaseHTTPPeer.h:94
atomic
ripple::Port
Configuration information for a Server listening port.
Definition: Port.h:48
ripple::BaseHTTPPeer::do_close
virtual void do_close()=0
ripple::BaseHTTPPeer::bytes_out_
std::size_t bytes_out_
Definition: BaseHTTPPeer.h:100
ripple::BaseHTTPPeer::journal
beast::Journal journal() override
Returns the Journal to use for logging.
Definition: BaseHTTPPeer.h:165
memory
ripple::BaseHTTPPeer::port
Port const & port() override
Returns the Port settings for this connection.
Definition: BaseHTTPPeer.h:171
std::swap
T swap(T... args)
ripple::BaseHTTPPeer::id_
std::string id_
Definition: BaseHTTPPeer.h:86
ripple::BaseHTTPPeer::do_read
void do_read(yield_context do_yield)
Definition: BaseHTTPPeer.h:299
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::BaseHTTPPeer::port_
Port const & port_
Definition: BaseHTTPPeer.h:79
ripple::BaseHTTPPeer::bytes_in_
std::size_t bytes_in_
Definition: BaseHTTPPeer.h:99
ripple::BaseHTTPPeer::do_writer
void do_writer(std::shared_ptr< Writer > const &writer, bool keep_alive, yield_context do_yield)
Definition: BaseHTTPPeer.h:368
ripple::BaseHTTPPeer::wq2_
std::vector< buffer > wq2_
Definition: BaseHTTPPeer.h:92
ripple::BaseHTTPPeer::wq_
std::vector< buffer > wq_
Definition: BaseHTTPPeer.h:91
ripple::BaseHTTPPeer< Handler, PlainHTTPPeer< Handler > >::endpoint_type
boost::asio::ip::tcp::endpoint endpoint_type
Definition: BaseHTTPPeer.h:54
cassert
ripple::BaseHTTPPeer::read_buf_
boost::asio::streambuf read_buf_
Definition: BaseHTTPPeer.h:89
mutex
std::size_t
beast::IP::Endpoint
A version-independent IP address and port combination.
Definition: IPEndpoint.h:38
ripple::BaseHTTPPeer::write
void write(void const *buffer, std::size_t bytes) override
Definition: BaseHTTPPeer.h:421
ripple::BaseHTTPPeer< Handler, PlainHTTPPeer< Handler > >::yield_context
boost::asio::yield_context yield_context
Definition: BaseHTTPPeer.h:55
ripple::BaseHTTPPeer::timeoutSecondsLocal
@ timeoutSecondsLocal
Definition: BaseHTTPPeer.h:63
ripple::BaseHTTPPeer::buffer
Definition: BaseHTTPPeer.h:66
ripple::BaseHTTPPeer::journal_
const beast::Journal journal_
Definition: BaseHTTPPeer.h:84
ripple::BaseHTTPPeer::on_timer
void on_timer()
Definition: BaseHTTPPeer.h:288
ripple::http_request_type
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition: Handshake.h:47
std::unique_ptr< char[]>
ripple::BaseHTTPPeer::complete
void complete() override
Indicate that the response is complete.
Definition: BaseHTTPPeer.h:473
ripple::BaseHTTPPeer::buffer::used
std::size_t used
Definition: BaseHTTPPeer.h:76
type_traits
ripple::BaseHTTPPeer::work_
boost::asio::executor_work_guard< boost::asio::executor > work_
Definition: BaseHTTPPeer.h:81
ripple::BaseHTTPPeer::strand_
boost::asio::strand< boost::asio::executor > strand_
Definition: BaseHTTPPeer.h:82