rippled
SNTPClock.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/basics/Log.h>
21 #include <ripple/basics/random.h>
22 #include <ripple/beast/core/CurrentThreadName.h>
23 #include <ripple/core/impl/SNTPClock.h>
24 #include <boost/asio.hpp>
25 #include <cmath>
26 #include <deque>
27 #include <map>
28 #include <memory>
29 #include <mutex>
30 #include <optional>
31 #include <thread>
32 
33 namespace ripple {
34 
35 static uint8_t SNTPQueryData[48] = {
36  0x1B, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
37  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
38 
39 using namespace std::chrono_literals;
40 // NTP query frequency - 4 minutes
41 auto constexpr NTP_QUERY_FREQUENCY = 4min;
42 
43 // NTP minimum interval to query same servers - 3 minutes
44 auto constexpr NTP_MIN_QUERY = 3min;
45 
46 // NTP sample window (should be odd)
47 #define NTP_SAMPLE_WINDOW 9
48 
49 // NTP timestamp constant
50 auto constexpr NTP_UNIX_OFFSET = 0x83AA7E80s;
51 
52 // NTP timestamp validity
54 
55 // SNTP packet offsets
56 #define NTP_OFF_INFO 0
57 #define NTP_OFF_ROOTDELAY 1
58 #define NTP_OFF_ROOTDISP 2
59 #define NTP_OFF_REFERENCEID 3
60 #define NTP_OFF_REFTS_INT 4
61 #define NTP_OFF_REFTS_FRAC 5
62 #define NTP_OFF_ORGTS_INT 6
63 #define NTP_OFF_ORGTS_FRAC 7
64 #define NTP_OFF_RECVTS_INT 8
65 #define NTP_OFF_RECVTS_FRAC 9
66 #define NTP_OFF_XMITTS_INT 10
67 #define NTP_OFF_XMITTS_FRAC 11
68 
69 class SNTPClientImp : public SNTPClock
70 {
71 private:
72  template <class Duration>
74 
76 
77  struct Query
78  {
79  bool replied;
82 
83  explicit Query(sys_seconds j = sys_seconds::max())
84  : replied(false), sent(j)
85  {
86  }
87  };
88 
90  std::mutex mutable mutex_;
92  boost::asio::io_service io_service_;
94 
96  boost::asio::ip::udp::socket socket_;
97  boost::asio::basic_waitable_timer<std::chrono::system_clock> timer_;
98  boost::asio::ip::udp::resolver resolver_;
104  boost::asio::ip::udp::endpoint ep_;
105 
106 public:
107  using error_code = boost::system::error_code;
108 
110  : j_(j)
111  , work_(io_service_)
112  , socket_(io_service_)
113  , timer_(io_service_)
114  , resolver_(io_service_)
115  , offset_(0)
116  , lastUpdate_(sys_seconds::max())
117  , buf_(256)
118  {
119  }
120 
121  ~SNTPClientImp() override
122  {
123  if (thread_.joinable())
124  {
125  error_code ec;
126  timer_.cancel(ec);
127  socket_.cancel(ec);
128  work_ = std::nullopt;
129  thread_.join();
130  }
131  }
132 
133  //--------------------------------------------------------------------------
134 
135  void
136  run(const std::vector<std::string>& servers) override
137  {
139 
140  if (it == servers.end())
141  {
142  JLOG(j_.info()) << "SNTP: no server specified";
143  return;
144  }
145 
146  {
147  std::lock_guard lock(mutex_);
148  for (auto const& item : servers)
149  servers_.emplace_back(item, sys_seconds::max());
150  }
151  queryAll();
152 
153  using namespace boost::asio;
154  socket_.open(ip::udp::v4());
155  socket_.bind(ep_);
156  socket_.async_receive_from(
157  buffer(buf_, 256),
158  ep_,
159  std::bind(
161  this,
162  std::placeholders::_1,
163  std::placeholders::_2));
164  timer_.expires_from_now(NTP_QUERY_FREQUENCY);
165  timer_.async_wait(
166  std::bind(&SNTPClientImp::onTimer, this, std::placeholders::_1));
167 
168  thread_ = std::thread(&SNTPClientImp::doRun, this);
169  }
170 
171  time_point
172  now() const override
173  {
174  std::lock_guard lock(mutex_);
175  using namespace std::chrono;
176  auto const when = time_point_cast<seconds>(clock_type::now());
177  if ((lastUpdate_ == sys_seconds::max()) ||
178  ((lastUpdate_ + NTP_TIMESTAMP_VALID) <
179  time_point_cast<seconds>(clock_type::now())))
180  return when;
181  return when + offset_;
182  }
183 
184  duration
185  offset() const override
186  {
187  std::lock_guard lock(mutex_);
188  return offset_;
189  }
190 
191  //--------------------------------------------------------------------------
192 
193  void
195  {
196  beast::setCurrentThreadName("rippled: SNTPClock");
197  io_service_.run();
198  }
199 
200  void
201  onTimer(error_code const& ec)
202  {
203  using namespace boost::asio;
204  if (ec == error::operation_aborted)
205  return;
206  if (ec)
207  {
208  JLOG(j_.error()) << "SNTPClock::onTimer: " << ec.message();
209  return;
210  }
211 
212  doQuery();
213  timer_.expires_from_now(NTP_QUERY_FREQUENCY);
214  timer_.async_wait(
215  std::bind(&SNTPClientImp::onTimer, this, std::placeholders::_1));
216  }
217 
218  void
219  onRead(error_code const& ec, std::size_t bytes_xferd)
220  {
221  using namespace boost::asio;
222  using namespace std::chrono;
223  if (ec == error::operation_aborted)
224  return;
225 
226  // VFALCO Should we return on any error?
227  /*
228  if (ec)
229  return;
230  */
231 
232  if (!ec)
233  {
234  JLOG(j_.trace()) << "SNTP: Packet from " << ep_;
235  std::lock_guard lock(mutex_);
236  auto const query = queries_.find(ep_);
237  if (query == queries_.end())
238  {
239  JLOG(j_.debug()) << "SNTP: Reply from " << ep_
240  << " found without matching query";
241  }
242  else if (query->second.replied)
243  {
244  JLOG(j_.debug()) << "SNTP: Duplicate response from " << ep_;
245  }
246  else
247  {
248  query->second.replied = true;
249 
250  if (time_point_cast<seconds>(clock_type::now()) >
251  (query->second.sent + 1s))
252  {
253  JLOG(j_.warn()) << "SNTP: Late response from " << ep_;
254  }
255  else if (bytes_xferd < 48)
256  {
257  JLOG(j_.warn()) << "SNTP: Short reply from " << ep_ << " ("
258  << bytes_xferd << ") " << buf_.size();
259  }
260  else if (
261  reinterpret_cast<std::uint32_t*>(
262  &buf_[0])[NTP_OFF_ORGTS_FRAC] != query->second.nonce)
263  {
264  JLOG(j_.warn())
265  << "SNTP: Reply from " << ep_ << "had wrong nonce";
266  }
267  else
268  {
269  processReply();
270  }
271  }
272  }
273 
274  socket_.async_receive_from(
275  buffer(buf_, 256),
276  ep_,
277  std::bind(
279  this,
280  std::placeholders::_1,
281  std::placeholders::_2));
282  }
283 
284  //--------------------------------------------------------------------------
285 
286  void
287  addServer(std::string const& server)
288  {
289  std::lock_guard lock(mutex_);
290  servers_.push_back(std::make_pair(server, sys_seconds::max()));
291  }
292 
293  void
295  {
296  while (doQuery())
297  {
298  }
299  }
300 
301  bool
303  {
304  std::lock_guard lock(mutex_);
305  auto best = servers_.end();
306 
307  for (auto iter = servers_.begin(), end = best; iter != end; ++iter)
308  if ((best == end) || (iter->second == sys_seconds::max()) ||
309  (iter->second < best->second))
310  best = iter;
311 
312  if (best == servers_.end())
313  {
314  JLOG(j_.trace()) << "SNTP: No server to query";
315  return false;
316  }
317 
318  using namespace std::chrono;
319  auto now = time_point_cast<seconds>(clock_type::now());
320 
321  if ((best->second != sys_seconds::max()) &&
322  ((best->second + NTP_MIN_QUERY) >= now))
323  {
324  JLOG(j_.trace()) << "SNTP: All servers recently queried";
325  return false;
326  }
327 
328  best->second = now;
329 
330  boost::asio::ip::udp::resolver::query query(
331  boost::asio::ip::udp::v4(), best->first, "ntp");
332  resolver_.async_resolve(
333  query,
334  std::bind(
336  this,
337  std::placeholders::_1,
338  std::placeholders::_2));
339  JLOG(j_.trace()) << "SNTPClock: Resolve pending for " << best->first;
340  return true;
341  }
342 
343  void
345  error_code const& ec,
346  boost::asio::ip::udp::resolver::iterator it)
347  {
348  using namespace boost::asio;
349  if (ec == error::operation_aborted)
350  return;
351  if (ec)
352  {
353  JLOG(j_.trace()) << "SNTPClock::resolveComplete: " << ec.message();
354  return;
355  }
356 
357  assert(it != ip::udp::resolver::iterator());
358 
359  auto sel = it;
360  int i = 1;
361 
362  while (++it != ip::udp::resolver::iterator())
363  {
364  if (rand_int(i++) == 0)
365  sel = it;
366  }
367 
368  if (sel != ip::udp::resolver::iterator())
369  {
370  std::lock_guard lock(mutex_);
371  Query& query = queries_[*sel];
372  using namespace std::chrono;
373  auto now = time_point_cast<seconds>(clock_type::now());
374 
375  if ((query.sent == now) || ((query.sent + 1s) == now))
376  {
377  // This can happen if the same IP address is reached through
378  // multiple names
379  JLOG(j_.trace()) << "SNTP: Redundant query suppressed";
380  return;
381  }
382 
383  query.replied = false;
384  query.sent = now;
385  query.nonce = rand_int<std::uint32_t>();
386  // The following line of code will overflow at 2036-02-07 06:28:16
387  // UTC
388  // due to the 32 bit cast.
389  reinterpret_cast<std::uint32_t*>(
390  SNTPQueryData)[NTP_OFF_XMITTS_INT] =
391  static_cast<std::uint32_t>(
392  (time_point_cast<seconds>(clock_type::now()) +
394  .time_since_epoch()
395  .count());
396  reinterpret_cast<std::uint32_t*>(
397  SNTPQueryData)[NTP_OFF_XMITTS_FRAC] = query.nonce;
398  socket_.async_send_to(
399  buffer(SNTPQueryData, 48),
400  *sel,
401  std::bind(
403  this,
404  std::placeholders::_1,
405  std::placeholders::_2));
406  }
407  }
408 
409  void
411  {
412  if (ec == boost::asio::error::operation_aborted)
413  return;
414 
415  if (ec)
416  {
417  JLOG(j_.warn()) << "SNTPClock::onSend: " << ec.message();
418  return;
419  }
420  }
421 
422  void
424  {
425  using namespace std::chrono;
426  assert(buf_.size() >= 48);
427  std::uint32_t* recvBuffer =
428  reinterpret_cast<std::uint32_t*>(&buf_.front());
429 
430  unsigned info = ntohl(recvBuffer[NTP_OFF_INFO]);
431  auto timev = seconds{ntohl(recvBuffer[NTP_OFF_RECVTS_INT])};
432  unsigned stratum = (info >> 16) & 0xff;
433 
434  if ((info >> 30) == 3)
435  {
436  JLOG(j_.info()) << "SNTP: Alarm condition " << ep_;
437  return;
438  }
439 
440  if ((stratum == 0) || (stratum > 14))
441  {
442  JLOG(j_.info()) << "SNTP: Unreasonable stratum (" << stratum
443  << ") from " << ep_;
444  return;
445  }
446 
447  using namespace std::chrono;
448  auto now = time_point_cast<seconds>(clock_type::now());
449  timev -= now.time_since_epoch();
450  timev -= NTP_UNIX_OFFSET;
451 
452  // add offset to list, replacing oldest one if appropriate
453  offsets_.push_back(timev);
454 
455  if (offsets_.size() >= NTP_SAMPLE_WINDOW)
456  offsets_.pop_front();
457 
458  lastUpdate_ = now;
459 
460  // select median time
461  auto offsetList = offsets_;
462  std::sort(offsetList.begin(), offsetList.end());
463  auto j = offsetList.size();
464  auto it = std::next(offsetList.begin(), j / 2);
465  offset_ = *it;
466 
467  if ((j % 2) == 0)
468  offset_ = (offset_ + (*--it)) / 2;
469 
470  // debounce: small corrections likely
471  // do more harm than good
472  if ((offset_ == -1s) || (offset_ == 1s))
473  offset_ = 0s;
474 
475  if (timev != 0s || offset_ != 0s)
476  {
477  JLOG(j_.trace()) << "SNTP: Offset is " << timev.count()
478  << ", new system offset is " << offset_.count();
479  }
480  }
481 };
482 
483 //------------------------------------------------------------------------------
484 
487 {
488  return std::make_unique<SNTPClientImp>(j);
489 }
490 
491 } // namespace ripple
ripple::NTP_UNIX_OFFSET
constexpr auto NTP_UNIX_OFFSET
Definition: SNTPClock.cpp:50
ripple::SNTPClientImp::timer_
boost::asio::basic_waitable_timer< std::chrono::system_clock > timer_
Definition: SNTPClock.cpp:97
std::bind
T bind(T... args)
std::string
STL class.
ripple::SNTPClientImp::now
time_point now() const override
Definition: SNTPClock.cpp:172
ripple::SNTPClientImp::queries_
std::map< boost::asio::ip::udp::endpoint, Query > queries_
Definition: SNTPClock.cpp:95
ripple::SNTPClientImp::error_code
boost::system::error_code error_code
Definition: SNTPClock.cpp:107
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::SNTPClientImp::Query::sent
sys_seconds sent
Definition: SNTPClock.cpp:80
ripple::SNTPClientImp
Definition: SNTPClock.cpp:69
std::deque::pop_front
T pop_front(T... args)
std::vector
STL class.
std::map::find
T find(T... args)
std::vector::size
T size(T... args)
ripple::SNTPClientImp::Query
Definition: SNTPClock.cpp:77
std::chrono::seconds
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::lock_guard
STL class.
ripple::SNTPClientImp::lastUpdate_
sys_seconds lastUpdate_
Definition: SNTPClock.cpp:101
ripple::NTP_QUERY_FREQUENCY
constexpr auto NTP_QUERY_FREQUENCY
Definition: SNTPClock.cpp:41
ripple::SNTPClientImp::onRead
void onRead(error_code const &ec, std::size_t bytes_xferd)
Definition: SNTPClock.cpp:219
ripple::SNTPClientImp::addServer
void addServer(std::string const &server)
Definition: SNTPClock.cpp:287
cmath
std::vector::front
T front(T... args)
std::sort
T sort(T... args)
ripple::SNTPClientImp::Query::replied
bool replied
Definition: SNTPClock.cpp:79
std::vector::push_back
T push_back(T... args)
ripple::SNTPClientImp::onSend
void onSend(error_code const &ec, std::size_t)
Definition: SNTPClock.cpp:410
ripple::SNTPClientImp::resolver_
boost::asio::ip::udp::resolver resolver_
Definition: SNTPClock.cpp:98
std::thread::joinable
T joinable(T... args)
ripple::SNTPClientImp::buf_
std::vector< uint8_t > buf_
Definition: SNTPClock.cpp:103
ripple::SNTPClientImp::socket_
boost::asio::ip::udp::socket socket_
Definition: SNTPClock.cpp:96
ripple::make_SNTPClock
std::unique_ptr< SNTPClock > make_SNTPClock(beast::Journal j)
Definition: SNTPClock.cpp:486
ripple::rand_int
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
Definition: ripple/basics/random.h:115
thread
ripple::SNTPClientImp::run
void run(const std::vector< std::string > &servers) override
Definition: SNTPClock.cpp:136
ripple::SNTPClientImp::j_
const beast::Journal j_
Definition: SNTPClock.cpp:89
ripple::NTP_MIN_QUERY
constexpr auto NTP_MIN_QUERY
Definition: SNTPClock.cpp:44
ripple::SNTPClientImp::processReply
void processReply()
Definition: SNTPClock.cpp:423
ripple::SNTPClientImp::servers_
std::vector< std::pair< std::string, sys_seconds > > servers_
Definition: SNTPClock.cpp:99
boost::asio
Definition: Overlay.h:41
ripple::SNTPClientImp::Query::Query
Query(sys_seconds j=sys_seconds::max())
Definition: SNTPClock.cpp:83
ripple::SNTPClientImp::ep_
boost::asio::ip::udp::endpoint ep_
Definition: SNTPClock.cpp:104
ripple::SNTPClientImp::offsets_
std::deque< std::chrono::seconds > offsets_
Definition: SNTPClock.cpp:102
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::chrono::time_point
deque
ripple::SNTPClientImp::Query::nonce
std::uint32_t nonce
Definition: SNTPClock.cpp:81
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
ripple::SNTPClientImp::doQuery
bool doQuery()
Definition: SNTPClock.cpp:302
map
memory
ripple::SNTPClientImp::io_service_
boost::asio::io_service io_service_
Definition: SNTPClock.cpp:92
ripple::SNTPClientImp::queryAll
void queryAll()
Definition: SNTPClock.cpp:294
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::SNTPQueryData
static uint8_t SNTPQueryData[48]
Definition: SNTPClock.cpp:35
ripple::SNTPClientImp::offset
duration offset() const override
Definition: SNTPClock.cpp:185
std::vector::begin
T begin(T... args)
ripple::SNTPClientImp::mutex_
std::mutex mutex_
Definition: SNTPClock.cpp:90
ripple::SNTPClock
A clock based on system_clock and adjusted for SNTP.
Definition: SNTPClock.h:33
ripple::SNTPClientImp::onTimer
void onTimer(error_code const &ec)
Definition: SNTPClock.cpp:201
ripple::SNTPClientImp::doRun
void doRun()
Definition: SNTPClock.cpp:194
std::chrono::seconds::count
T count(T... args)
ripple::SNTPClientImp::thread_
std::thread thread_
Definition: SNTPClock.cpp:91
optional
mutex
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::SNTPClientImp::SNTPClientImp
SNTPClientImp(beast::Journal j)
Definition: SNTPClock.cpp:109
std::make_pair
T make_pair(T... args)
std::vector::end
T end(T... args)
std::unique_ptr
STL class.
ripple::SNTPClientImp::work_
std::optional< boost::asio::io_service::work > work_
Definition: SNTPClock.cpp:93
ripple::SNTPClientImp::resolveComplete
void resolveComplete(error_code const &ec, boost::asio::ip::udp::resolver::iterator it)
Definition: SNTPClock.cpp:344
ripple::SNTPClientImp::offset_
std::chrono::seconds offset_
Definition: SNTPClock.cpp:100
std::thread::join
T join(T... args)
ripple::SNTPClientImp::~SNTPClientImp
~SNTPClientImp() override
Definition: SNTPClock.cpp:121
ripple::NTP_TIMESTAMP_VALID
constexpr auto NTP_TIMESTAMP_VALID
Definition: SNTPClock.cpp:53
std::next
T next(T... args)
std::chrono