20 #ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED
21 #define RIPPLE_OVERLAY_SLOT_H_INCLUDED
23 #include <ripple/basics/Log.h>
24 #include <ripple/basics/chrono.h>
25 #include <ripple/beast/container/aged_unordered_map.h>
26 #include <ripple/beast/utility/Journal.h>
27 #include <ripple/overlay/Peer.h>
28 #include <ripple/overlay/ReduceRelayCommon.h>
29 #include <ripple/overlay/Squelch.h>
30 #include <ripple/protocol/PublicKey.h>
31 #include <ripple.pb.h>
43 namespace reduce_relay {
45 template <
typename clock_type>
60 template <
typename Unit,
typename TP>
64 return std::chrono::duration_cast<Unit>(t.time_since_epoch());
103 template <
typename clock_type>
188 unordered_map<id_t, std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
238 template <
typename clock_type>
243 auto now = clock_type::now();
244 for (
auto it = peers_.begin(); it != peers_.end();)
246 auto& peer = it->second;
249 if (now - peer.lastMessage >
IDLED)
251 JLOG(journal_.trace())
252 <<
"deleteIdlePeer: " <<
Slice(validator) <<
" " <<
id
254 << duration_cast<seconds>(now - peer.lastMessage).count()
256 deletePeer(validator,
id,
false);
261 template <
typename clock_type>
266 protocol::MessageType type)
269 auto now = clock_type::now();
270 auto it = peers_.find(
id);
272 if (it == peers_.end())
274 JLOG(journal_.trace())
275 <<
"update: adding peer " <<
Slice(validator) <<
" " << id;
284 JLOG(journal_.trace())
285 <<
"update: squelch expired " <<
Slice(validator) <<
" " << id;
287 it->second.lastMessage = now;
292 auto& peer = it->second;
294 JLOG(journal_.trace())
295 <<
"update: existing peer " <<
Slice(validator) <<
" " <<
id
296 <<
" slot state " <<
static_cast<int>(state_) <<
" peer state "
297 <<
static_cast<int>(peer.state) <<
" count " << peer.count <<
" last "
298 << duration_cast<milliseconds>(now - peer.lastMessage).count()
299 <<
" pool " << considered_.
size() <<
" threshold " << reachedThreshold_
300 <<
" " << (type == protocol::mtVALIDATION ?
"validation" :
"proposal");
302 peer.lastMessage = now;
308 considered_.insert(
id);
314 JLOG(journal_.trace())
315 <<
"update: resetting due to inactivity " <<
Slice(validator) <<
" "
316 <<
id <<
" " << duration_cast<seconds>(now - lastSelected_).count();
330 auto const consideredPoolSize = considered_.
size();
334 considered_.size() == 1 ? 0 :
rand_int(considered_.size() - 1);
335 auto it =
std::next(considered_.begin(), i);
337 considered_.erase(it);
338 auto const& itpeers = peers_.find(
id);
339 if (itpeers == peers_.end())
341 JLOG(journal_.error()) <<
"update: peer not found "
342 <<
Slice(validator) <<
" " << id;
345 if (now - itpeers->second.lastMessage <
IDLED)
351 JLOG(journal_.trace())
352 <<
"update: selection failed " <<
Slice(validator) <<
" " << id;
359 auto s = selected.
begin();
360 JLOG(journal_.trace())
361 <<
"update: " <<
Slice(validator) <<
" " <<
id <<
" pool size "
362 << consideredPoolSize <<
" selected " << *s <<
" "
370 for (
auto& [k, v] : peers_)
374 if (selected.
find(k) != selected.
end())
378 if (journal_.trace())
387 JLOG(journal_.trace()) <<
"update: squelching " <<
Slice(validator)
388 <<
" " <<
id <<
" " << str.
str();
390 reachedThreshold_ = 0;
395 template <
typename clock_type>
405 JLOG(journal_.warn())
406 <<
"getSquelchDuration: unexpected squelch duration " << npeers;
411 template <
typename clock_type>
415 auto it = peers_.find(
id);
416 if (it != peers_.end())
418 JLOG(journal_.trace())
419 <<
"deletePeer: " <<
Slice(validator) <<
" " <<
id <<
" selected "
421 << (considered_.find(
id) != considered_.end()) <<
" erase "
423 auto now = clock_type::now();
426 for (
auto& [k, v] : peers_)
429 handler_.unsquelch(validator, k);
436 reachedThreshold_ = 0;
439 else if (considered_.find(
id) != considered_.end())
443 considered_.erase(
id);
446 it->second.lastMessage = now;
447 it->second.count = 0;
454 template <
typename clock_type>
458 for (
auto& [_, peer] : peers_)
465 template <
typename clock_type>
471 reachedThreshold_ = 0;
475 template <
typename clock_type>
479 return std::count_if(peers_.begin(), peers_.end(), [&](
auto const& it) {
480 return (it.second.state == state);
484 template <
typename clock_type>
488 return std::count_if(peers_.begin(), peers_.end(), [&](
auto const& it) {
489 return (it.second.state != state);
493 template <
typename clock_type>
498 for (
auto const& [
id, info] : peers_)
504 template <
typename clock_type>
515 for (
auto const& [
id, info] : peers_)
521 epoch<milliseconds>(info.expire).count(),
522 epoch<milliseconds>(info.lastMessage).count()))));
531 template <
typename clock_type>
563 protocol::MessageType type);
575 auto const& it =
slots_.find(validator);
577 return it->second.inState(state);
585 auto const& it =
slots_.find(validator);
587 return it->second.notInState(state);
595 auto const& it =
slots_.find(validator);
597 return it->second.state_ == state;
605 auto const& it =
slots_.find(validator);
607 return it->second.getSelected();
619 auto const& it =
slots_.find(validator);
621 return it->second.getPeers();
629 auto const& it =
slots_.find(validator);
631 return it->second.getState();
660 beast::get_abstract_clock<clock_type>()};
663 template <
typename clock_type>
671 auto it = peersWithMessage_.find(key);
672 if (it == peersWithMessage_.end())
674 JLOG(journal_.trace())
675 <<
"addPeerMessage: new " <<
to_string(key) <<
" " << id;
680 if (it->second.find(
id) != it->second.end())
682 JLOG(journal_.trace()) <<
"addPeerMessage: duplicate message "
687 JLOG(journal_.trace())
688 <<
"addPeerMessage: added " <<
to_string(key) <<
" " << id;
696 template <
typename clock_type>
702 protocol::MessageType type)
704 if (!addPeerMessage(key,
id))
707 auto it = slots_.find(validator);
708 if (it == slots_.end())
710 JLOG(journal_.trace())
711 <<
"updateSlotAndSquelch: new slot " <<
Slice(validator);
717 it->second.update(validator,
id, type);
720 it->second.update(validator,
id, type);
723 template <
typename clock_type>
727 for (
auto& [validator, slot] : slots_)
728 slot.deletePeer(validator,
id,
erase);
731 template <
typename clock_type>
735 auto now = clock_type::now();
737 for (
auto it = slots_.begin(); it != slots_.end();)
739 it->second.deleteIdlePeer(it->first);
742 JLOG(journal_.trace())
743 <<
"deleteIdlePeers: deleting idle slot " <<
Slice(it->first);
744 it = slots_.erase(it);
755 #endif // RIPPLE_OVERLAY_SLOT_H_INCLUDED
std::size_t size() const noexcept
Returns the number of bytes in the storage.
std::unordered_map< id_t, PeerInfo > peers_
Manages partitions for logging.
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator.
An immutable linear range of bytes.
std::unordered_map< id_t, std::tuple< PeerState, uint16_t, uint32_t, uint32_t > > getPeers() const
Get peers info.
const beast::Journal journal_
SquelchHandler const & handler_
std::uint32_t id_t
Uniquely identifies a peer.
static constexpr auto SQUELCH_PER_PEER
void initCounting()
Initialize slot to Counting state.
static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT
Slot(SquelchHandler const &handler, beast::Journal journal)
Constructor.
std::uint16_t reachedThreshold_
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
virtual void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t duration) const =0
Squelch handler.
std::chrono::seconds getSquelchDuration(std::size_t npeers)
Get random squelch duration between MIN_UNSQUELCH_EXPIRE and min(max(MAX_UNSQUELCH_EXPIRE_DEFAULT, SQUELCH_PER_PEER * npeers), MAX_UNSQUELCH_EXPIRE_PEERS).
typename ripple::UptimeClock ::time_point time_point
std::optional< std::uint16_t > notInState(PublicKey const &validator, PeerState state) const
Return number of peers not in state.
bool inState(PublicKey const &validator, SlotState state) const
Return true if Slot is in state.
hash_map< PublicKey, Slot< clock_type > > slots_
void erase(STObject &st, TypedField< U > const &f)
Remove a field in an STObject.
const beast::Journal journal_
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
void resetCounts()
Reset counts of peers in Selected or Counting state.
std::optional< std::uint16_t > inState(PublicKey const &validator, PeerState state) const
Return number of peers in state.
static constexpr auto IDLED
void deleteIdlePeers()
Check if peers stopped relaying messages and if slots stopped receiving messages from the validator.
void deleteIdlePeer(PublicKey const &validator)
Check if peers stopped relaying messages.
Seed functor once per construction.
Slots is a container for validator's Slot and handles Slot update when a message is received from a validator.
A generic endpoint for log messages.
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
static messages peersWithMessage_
static constexpr uint16_t MIN_MESSAGE_THRESHOLD
std::optional< SlotState > getState(PublicKey const &validator)
Get Slot's state.
std::unordered_map< typename Peer::id_t, std::tuple< PeerState, uint16_t, uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
Get peers info.
typename ripple::UptimeClock ::time_point time_point
void deletePeer(PublicKey const &validator, id_t id, bool erase)
Handle peer deletion when a peer disconnects.
SlotState getState() const
Return Slot's state.
static constexpr auto MIN_UNSQUELCH_EXPIRE
Associative container where each element is also indexed by time.
bool addPeerMessage(uint256 const &key, id_t id)
Add message/peer if have not seen this message from the peer.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Slots(Logs &logs, SquelchHandler const &handler)
std::unordered_set< id_t > considered_
clock_type::time_point lastSelected_
virtual void unsquelch(PublicKey const &validator, Peer::id_t id) const =0
Unsquelch handler.
std::set< id_t > getSelected(PublicKey const &validator)
Get selected peers.
std::set< id_t > getSelected() const
Return selected peers.
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
std::uint16_t notInState(PeerState state) const
Return number of peers not in state.
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS
Data maintained for each peer.
SquelchHandler const & handler_
std::uint16_t inState(PeerState state) const
Return number of peers in state.
const time_point & getLastSelected() const
Get the time of the last peer selection round.
virtual ~SquelchHandler()
void update(PublicKey const &validator, id_t id, protocol::MessageType type)
Update peer info.
void deletePeer(id_t id, bool erase)
Called when a peer is deleted.
static constexpr uint16_t MAX_SELECTED_PEERS
Slot is associated with a specific validator via validator's public key.