20 #ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED 
   21 #define RIPPLE_OVERLAY_SLOT_H_INCLUDED 
   23 #include <ripple/basics/Log.h> 
   24 #include <ripple/basics/chrono.h> 
   25 #include <ripple/beast/container/aged_unordered_map.h> 
   26 #include <ripple/beast/utility/Journal.h> 
   27 #include <ripple/overlay/Peer.h> 
   28 #include <ripple/overlay/ReduceRelayCommon.h> 
   29 #include <ripple/overlay/Squelch.h> 
   30 #include <ripple/protocol/PublicKey.h> 
   31 #include <ripple.pb.h> 
   43 namespace reduce_relay {
 
   45 template <
typename clock_type>
 
   60 template <
typename Unit, 
typename TP>
 
   64     return std::chrono::duration_cast<Unit>(t.time_since_epoch());
 
  103 template <
typename clock_type>
 
  188         unordered_map<id_t, std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
 
  238 template <
typename clock_type>
 
  243     auto now = clock_type::now();
 
  244     for (
auto it = peers_.begin(); it != peers_.end();)
 
  246         auto& peer = it->second;
 
  249         if (now - peer.lastMessage > 
IDLED)
 
  251             JLOG(journal_.trace())
 
  252                 << 
"deleteIdlePeer: " << 
Slice(validator) << 
" " << 
id 
  254                 << duration_cast<seconds>(now - peer.lastMessage).count()
 
  256             deletePeer(validator, 
id, 
false);
 
  261 template <
typename clock_type>
 
  266     protocol::MessageType type)
 
  269     auto now = clock_type::now();
 
  270     auto it = peers_.find(
id);
 
  272     if (it == peers_.end())
 
  274         JLOG(journal_.trace())
 
  275             << 
"update: adding peer " << 
Slice(validator) << 
" " << id;
 
  284         JLOG(journal_.trace())
 
  285             << 
"update: squelch expired " << 
Slice(validator) << 
" " << id;
 
  287         it->second.lastMessage = now;
 
  292     auto& peer = it->second;
 
  294     JLOG(journal_.trace())
 
  295         << 
"update: existing peer " << 
Slice(validator) << 
" " << 
id 
  296         << 
" slot state " << 
static_cast<int>(state_) << 
" peer state " 
  297         << 
static_cast<int>(peer.state) << 
" count " << peer.count << 
" last " 
  298         << duration_cast<milliseconds>(now - peer.lastMessage).count()
 
  299         << 
" pool " << considered_.
size() << 
" threshold " << reachedThreshold_
 
  300         << 
" " << (type == protocol::mtVALIDATION ? 
"validation" : 
"proposal");
 
  302     peer.lastMessage = now;
 
  308         considered_.insert(
id);
 
  314         JLOG(journal_.trace())
 
  315             << 
"update: resetting due to inactivity " << 
Slice(validator) << 
" " 
  316             << 
id << 
" " << duration_cast<seconds>(now - lastSelected_).count();
 
  330         auto const consideredPoolSize = considered_.
size();
 
  334                 considered_.size() == 1 ? 0 : 
rand_int(considered_.size() - 1);
 
  335             auto it = 
std::next(considered_.begin(), i);
 
  337             considered_.erase(it);
 
  338             auto const& itpeers = peers_.find(
id);
 
  339             if (itpeers == peers_.end())
 
  341                 JLOG(journal_.error()) << 
"update: peer not found " 
  342                                        << 
Slice(validator) << 
" " << id;
 
  345             if (now - itpeers->second.lastMessage < 
IDLED)
 
  351             JLOG(journal_.trace())
 
  352                 << 
"update: selection failed " << 
Slice(validator) << 
" " << id;
 
  359         auto s = selected.
begin();
 
  360         JLOG(journal_.trace())
 
  361             << 
"update: " << 
Slice(validator) << 
" " << 
id << 
" pool size " 
  362             << consideredPoolSize << 
" selected " << *s << 
" " 
  370         for (
auto& [k, v] : peers_)
 
  374             if (selected.
find(k) != selected.
end())
 
  378                 if (journal_.trace())
 
  387         JLOG(journal_.trace()) << 
