rippled
resource/impl/Logic.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_RESOURCE_LOGIC_H_INCLUDED
21 #define RIPPLE_RESOURCE_LOGIC_H_INCLUDED
22 
23 #include <ripple/basics/Log.h>
24 #include <ripple/basics/UnorderedContainers.h>
25 #include <ripple/basics/chrono.h>
26 #include <ripple/beast/clock/abstract_clock.h>
27 #include <ripple/beast/insight/Insight.h>
28 #include <ripple/beast/utility/PropertyStream.h>
29 #include <ripple/json/json_value.h>
30 #include <ripple/protocol/jss.h>
31 #include <ripple/resource/Fees.h>
32 #include <ripple/resource/Gossip.h>
33 #include <ripple/resource/impl/Import.h>
34 #include <cassert>
35 #include <mutex>
36 
37 namespace ripple {
38 namespace Resource {
39 
40 class Logic
41 {
42 private:
47 
// Insight meters kept by Logic. The constructor body below creates two
// meters from the collector, registered under the names warn and drop.
48  struct Stats
49  {
51  {
52  warn = collector->make_meter("warn");
53  drop = collector->make_meter("drop");
54  }
55 
58  };
59 
63 
65 
66  // Table of all entries
68 
69  // Because the following are intrusive lists, a given Entry may be in
70  // at most one list at a given instant. The Entry must be removed from
71  // one list before placing it in another.
72 
73  // List of all active inbound entries
75 
76  // List of all active outbound entries
78 
79  // List of all active admin entries
81 
82  // List of all inactive entries
84 
85  // All imported gossip data
87 
88  //--------------------------------------------------------------------------
89 public:
// collector is handed to m_stats, which uses it to create its meters.
// clock supplies every timestamp taken by this class.
// journal receives the log output written through JLOG.
91  beast::insight::Collector::ptr const& collector,
92  clock_type& clock,
93  beast::Journal journal)
94  : m_stats(collector), m_clock(clock), m_journal(journal)
95  {
96  }
97 
99  {
100  // These have to be cleared before the Logic is destroyed
101  // since their destructors call back into the class.
102  // Order matters here as well, the import table has to be
103  // destroyed before the consumer table.
104  //
// Discard every consumer entry still held in the table.
106  table_.clear();
107  }
108 
// Returns a Consumer bound to the table entry for an inbound address.
// The key is built from kindInbound and address.at_port(0), so the
// port supplied by the caller is replaced by 0 before the lookup.
109  Consumer
111  {
112  Entry* entry(nullptr);
113 
114  {
// Find the entry for this key, or create one constructed from the
// current clock reading.
116  auto [resultIt, resultInserted] = table_.emplace(
117  std::piecewise_construct,
118  std::make_tuple(kindInbound, address.at_port(0)), // Key
119  std::make_tuple(m_clock.now())); // Entry
120 
121  entry = &resultIt->second;
// The entry keeps a pointer to its own key as stored in the table.
122  entry->key = &resultIt->first;
123  ++entry->refcount;
// A count of exactly one means this call made the entry active,
// so it is appended to the inbound list.
124  if (entry->refcount == 1)
125  {
126  if (!resultInserted)
127  {
129  }
130  inbound_.push_back(*entry);
131  }
132  }
133 
134  JLOG(m_journal.debug()) << "New inbound endpoint " << *entry;
135 
136  return Consumer(*this, *entry);
137  }
138 
// Returns a Consumer bound to the table entry for an outbound address.
// The key is built from kindOutbound and the address exactly as given,
// so the port takes part in the lookup.
139  Consumer
141  {
142  Entry* entry(nullptr);
143 
144  {
// Find the entry for this key, or create one constructed from the
// current clock reading.
146  auto [resultIt, resultInserted] = table_.emplace(
147  std::piecewise_construct,
148  std::make_tuple(kindOutbound, address), // Key
149  std::make_tuple(m_clock.now())); // Entry
150 
151  entry = &resultIt->second;
// The entry keeps a pointer to its own key as stored in the table.
152  entry->key = &resultIt->first;
153  ++entry->refcount;
// A count of exactly one means this call made the entry active,
// so it is appended to the outbound list.
154  if (entry->refcount == 1)
155  {
156  if (!resultInserted)
158  outbound_.push_back(*entry);
159  }
160  }
161 
162  JLOG(m_journal.debug()) << "New outbound endpoint " << *entry;
163 
164  return Consumer(*this, *entry);
165  }
166 
// Returns a Consumer bound to the table entry for an unlimited address.
// The key is built from kindUnlimited and address.at_port(1), so the
// port supplied by the caller is replaced by 1 before the lookup.
// Active entries of this kind are kept in the admin list.
172  Consumer
174  {
175  Entry* entry(nullptr);
176 
177  {
// Find the entry for this key, or create one constructed from the
// current clock reading.
179  auto [resultIt, resultInserted] = table_.emplace(
180  std::piecewise_construct,
181  std::make_tuple(kindUnlimited, address.at_port(1)), // Key
182  std::make_tuple(m_clock.now())); // Entry
183 
184  entry = &resultIt->second;
// The entry keeps a pointer to its own key as stored in the table.
185  entry->key = &resultIt->first;
186  ++entry->refcount;
// A count of exactly one means this call made the entry active,
// so it is appended to the admin list.
187  if (entry->refcount == 1)
188  {
189  if (!resultInserted)
191  admin_.push_back(*entry);
192  }
193  }
194 
195  JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry;
196 
197  return Consumer(*this, *entry);
198  }
199 
202  {
// Convenience form: report using warningThreshold as the cutoff.
203  return getJson(warningThreshold);
204  }
205 
// Builds a JSON report of active entries whose combined balance is at
// least threshold. The combined balance is the local balance evaluated
// at the current clock reading plus the remote balance. Each reported
// entry is stored in ret under entry.to_string() and records the local
// balance, the remote balance and a type string.
208  getJson(int threshold)
209  {
// One clock reading is shared by all three scans below.
210  clock_type::time_point const now(m_clock.now());
211 
214 
// Inbound entries, reported with type inbound.
215  for (auto& inboundEntry : inbound_)
216  {
217  int localBalance = inboundEntry.local_balance.value(now);
218  if ((localBalance + inboundEntry.remote_balance) >= threshold)
219  {
220  Json::Value& entry =
221  (ret[inboundEntry.to_string()] = Json::objectValue);
222  entry[jss::local] = localBalance;
223  entry[jss::remote] = inboundEntry.remote_balance;
224  entry[jss::type] = "inbound";
225  }
226  }
// Outbound entries, reported with type outbound.
227  for (auto& outboundEntry : outbound_)
228  {
229  int localBalance = outboundEntry.local_balance.value(now);
230  if ((localBalance + outboundEntry.remote_balance) >= threshold)
231  {
232  Json::Value& entry =
233  (ret[outboundEntry.to_string()] = Json::objectValue);
234  entry[jss::local] = localBalance;
235  entry[jss::remote] = outboundEntry.remote_balance;
236  entry[jss::type] = "outbound";
237  }
238  }
// Entries in the admin list, reported with type admin.
239  for (auto& adminEntry : admin_)
240  {
241  int localBalance = adminEntry.local_balance.value(now);
242  if ((localBalance + adminEntry.remote_balance) >= threshold)
243  {
244  Json::Value& entry =
245  (ret[adminEntry.to_string()] = Json::objectValue);
246  entry[jss::local] = localBalance;
247  entry[jss::remote] = adminEntry.remote_balance;
248  entry[jss::type] = "admin";
249  }
250  }
251 
252  return ret;
253  }
254 
// Builds the Gossip to share with peers. Only inbound entries are
// considered, and an entry is included only when its local balance at
// the current clock reading is at least minimumGossipBalance. Each
// item carries that balance and the address taken from the entry key.
255  Gossip
257  {
258  clock_type::time_point const now(m_clock.now());
259 
260  Gossip gossip;
262 
// Upper bound on the item count: every inbound entry could qualify.
263  gossip.items.reserve(inbound_.size());
264 
265  for (auto& inboundEntry : inbound_)
266  {
267  Gossip::Item item;
268  item.balance = inboundEntry.local_balance.value(now);
269  if (item.balance >= minimumGossipBalance)
270  {
271  item.address = inboundEntry.key->address;
272  gossip.items.push_back(item);
273  }
274  }
275 
276  return gossip;
277  }
278 
279  //--------------------------------------------------------------------------
280 
// Applies gossip received from origin. Every gossiped address gets an
// inbound consumer whose remote_balance is raised by the gossiped
// balance. The resulting items are kept in importTable_ under origin
// and expire at the current clock reading plus gossipExpirationSeconds.
// If origin already has an import, the balances it contributed earlier
// are deducted again and its items are replaced by the new ones.
281  void
282  importConsumers(std::string const& origin, Gossip const& gossip)
283  {
// Despite the name, elapsed is simply the current clock reading.
284  auto const elapsed = m_clock.now();
285  {
287  auto [resultIt, resultInserted] = importTable_.emplace(
288  std::piecewise_construct,
289  std::make_tuple(origin), // Key
291  m_clock.now().time_since_epoch().count())); // Import
292 
293  if (resultInserted)
294  {
295  // This is a new import
// next aliases the freshly inserted table slot and is filled in place.
296  Import& next(resultIt->second);
297  next.whenExpires = elapsed + gossipExpirationSeconds;
298  next.items.reserve(gossip.items.size());
299 
300  for (auto const& gossipItem : gossip.items)
301  {
302  Import::Item item;
303  item.balance = gossipItem.balance;
304  item.consumer = newInboundEndpoint(gossipItem.address);
305  item.consumer.entry().remote_balance += item.balance;
306  next.items.push_back(item);
307  }
308  }
309  else
310  {
311  // Previous import exists so add the new remote
312  // balances and then deduct the old remote balances.
313 
// Here next is a local object, built separately from the table slot.
314  Import next;
315  next.whenExpires = elapsed + gossipExpirationSeconds;
316  next.items.reserve(gossip.items.size());
317  for (auto const& gossipItem : gossip.items)
318  {
319  Import::Item item;
320  item.balance = gossipItem.balance;
321  item.consumer = newInboundEndpoint(gossipItem.address);
322  item.consumer.entry().remote_balance += item.balance;
323  next.items.push_back(item);
324  }
325 
// Undo what the previous import from this origin had added.
326  Import& prev(resultIt->second);
327  for (auto& item : prev.items)
328  {
329  item.consumer.entry().remote_balance -= item.balance;
330  }
331 
// Exchange contents: the table slot now holds the new import, while
// the old items end up in the local next and go away with it at the
// end of this scope.
332  std::swap(next, prev);
333  }
334  }
335  }
336 
337  //--------------------------------------------------------------------------
338 
339  // Called periodically to expire entries and groom the table.
340  //
// Two passes are made with a single clock reading: expired inactive
// entries are erased from the table, then expired imports are removed
// from importTable_ after their remote balances have been deducted.
341  void
343  {
345 
// Despite the name, elapsed is simply the current clock reading.
346  auto const elapsed = m_clock.now();
347 
// Scan the inactive list from the front and stop at the first entry
// that has not expired yet.
// NOTE(review): stopping early presumes the list is ordered by
// expiry time; confirm where whenExpires is assigned.
348  for (auto iter(inactive_.begin()); iter != inactive_.end();)
349  {
350  if (iter->whenExpires <= elapsed)
351  {
352  JLOG(m_journal.debug()) << "Expired " << *iter;
353  auto table_iter = table_.find(*iter->key);
// Advance first: erase removes the Entry from the table, and that
// Entry is the element iter currently refers to.
354  ++iter;
355  erase(table_iter);
356  }
357  else
358  {
359  break;
360  }
361  }
362 
// Second pass: every import is examined, none are skipped.
363  auto iter = importTable_.begin();
364  while (iter != importTable_.end())
365  {
366  Import& import(iter->second);
367  if (iter->second.whenExpires <= elapsed)
368  {
// Take back the remote balance each item of this import contributed.
369  for (auto item_iter(import.items.begin());
370  item_iter != import.items.end();
371  ++item_iter)
372  {
373  item_iter->consumer.entry().remote_balance -=
374  item_iter->balance;
375  }
376 
// Continue from the iterator returned by the container erase.
377  iter = importTable_.erase(iter);
378  }
379  else
380  ++iter;
381  }
382  }
383 
384  //--------------------------------------------------------------------------
385 
386  // Returns the disposition based on the balance and thresholds
// drop when balance is at least dropThreshold, otherwise warn when it
// is at least warningThreshold, otherwise ok.
387  static Disposition
389  {
// The drop test comes first so that it takes precedence over warn.
390  if (balance >= dropThreshold)
391  return Disposition::drop;
392 
393  if (balance >= warningThreshold)
394  return Disposition::warn;
395 
396  return Disposition::ok;
397  }
398 
// Removes the entry that iter refers to from the table. The entry is
// expected to have no outstanding references, which is asserted.
399  void
400  erase(Table::iterator iter)
401  {
403  Entry& entry(iter->second);
404  assert(entry.refcount == 0);
406  table_.erase(iter);
407  }
408 
// Adds one reference to entry.
409  void
410  acquire(Entry& entry)
411  {
413  ++entry.refcount;
414  }
415 
// Drops one reference to entry. When the count reaches zero the entry
// is logged as inactive and appended to the inactive list.
416  void
417  release(Entry& entry)
418  {
420  if (--entry.refcount == 0)
421  {
422  JLOG(m_journal.debug()) << "Inactive " << entry;
423 
// The branch is selected by the kind recorded in the entry key.
424  switch (entry.key->kind)
425  {
426  case kindInbound:
428  break;
429  case kindOutbound:
431  break;
432  case kindUnlimited:
// Unlink from the admin list before the entry joins another list.
433  admin_.erase(admin_.iterator_to(entry));
434  break;
435  default:
// No other kind is expected here.
436  assert(false);
437  break;
438  }
439  inactive_.push_back(entry);
441  }
442  }
443 
// Adds the cost of fee to entry at the current clock reading and
// returns the disposition that matches the resulting balance.
445  charge(Entry& entry, Charge const& fee)
446  {
448  clock_type::time_point const now(m_clock.now());
449  int const balance(entry.add(fee.cost(), now));
450  JLOG(m_journal.trace()) << "Charging " << entry << " for " << fee;
451  return disposition(balance);
452  }
453 
// Decides whether a load warning should be sent for entry and returns
// true when one should. Unlimited entries never get a warning. For all
// others a warning is due when the balance is at least warningThreshold
// and the clock reading differs from the time of the last warning. In
// that case feeWarning is charged, the warn meter is incremented and
// lastWarningTime is updated.
454  bool
455  warn(Entry& entry)
456  {
457  if (entry.isUnlimited())
458  return false;
459 
461  bool notify(false);
// Despite the name, elapsed is simply the current clock reading.
462  auto const elapsed = m_clock.now();
// NOTE(review): the balance is evaluated with a second call to
// m_clock.now() rather than with elapsed; the two readings could
// differ. Confirm whether reusing elapsed is intended.
463  if (entry.balance(m_clock.now()) >= warningThreshold &&
464  elapsed != entry.lastWarningTime)
465  {
466  charge(entry, feeWarning);
467  notify = true;
468  entry.lastWarningTime = elapsed;
469  }
470  if (notify)
471  {
472  JLOG(m_journal.info()) << "Load warning: " << entry;
473  ++m_stats.warn;
474  }
475  return notify;
476  }
477 
// Decides whether entry should be dropped and returns true when it
// should. Unlimited entries are never dropped. For all others the
// balance at the current clock reading is compared with dropThreshold.
// On a drop the event is logged, feeDrop is charged and the drop meter
// is incremented.
478  bool
480  {
481  if (entry.isUnlimited())
482  return false;
483 
485  bool drop(false);
486  clock_type::time_point const now(m_clock.now());
487  int const balance(entry.balance(now));
488  if (balance >= dropThreshold)
489  {
// The logged balance is the value from before feeDrop is added.
490  JLOG(m_journal.warn())
491  << "Consumer entry " << entry << " dropped with balance "
492  << balance << " at or above drop threshold " << dropThreshold;
493 
494  // Adding feeDrop at this point keeps the dropped connection
495  // from re-connecting for at least a little while after it is
496  // dropped.
497  charge(entry, feeDrop);
498  ++m_stats.drop;
499  drop = true;
500  }
501  return drop;
502  }
503 
// Returns the balance of entry evaluated at the current clock reading.
504  int
505  balance(Entry& entry)
506  {
508  return entry.balance(m_clock.now());
509  }
510 
511  //--------------------------------------------------------------------------
512 
// Writes one map item into items for every entry of list. Each item
// carries the entry name and its balance evaluated at now. The count
// and remote_balance fields are written only when they are nonzero.
513  void
515  clock_type::time_point const now,
517  EntryIntrusiveList& list)
518  {
519  for (auto& entry : list)
520  {
521  beast::PropertyStream::Map item(items);
522  if (entry.refcount != 0)
523  item["count"] = entry.refcount;
524  item["name"] = entry.to_string();
525  item["balance"] = entry.balance(now);
526  if (entry.remote_balance != 0)
527  item["remote_balance"] = entry.remote_balance;
528  }
529  }
530 
// Writes four sets into map, named inbound, outbound, admin and
// inactive, each filled by writeList from the matching list. All four
// use the same clock reading.
531  void
533  {
534  clock_type::time_point const now(m_clock.now());
535 
537 
// Each Set lives in its own scope, so it is destroyed before the next
// one is created.
538  {
539  beast::PropertyStream::Set s("inbound", map);
540  writeList(now, s, inbound_);
541  }
542 
543  {
544  beast::PropertyStream::Set s("outbound", map);
545  writeList(now, s, outbound_);
546  }
547 
548  {
549  beast::PropertyStream::Set s("admin", map);
550  writeList(now, s, admin_);
551  }
552 
553  {
554  beast::PropertyStream::Set s("inactive", map);
555  writeList(now, s, inactive_);
556  }
557  }
558 };
559 
560 } // namespace Resource
561 } // namespace ripple
562 
563 #endif
beast::List::size
size_type size() const noexcept
Returns the number of elements in the list.
Definition: List.h:318
ripple::Resource::Import
A set of imported consumer data from a gossip origin.
Definition: Import.h:30
ripple::Resource::kindOutbound
@ kindOutbound
Definition: Kind.h:34
ripple::Resource::Logic::getJson
Json::Value getJson()
Definition: resource/impl/Logic.h:201
std::make_tuple
T make_tuple(T... args)
ripple::Resource::Logic::onWrite
void onWrite(beast::PropertyStream::Map &map)
Definition: resource/impl/Logic.h:532
std::string
STL class.
std::shared_ptr< Collector >
beast::insight::Meter
A metric for measuring an integral value.
Definition: Meter.h:37
ripple::Resource::Logic::lock_
std::recursive_mutex lock_
Definition: resource/impl/Logic.h:64
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
beast::PropertyStream::Map
Definition: PropertyStream.h:224
ripple::Resource::Import::Item::consumer
Consumer consumer
Definition: Import.h:37
ripple::Resource::Gossip
Data format for exchanging consumption information across peers.
Definition: Gossip.h:29
ripple::Resource::Logic::inactive_
EntryIntrusiveList inactive_
Definition: resource/impl/Logic.h:83
ripple::Resource::kindInbound
@ kindInbound
Definition: Kind.h:34
ripple::Resource::Consumer::entry
Entry & entry()
Definition: Consumer.cpp:136
ripple::Resource::Logic::admin_
EntryIntrusiveList admin_
Definition: resource/impl/Logic.h:80
ripple::Resource::Logic::newInboundEndpoint
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
Definition: resource/impl/Logic.h:110
std::unordered_map::find
T find(T... args)
beast::List::iterator_to
iterator iterator_to(T &element) const noexcept
Obtain an iterator from an element.
Definition: List.h:562
beast::List::end
iterator end() noexcept
Obtain a iterator to the end of the list.
Definition: List.h:394
ripple::Resource::Import::Item
Definition: Import.h:32
ripple::Resource::Key::kind
Kind kind
Definition: Key.h:33
beast::IP::Endpoint::address
Address const & address() const
Returns the address portion of this endpoint.
Definition: IPEndpoint.h:76
std::unordered_map::emplace
T emplace(T... args)
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::recursive_mutex
STL class.
std::lock_guard
STL class.
ripple::Resource::Logic::disconnect
bool disconnect(Entry &entry)
Definition: resource/impl/Logic.h:479
ripple::Resource::secondsUntilExpiration
constexpr std::chrono::seconds secondsUntilExpiration
Definition: resource/impl/Tuning.h:48
ripple::Resource::Logic::balance
int balance(Entry &entry)
Definition: resource/impl/Logic.h:505
beast::PropertyStream::Set
Definition: PropertyStream.h:296
ripple::Resource::feeWarning
const Charge feeWarning
ripple::Resource::minimumGossipBalance
@ minimumGossipBalance
Definition: resource/impl/Tuning.h:44
std::unordered_map::clear
T clear(T... args)
beast::abstract_clock::now
virtual time_point now() const =0
Returns the current time.
ripple::Resource::Logic::acquire
void acquire(Entry &entry)
Definition: resource/impl/Logic.h:410
ripple::Resource::Logic
Definition: resource/impl/Logic.h:40
ripple::Resource::Entry::balance
int balance(clock_type::time_point const now)
Definition: Entry.h:72
ripple::Resource::Logic::table_
Table table_
Definition: resource/impl/Logic.h:67
ripple::Resource::Logic::getJson
Json::Value getJson(int threshold)
Returns a Json::objectValue.
Definition: resource/impl/Logic.h:208
ripple::Resource::Logic::m_clock
Stopwatch & m_clock
Definition: resource/impl/Logic.h:61
ripple::Resource::feeDrop
const Charge feeDrop
ripple::Resource::gossipExpirationSeconds
constexpr std::chrono::seconds gossipExpirationSeconds
Definition: resource/impl/Tuning.h:51
ripple::Resource::kindUnlimited
@ kindUnlimited
Definition: Kind.h:34
ripple::Resource::Entry::lastWarningTime
clock_type::time_point lastWarningTime
Definition: Entry.h:98
ripple::Resource::Entry::isUnlimited
bool isUnlimited() const
Returns true if this connection should have no resource limits applied–it is still possible for certa...
Definition: Entry.h:65
ripple::Resource::drop
@ drop
Definition: Disposition.h:37
ripple::Resource::Logic::m_stats
Stats m_stats
Definition: resource/impl/Logic.h:60
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
beast::List::begin
iterator begin() noexcept
Obtain an iterator to the beginning of the list.
Definition: List.h:367
ripple::Resource::Logic::warn
bool warn(Entry &entry)
Definition: resource/impl/Logic.h:455
ripple::Resource::Entry::whenExpires
clock_type::time_point whenExpires
Definition: Entry.h:101
ripple::Resource::Logic::m_journal
beast::Journal m_journal
Definition: resource/impl/Logic.h:62
ripple::Resource::Logic::erase
void erase(Table::iterator iter)
Definition: resource/impl/Logic.h:400
ripple::Resource::Logic::importConsumers
void importConsumers(std::string const &origin, Gossip const &gossip)
Definition: resource/impl/Logic.h:282
ripple::Resource::Logic::importTable_
Imports importTable_
Definition: resource/impl/Logic.h:86
ripple::Resource::Gossip::items
std::vector< Item > items
Definition: Gossip.h:42
ripple::Resource::Logic::outbound_
EntryIntrusiveList outbound_
Definition: resource/impl/Logic.h:77
ripple::Resource::Charge::cost
value_type cost() const
Return the cost of the charge in Resource::Manager units.
Definition: Charge.cpp:38
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::unordered_map::erase
T erase(T... args)
ripple::Resource::Logic::inbound_
EntryIntrusiveList inbound_
Definition: resource/impl/Logic.h:74
ripple::Resource::Logic::release
void release(Entry &entry)
Definition: resource/impl/Logic.h:417
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::Resource::Logic::Stats
Definition: resource/impl/Logic.h:48
ripple::Resource::Logic::~Logic
~Logic()
Definition: resource/impl/Logic.h:98
beast::abstract_clock< std::chrono::steady_clock >
ripple::Resource::Entry::refcount
int refcount
Definition: Entry.h:89
std::swap
T swap(T... args)
ripple::Resource::Import::Item::balance
int balance
Definition: Import.h:36
ripple::Resource::Logic::Logic
Logic(beast::insight::Collector::ptr const &collector, clock_type &clock, beast::Journal journal)
Definition: resource/impl/Logic.h:90
ripple::Resource::Entry
Definition: Entry.h:37
ripple::Resource::Logic::Stats::Stats
Stats(beast::insight::Collector::ptr const &collector)
Definition: resource/impl/Logic.h:50
ripple::Resource::Gossip::Item
Describes a single consumer.
Definition: Gossip.h:34
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Resource::Logic::writeList
void writeList(clock_type::time_point const now, beast::PropertyStream::Set &items, EntryIntrusiveList &list)
Definition: resource/impl/Logic.h:514
std::unordered_map::begin
T begin(T... args)
cassert
ripple::Resource::Logic::newUnlimitedEndpoint
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
Definition: resource/impl/Logic.h:173
ripple::Resource::Gossip::Item::address
beast::IP::Endpoint address
Definition: Gossip.h:39
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:34
ripple::Resource::Charge
A consumption charge.
Definition: Charge.h:30
ripple::Resource::Gossip::Item::balance
int balance
Definition: Gossip.h:38
ripple::Resource::dropThreshold
@ dropThreshold
Definition: resource/impl/Tuning.h:35
ripple::Resource::Disposition
Disposition
The disposition of a consumer after applying a load charge.
Definition: Disposition.h:27
mutex
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::Resource::Logic::Stats::drop
beast::insight::Meter drop
Definition: resource/impl/Logic.h:57
ripple::Resource::Logic::exportConsumers
Gossip exportConsumers()
Definition: resource/impl/Logic.h:256
beast::IP::Endpoint
A version-independent IP address and port combination.
Definition: IPEndpoint.h:38
std::unordered_map::end
T end(T... args)
beast::List::erase
iterator erase(iterator pos) noexcept
Remove an element.
Definition: List.h:472
ripple::Resource::Entry::remote_balance
int remote_balance
Definition: Entry.h:95
beast::IP::Endpoint::at_port
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition: IPEndpoint.h:69
ripple::Resource::Entry::key
Key const * key
Definition: Entry.h:86
std::unordered_map
STL class.
ripple::Stopwatch
beast::abstract_clock< std::chrono::steady_clock > Stopwatch
A clock for measuring elapsed time.
Definition: chrono.h:81
beast::abstract_clock< std::chrono::steady_clock >::time_point
typename std::chrono::steady_clock ::time_point time_point
Definition: abstract_clock.h:63
ripple::Resource::Logic::periodicActivity
void periodicActivity()
Definition: resource/impl/Logic.h:342
ripple::Resource::Entry::add
int add(int charge, clock_type::time_point const now)
Definition: Entry.h:80
ripple::Resource::Logic::newOutboundEndpoint
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
Definition: resource/impl/Logic.h:140
ripple::Resource::Logic::charge
Disposition charge(Entry &entry, Charge const &fee)
Definition: resource/impl/Logic.h:445
beast::List::push_back
iterator push_back(T &element) noexcept
Append an element at the end of the list.
Definition: List.h:509
ripple::Resource::warningThreshold
@ warningThreshold
Definition: resource/impl/Tuning.h:31
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::Resource::Logic::disposition
static Disposition disposition(int balance)
Definition: resource/impl/Logic.h:388
ripple::Resource::Logic::Stats::warn
beast::insight::Meter warn
Definition: resource/impl/Logic.h:56
beast::List< Entry >