rippled
ResolverAsio.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/basics/Log.h>
21 #include <ripple/basics/ResolverAsio.h>
22 #include <ripple/beast/net/IPAddressConversion.h>
23 #include <ripple/beast/net/IPEndpoint.h>
24 #include <boost/asio.hpp>
25 #include <atomic>
26 #include <cassert>
27 #include <condition_variable>
28 #include <deque>
29 #include <locale>
30 #include <memory>
31 #include <mutex>
32 
33 namespace ripple {
34 
39 template <class Derived>
41 {
42 protected:
44  {
45  }
46 
47 public:
49  {
50  // Destroying the object with I/O pending? Not a clean exit!
51  assert(m_pending.load() == 0);
52  }
53 
59  {
60  public:
61  explicit CompletionCounter(Derived* owner) : m_owner(owner)
62  {
63  ++m_owner->m_pending;
64  }
65 
67  : m_owner(other.m_owner)
68  {
69  ++m_owner->m_pending;
70  }
71 
73  {
74  if (--m_owner->m_pending == 0)
75  m_owner->asyncHandlersComplete();
76  }
77 
79  operator=(CompletionCounter const&) = delete;
80 
81  private:
82  Derived* m_owner;
83  };
84 
85  void
87  {
88  ++m_pending;
89  }
90 
91  void
93  {
94  if (--m_pending == 0)
95  (static_cast<Derived*>(this))->asyncHandlersComplete();
96  }
97 
98 private:
99  // The number of handlers pending.
101 };
102 
104  public AsyncObject<ResolverAsioImpl>
105 {
106 public:
108 
110 
111  boost::asio::io_service& m_io_service;
112  boost::asio::io_service::strand m_strand;
113  boost::asio::ip::tcp::resolver m_resolver;
114 
118 
121 
122  // Represents a unit of work for the resolver to do
123  struct Work
124  {
127 
128  template <class StringSequence>
129  Work(StringSequence const& names_, HandlerType const& handler_)
130  : handler(handler_)
131  {
132  names.reserve(names_.size());
133 
135  names_.begin(), names_.end(), std::back_inserter(names));
136  }
137  };
138 
140 
142  boost::asio::io_service& io_service,
143  beast::Journal journal)
144  : m_journal(journal)
145  , m_io_service(io_service)
146  , m_strand(io_service)
147  , m_resolver(io_service)
149  , m_stop_called(false)
150  , m_stopped(true)
151  {
152  }
153 
154  ~ResolverAsioImpl() override
155  {
156  assert(m_work.empty());
157  assert(m_stopped);
158  }
159 
160  //-------------------------------------------------------------------------
161  // AsyncObject
162  void
164  {
167  m_cv.notify_all();
168  }
169 
170  //--------------------------------------------------------------------------
171  //
172  // Resolver
173  //
174  //--------------------------------------------------------------------------
175 
176  void
177  start() override
178  {
179  assert(m_stopped == true);
180  assert(m_stop_called == false);
181 
182  if (m_stopped.exchange(false) == true)
183  {
184  {
186  m_asyncHandlersCompleted = false;
187  }
188  addReference();
189  }
190  }
191 
192  void
193  stop_async() override
194  {
195  if (m_stop_called.exchange(true) == false)
196  {
197  m_io_service.dispatch(m_strand.wrap(std::bind(
198  &ResolverAsioImpl::do_stop, this, CompletionCounter(this))));
199 
200  JLOG(m_journal.debug()) << "Queued a stop request";
201  }
202  }
203 
204  void
205  stop() override
206  {
207  stop_async();
208 
209  JLOG(m_journal.debug()) << "Waiting to stop";
211  m_cv.wait(lk, [this] { return m_asyncHandlersCompleted; });
212  lk.unlock();
213  JLOG(m_journal.debug()) << "Stopped";
214  }
215 
216  void
217  resolve(std::vector<std::string> const& names, HandlerType const& handler)
218  override
219  {
220  assert(m_stop_called == false);
221  assert(!names.empty());
222 
223  // TODO NIKB use rvalue references to construct and move
224  // reducing cost.
225  m_io_service.dispatch(m_strand.wrap(std::bind(
227  this,
228  names,
229  handler,
230  CompletionCounter(this))));
231  }
232 
233  //-------------------------------------------------------------------------
234  // Resolver
235  void do_stop(CompletionCounter)
236  {
237  assert(m_stop_called == true);
238 
239  if (m_stopped.exchange(true) == false)
240  {
241  m_work.clear();
242  m_resolver.cancel();
243 
244  removeReference();
245  }
246  }
247 
248  void
250  std::string name,
251  boost::system::error_code const& ec,
252  HandlerType handler,
253  boost::asio::ip::tcp::resolver::iterator iter,
254  CompletionCounter)
255  {
256  if (ec == boost::asio::error::operation_aborted)
257  return;
258 
260 
261  // If we get an error message back, we don't return any
262  // results that we may have gotten.
263  if (!ec)
264  {
265  while (iter != boost::asio::ip::tcp::resolver::iterator())
266  {
267  addresses.push_back(
269  ++iter;
270  }
271  }
272 
273  handler(name, addresses);
274 
275  m_io_service.post(m_strand.wrap(std::bind(
276  &ResolverAsioImpl::do_work, this, CompletionCounter(this))));
277  }
278 
280  parseName(std::string const& str)
281  {
282  // first attempt to parse as an endpoint (IP addr + port).
283  // If that doesn't succeed, fall back to generic name + port parsing
284 
285  if (auto const result = beast::IP::Endpoint::from_string_checked(str))
286  {
287  return make_pair(
288  result->address().to_string(), std::to_string(result->port()));
289  }
290 
291  // generic name/port parsing, which doesn't work for
292  // IPv6 addresses in particular because it considers a colon
293  // a port separator
294 
295  // Attempt to find the first and last non-whitespace
296  auto const find_whitespace = std::bind(
297  &std::isspace<std::string::value_type>,
298  std::placeholders::_1,
299  std::locale());
300 
301  auto host_first =
302  std::find_if_not(str.begin(), str.end(), find_whitespace);
303 
304  auto port_last =
305  std::find_if_not(str.rbegin(), str.rend(), find_whitespace).base();
306 
307  // This should only happen for all-whitespace strings
308  if (host_first >= port_last)
310 
311  // Attempt to find the first and last valid port separators
312  auto const find_port_separator = [](char const c) -> bool {
313  if (std::isspace(static_cast<unsigned char>(c)))
314  return true;
315 
316  if (c == ':')
317  return true;
318 
319  return false;
320  };
321 
322  auto host_last =
323  std::find_if(host_first, port_last, find_port_separator);
324 
325  auto port_first =
326  std::find_if_not(host_last, port_last, find_port_separator);
327 
328  return make_pair(
329  std::string(host_first, host_last),
330  std::string(port_first, port_last));
331  }
332 
333  void do_work(CompletionCounter)
334  {
335  if (m_stop_called == true)
336  return;
337 
338  // We don't have any work to do at this time
339  if (m_work.empty())
340  return;
341 
342  std::string const name(m_work.front().names.back());
343  HandlerType handler(m_work.front().handler);
344 
345  m_work.front().names.pop_back();
346 
347  if (m_work.front().names.empty())
348  m_work.pop_front();
349 
350  auto const [host, port] = parseName(name);
351 
352  if (host.empty())
353  {
354  JLOG(m_journal.error()) << "Unable to parse '" << name << "'";
355 
356  m_io_service.post(m_strand.wrap(std::bind(
357  &ResolverAsioImpl::do_work, this, CompletionCounter(this))));
358 
359  return;
360  }
361 
362  boost::asio::ip::tcp::resolver::query query(host, port);
363 
364  m_resolver.async_resolve(
365  query,
366  std::bind(
368  this,
369  name,
370  std::placeholders::_1,
371  handler,
372  std::placeholders::_2,
373  CompletionCounter(this)));
374  }
375 
376  void
378  std::vector<std::string> const& names,
379  HandlerType const& handler,
380  CompletionCounter)
381  {
382  assert(!names.empty());
383 
384  if (m_stop_called == false)
385  {
386  m_work.emplace_back(names, handler);
387 
388  JLOG(m_journal.debug())
389  << "Queued new job with " << names.size() << " tasks. "
390  << m_work.size() << " jobs outstanding.";
391 
392  if (m_work.size() > 0)
393  {
394  m_io_service.post(m_strand.wrap(std::bind(
396  this,
397  CompletionCounter(this))));
398  }
399  }
400  }
401 };
402 
403 //-----------------------------------------------------------------------------
404 
406 ResolverAsio::New(boost::asio::io_service& io_service, beast::Journal journal)
407 {
408  return std::make_unique<ResolverAsioImpl>(io_service, journal);
409 }
410 
411 //-----------------------------------------------------------------------------
412 Resolver::~Resolver() = default;
413 } // namespace ripple
ripple::AsyncObject::m_pending
std::atomic< int > m_pending
Definition: ResolverAsio.cpp:100
ripple::ResolverAsioImpl::m_work
std::deque< Work > m_work
Definition: ResolverAsio.cpp:139
std::reverse_copy
T reverse_copy(T... args)
locale
std::bind
T bind(T... args)
std::string
STL class.
ripple::ResolverAsioImpl::stop
void stop() override
Issue a synchronous stop request.
Definition: ResolverAsio.cpp:205
ripple::ResolverAsioImpl::Work
Definition: ResolverAsio.cpp:123
std::pair
std::vector::reserve
T reserve(T... args)
ripple::ResolverAsioImpl::m_io_service
boost::asio::io_service & m_io_service
Definition: ResolverAsio.cpp:111
ripple::ResolverAsioImpl::Work::names
std::vector< std::string > names
Definition: ResolverAsio.cpp:125
std::vector< std::string >
std::find_if_not
T find_if_not(T... args)
std::vector::size
T size(T... args)
ripple::Resolver::~Resolver
virtual ~Resolver()=0
std::back_inserter
T back_inserter(T... args)
ripple::ResolverAsioImpl::m_mut
std::mutex m_mut
Definition: ResolverAsio.cpp:116
ripple::ResolverAsioImpl::m_asyncHandlersCompleted
bool m_asyncHandlersCompleted
Definition: ResolverAsio.cpp:117
ripple::ResolverAsioImpl
Definition: ResolverAsio.cpp:103
std::lock_guard
STL class.
std::function
ripple::AsyncObject::CompletionCounter::CompletionCounter
CompletionCounter(Derived *owner)
Definition: ResolverAsio.cpp:61
ripple::ResolverAsio::New
static std::unique_ptr< ResolverAsio > New(boost::asio::io_service &, beast::Journal)
Definition: ResolverAsio.cpp:406
ripple::AsyncObject::CompletionCounter::operator=
CompletionCounter & operator=(CompletionCounter const &)=delete
ripple::AsyncObject::CompletionCounter::CompletionCounter
CompletionCounter(CompletionCounter const &other)
Definition: ResolverAsio.cpp:66
ripple::ResolverAsioImpl::parseName
HostAndPort parseName(std::string const &str)
Definition: ResolverAsio.cpp:280
ripple::ResolverAsioImpl::Work::handler
HandlerType handler
Definition: ResolverAsio.cpp:126
std::vector::push_back
T push_back(T... args)
beast::IPAddressConversion::from_asio
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Definition: IPAddressConversion.h:63
ripple::ResolverAsioImpl::HostAndPort
std::pair< std::string, std::string > HostAndPort
Definition: ResolverAsio.cpp:107
std::atomic::load
T load(T... args)
ripple::ResolverAsioImpl::Work::Work
Work(StringSequence const &names_, HandlerType const &handler_)
Definition: ResolverAsio.cpp:129
ripple::ResolverAsioImpl::m_journal
beast::Journal m_journal
Definition: ResolverAsio.cpp:109
ripple::ResolverAsioImpl::m_cv
std::condition_variable m_cv
Definition: ResolverAsio.cpp:115
std::unique_lock
STL class.
std::to_string
T to_string(T... args)
ripple::AsyncObject::CompletionCounter
RAII container that maintains the count of pending I/O.
Definition: ResolverAsio.cpp:58
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::AsyncObject::removeReference
void removeReference()
Definition: ResolverAsio.cpp:92
deque
ripple::ResolverAsioImpl::do_finish
void do_finish(std::string name, boost::system::error_code const &ec, HandlerType handler, boost::asio::ip::tcp::resolver::iterator iter, CompletionCounter)
Definition: ResolverAsio.cpp:249
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::condition_variable::wait
T wait(T... args)
atomic
ripple::AsyncObject::addReference
void addReference()
Definition: ResolverAsio.cpp:86
ripple::ResolverAsioImpl::resolve
void resolve(std::vector< std::string > const &names, HandlerType const &handler) override
Definition: ResolverAsio.cpp:217
ripple::ResolverAsioImpl::m_stopped
std::atomic< bool > m_stopped
Definition: ResolverAsio.cpp:120
ripple::ResolverAsioImpl::~ResolverAsioImpl
~ResolverAsioImpl() override
Definition: ResolverAsio.cpp:154
memory
std::string::rend
T rend(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std::atomic::exchange
T exchange(T... args)
ripple::ResolverAsioImpl::stop_async
void stop_async() override
Issue an asynchronous stop request.
Definition: ResolverAsio.cpp:193
ripple::ResolverAsioImpl::start
void start() override
Issue a synchronous start request.
Definition: ResolverAsio.cpp:177
ripple::ResolverAsioImpl::asyncHandlersComplete
void asyncHandlersComplete()
Definition: ResolverAsio.cpp:163
ripple::ResolverAsioImpl::m_stop_called
std::atomic< bool > m_stop_called
Definition: ResolverAsio.cpp:119
std::string::begin
T begin(T... args)
cassert
ripple::AsyncObject::AsyncObject
AsyncObject()
Definition: ResolverAsio.cpp:43
ripple::ResolverAsioImpl::m_strand
boost::asio::io_service::strand m_strand
Definition: ResolverAsio.cpp:112
condition_variable
ripple::ResolverAsioImpl::ResolverAsioImpl
ResolverAsioImpl(boost::asio::io_service &io_service, beast::Journal journal)
Definition: ResolverAsio.cpp:141
ripple::ResolverAsio
Definition: ResolverAsio.h:29
std::vector::empty
T empty(T... args)
mutex
ripple::AsyncObject::CompletionCounter::m_owner
Derived * m_owner
Definition: ResolverAsio.cpp:82
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::AsyncObject::CompletionCounter::~CompletionCounter
~CompletionCounter()
Definition: ResolverAsio.cpp:72
std::make_pair
T make_pair(T... args)
std::string::end
T end(T... args)
ripple::AsyncObject
Mix-in to track when all pending I/O is complete.
Definition: ResolverAsio.cpp:40
ripple::ResolverAsioImpl::do_resolve
void do_resolve(std::vector< std::string > const &names, HandlerType const &handler, CompletionCounter)
Definition: ResolverAsio.cpp:377
std::unique_ptr
STL class.
ripple::ResolverAsioImpl::do_work
void do_work(CompletionCounter)
Definition: ResolverAsio.cpp:333
ripple::ResolverAsioImpl::m_resolver
boost::asio::ip::tcp::resolver m_resolver
Definition: ResolverAsio.cpp:113
beast::IP::Endpoint::from_string_checked
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Definition: IPEndpoint.cpp:35
std::condition_variable::notify_all
T notify_all(T... args)
std::string::rbegin
T rbegin(T... args)
ripple::ResolverAsioImpl::do_stop
void do_stop(CompletionCounter)
Definition: ResolverAsio.cpp:235
ripple::AsyncObject::~AsyncObject
~AsyncObject()
Definition: ResolverAsio.cpp:48