"update: squelching " << 
Slice(validator)
 
  388                                << 
" " << 
id << 
" " << str.
str();
 
  390         reachedThreshold_ = 0;
 
  395 template <
typename clock_type>
 
  405         JLOG(journal_.warn())
 
  406             << 
"getSquelchDuration: unexpected squelch duration " << npeers;
 
  411 template <
typename clock_type>
 
  415     auto it = peers_.find(
id);
 
  416     if (it != peers_.end())
 
  418         JLOG(journal_.trace())
 
  419             << 
"deletePeer: " << 
Slice(validator) << 
" " << 
id << 
" selected " 
  421             << (considered_.find(
id) != considered_.end()) << 
" erase " 
  423         auto now = clock_type::now();
 
  426             for (
auto& [k, v] : peers_)
 
  429                     handler_.unsquelch(validator, k);
 
  436             reachedThreshold_ = 0;
 
  439         else if (considered_.find(
id) != considered_.end())
 
  443             considered_.erase(
id);
 
  446         it->second.lastMessage = now;
 
  447         it->second.count = 0;
 
  454 template <
typename clock_type>
 
  458     for (
auto& [_, peer] : peers_)
 
  465 template <
typename clock_type>
 
  471     reachedThreshold_ = 0;
 
  475 template <
typename clock_type>
 
  479     return std::count_if(peers_.begin(), peers_.end(), [&](
auto const& it) {
 
  480         return (it.second.state == state);
 
  484 template <
typename clock_type>
 
  488     return std::count_if(peers_.begin(), peers_.end(), [&](
auto const& it) {
 
  489         return (it.second.state != state);
 
  493 template <
typename clock_type>
 
  498     for (
auto const& [
id, info] : peers_)
 
  504 template <
typename clock_type>
 
  515     for (
auto const& [
id, info] : peers_)
 
  521                 epoch<milliseconds>(info.expire).count(),
 
  522                 epoch<milliseconds>(info.lastMessage).count()))));
 
  531 template <
typename clock_type>
 
  563         protocol::MessageType type);
 
  575         auto const& it = 
slots_.find(validator);
 
  577             return it->second.inState(state);
 
  585         auto const& it = 
slots_.find(validator);
 
  587             return it->second.notInState(state);
 
  595         auto const& it = 
slots_.find(validator);
 
  597             return it->second.state_ == state;
 
  605         auto const& it = 
slots_.find(validator);
 
  607             return it->second.getSelected();
 
  619         auto const& it = 
slots_.find(validator);
 
  621             return it->second.getPeers();
 
  629         auto const& it = 
slots_.find(validator);
 
  631             return it->second.getState();
 
  660         beast::get_abstract_clock<clock_type>()};
 
  663 template <
typename clock_type>
 
  671         auto it = peersWithMessage_.find(key);
 
  672         if (it == peersWithMessage_.end())
 
  674             JLOG(journal_.trace())
 
  675                 << 
"addPeerMessage: new " << 
to_string(key) << 
" " << id;
 
  680         if (it->second.find(
id) != it->second.end())
 
  682             JLOG(journal_.trace()) << 
"addPeerMessage: duplicate message " 
  687         JLOG(journal_.trace())
 
  688             << 
"addPeerMessage: added " << 
to_string(key) << 
" " << id;
 
  696 template <
typename clock_type>
 
  702     protocol::MessageType type)
 
  704     if (!addPeerMessage(key, 
id))
 
  707     auto it = slots_.find(validator);
 
  708     if (it == slots_.end())
 
  710         JLOG(journal_.trace())
 
  711             << 
"updateSlotAndSquelch: new slot " << 
Slice(validator);
 
  717         it->second.update(validator, 
id, type);
 
  720         it->second.update(validator, 
id, type);
 
  723 template <
typename clock_type>
 
  727     for (
auto& [validator, slot] : slots_)
 
  728         slot.deletePeer(validator, 
id, 
erase);
 
  731 template <
typename clock_type>
 
  735     auto now = clock_type::now();
 
  737     for (
auto it = slots_.begin(); it != slots_.end();)
 
  739         it->second.deleteIdlePeer(it->first);
 
  742             JLOG(journal_.trace())
 
  743                 << 
"deleteIdlePeers: deleting idle slot " << 
Slice(it->first);
 
  744             it = slots_.erase(it);
 
  755 #endif  // RIPPLE_OVERLAY_SLOT_H_INCLUDED 
  
std::size_t size() const noexcept
Returns the number of bytes in the storage.
 
std::unordered_map< id_t, PeerInfo > peers_
 
Manages partitions for logging.
 
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator.
 
An immutable linear range of bytes.
 
std::unordered_map< id_t, std::tuple< PeerState, uint16_t, uint32_t, uint32_t > > getPeers() const
Get peers info.
 
const beast::Journal journal_
 
SquelchHandler const  & handler_
 
std::uint32_t id_t
Uniquely identifies a peer.
 
static constexpr auto SQUELCH_PER_PEER
 
void initCounting()
Initialize slot to Counting state.
 
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT
 
Slot(SquelchHandler const &handler, beast::Journal journal)
Constructor.
 
std::uint16_t reachedThreshold_
 
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
 
virtual void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t duration) const =0
Squelch handler.
 
std::chrono::seconds getSquelchDuration(std::size_t npeers)
Get random squelch duration between MIN_UNSQUELCH_EXPIRE and min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT, SQUELCH_PER_PEER * npeers), MAX_UNSQUELCH_EXPIRE_PEERS).
 
typename ripple::UptimeClock ::time_point time_point
 
std::optional< std::uint16_t > notInState(PublicKey const &validator, PeerState state) const
Return number of peers not in state.
 
bool inState(PublicKey const &validator, SlotState state) const
Return true if Slot is in state.
 
hash_map< PublicKey, Slot< clock_type > > slots_
 
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
 
const beast::Journal journal_
 
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
 
void resetCounts()
Reset counts of peers in Selected or Counting state.
 
std::optional< std::uint16_t > inState(PublicKey const &validator, PeerState state) const
Return number of peers in state.
 
static constexpr auto IDLED
 
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
 
void deleteIdlePeer(PublicKey const &validator)
Check if peers stopped relaying messages.
 
Seed functor once per construction.
 
Slots is a container for validators' Slot objects and handles the Slot update when a message is received from a validator.
 
A generic endpoint for log messages.
 
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
 
static messages peersWithMessage_
 
static constexpr uint16_t MIN_MESSAGE_THRESHOLD
 
std::optional< SlotState > getState(PublicKey const &validator)
Get Slot's state.
 
std::unordered_map< typename Peer::id_t, std::tuple< PeerState, uint16_t, uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
Get peers info.
 
typename ripple::UptimeClock ::time_point time_point
 
void deletePeer(PublicKey const &validator, id_t id, bool erase)
Handle peer deletion when a peer disconnects.
 
SlotState getState() const
Return Slot's state.
 
static constexpr auto MIN_UNSQUELCH_EXPIRE
 
Associative container where each element is also indexed by time.
 
bool addPeerMessage(uint256 const &key, id_t id)
Add the message/peer pair if this message has not already been seen from the peer.
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
Slots(Logs &logs, SquelchHandler const &handler)
 
std::unordered_set< id_t > considered_
 
clock_type::time_point lastSelected_
 
virtual void unsquelch(PublicKey const &validator, Peer::id_t id) const =0
Unsquelch handler.
 
std::set< id_t > getSelected(PublicKey const &validator)
Get selected peers.
 
std::set< id_t > getSelected() const
Return selected peers.
 
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
 
std::uint16_t notInState(PeerState state) const
Return number of peers not in state.
 
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS
 
Data maintained for each peer.
 
SquelchHandler const  & handler_
 
std::uint16_t inState(PeerState state) const
Return number of peers in state.
 
const time_point & getLastSelected() const
Get the time of the last peer selection round.
 
virtual ~SquelchHandler()
 
void update(PublicKey const &validator, id_t id, protocol::MessageType type)
Update peer info.
 
void deletePeer(id_t id, bool erase)
Called when a peer is deleted.
 
static constexpr uint16_t MAX_SELECTED_PEERS
 
Slot is associated with a specific validator via validator's public key.