20 #ifndef RIPPLE_PEERFINDER_LOGIC_H_INCLUDED
21 #define RIPPLE_PEERFINDER_LOGIC_H_INCLUDED
23 #include <ripple/basics/Log.h>
24 #include <ripple/basics/contract.h>
25 #include <ripple/basics/random.h>
26 #include <ripple/beast/container/aged_container_utility.h>
27 #include <ripple/beast/net/IPAddressConversion.h>
28 #include <ripple/peerfinder/PeerfinderManager.h>
29 #include <ripple/peerfinder/impl/Bootcache.h>
30 #include <ripple/peerfinder/impl/Counts.h>
31 #include <ripple/peerfinder/impl/Fixed.h>
32 #include <ripple/peerfinder/impl/Handouts.h>
33 #include <ripple/peerfinder/impl/Livecache.h>
34 #include <ripple/peerfinder/impl/Reporting.h>
35 #include <ripple/peerfinder/impl/SlotImp.h>
36 #include <ripple/peerfinder/impl/Source.h>
37 #include <ripple/peerfinder/impl/Store.h>
38 #include <ripple/peerfinder/impl/iosformat.h>
47 namespace PeerFinder {
53 template <
class Checker>
188 if (addresses.
empty())
191 <<
"Could not resolve fixed slot '" << name <<
"'";
195 for (
auto const& remote_address : addresses)
197 if (remote_address.port() == 0)
199 Throw<std::runtime_error>(
200 "Port not specified for address:" +
201 remote_address.to_string());
204 auto result(
fixed_.emplace(
205 std::piecewise_construct,
213 <<
"' at " << remote_address;
226 boost::system::error_code ec)
228 if (ec == boost::asio::error::operation_aborted)
237 <<
beast::leftw(18) <<
"Logic tested " << checkedAddress
238 <<
" but the connection was closed";
252 <<
" with error, " << ec.message();
260 << checkedAddress <<
" succeeded";
271 <<
beast::leftw(18) <<
"Logic accept" << remote_endpoint
272 <<
" on local " << local_endpoint;
277 if (is_public(remote_endpoint))
285 << remote_endpoint <<
" because of ip limits.";
297 auto const result(
slots_.
emplace(slot->remote_endpoint(), slot));
299 assert(result.second);
306 return result.first->second;
314 <<
beast::leftw(18) <<
"Logic connect " << remote_endpoint;
322 <<
beast::leftw(18) <<
"Logic dropping " << remote_endpoint
323 <<
" as duplicate connect";
332 auto const result =
slots_.
emplace(slot->remote_endpoint(), slot);
334 assert(result.second);
342 return result.first->second;
351 <<
beast::leftw(18) <<
"Logic connected" << slot->remote_endpoint()
352 <<
" on local " << local_endpoint;
359 slot->local_endpoint(local_endpoint);
367 iter->second->local_endpoint() == slot->remote_endpoint());
370 << slot->remote_endpoint() <<
" as self connect";
386 <<
beast::leftw(18) <<
"Logic handshake " << slot->remote_endpoint()
387 <<
" with " << (reserved ?
"reserved " :
"") <<
"key " << key;
404 slot->reserved(reserved);
410 if (!slot->inbound())
417 slot->public_key(key);
419 auto const result =
keys_.insert(key);
421 assert(result.second);
430 if (!slot->inbound())
434 if (slot->fixed() && !slot->inbound())
436 auto iter(
fixed_.find(slot->remote_endpoint()));
437 assert(iter !=
fixed_.end());
440 << slot->remote_endpoint() <<
" success";
457 return std::move(h.
list());
482 for (
auto const& s :
slots_)
534 << ((h.
list().
size() > 1) ?
"endpoints" :
"endpoint");
572 << ((h.
list().
size() > 1) ?
"addresses" :
"address");
600 [&slots](Slots::value_type
const& value) {
601 if (value.second->state() == Slot::active)
602 slots.emplace_back(value.second);
612 targets.emplace_back(slot);
641 for (
auto& t : targets)
654 for (
auto const& t : targets)
657 auto const& list = t.list();
660 << slot->remote_endpoint() <<
" with " << list.size()
661 << ((list.size() == 1) ?
" endpoint" :
" endpoints");
680 for (
auto const& entry :
slots_)
681 entry.second->expire();
695 bool neighbor(
false);
696 for (
auto iter = list.
begin(); iter != list.
end();)
705 <<
" for excess hops " << ep.
hops;
706 iter = list.
erase(iter);
718 slot->remote_endpoint().at_port(ep.
address.
port());
724 <<
" for extra self";
725 iter = list.
erase(iter);
734 << ep.
address <<
" as invalid";
735 iter = list.
erase(iter);
743 [ep](Endpoints::value_type
const& other) {
744 return ep.address == other.address;
748 << ep.
address <<
" as duplicate";
749 iter = list.
erase(iter);
773 <<
beast::leftw(18) <<
"Endpoints from " << slot->remote_endpoint()
774 <<
" contained " << list.
size()
775 << ((list.
size() > 1) ?
" entries" :
" entry");
788 if (slot->whenAcceptEndpoints > now)
793 for (
auto const& ep : list)
795 assert(ep.hops != 0);
797 slot->recent.insert(ep.address, ep.hops);
804 if (slot->connectivityCheckInProgress)
808 <<
" already in progress";
815 slot->connectivityCheckInProgress =
true;
825 slot->remote_endpoint(),
827 std::placeholders::_1));
838 if (!slot->canAccept)
859 auto const iter =
slots_.
find(slot->remote_endpoint());
866 if (slot->public_key() != std::nullopt)
868 auto const iter =
keys_.find(*slot->public_key());
870 assert(iter !=
keys_.end());
894 if (slot->fixed() && !slot->inbound() && slot->state() !=
Slot::active)
896 auto iter(
fixed_.find(slot->remote_endpoint()));
897 assert(iter !=
fixed_.end());
900 << slot->remote_endpoint() <<
" failed";
904 switch (slot->state())
908 << slot->remote_endpoint() <<
" failed";
923 << slot->remote_endpoint();
928 << slot->remote_endpoint();
946 template <
class FwdIter>
951 boost::asio::ip::tcp::endpoint
const& remote_address);
960 for (
auto const& entry :
fixed_)
961 if (entry.first == endpoint)
972 for (
auto const& entry :
fixed_)
973 if (entry.first.address() == address)
985 template <
class Container>
993 for (
auto iter =
fixed_.begin(); needed && iter !=
fixed_.end(); ++iter)
995 auto const& address(iter->first.address());
996 if (iter->second.when() <= now &&
997 squelches.
find(address) == squelches.
end() &&
1001 [address](Slots::value_type
const& v) {
1002 return address == v.first.address();
1005 squelches.
insert(iter->first.address());
1006 c.push_back(iter->first);
1040 for (
auto addr : list)
1079 <<
beast::leftw(18) <<
"Logic added " << count <<
" new "
1080 << ((count == 1) ?
"address" :
"addresses") <<
" from "
1086 <<
"'" << source->name() <<
"' fetch, "
1087 << results.
error.message();
1101 if (is_unspecified(address))
1103 if (!is_public(address))
1105 if (address.
port() == 0)
1119 for (
auto const& entry : slots)
1122 SlotImp const& slot(*entry.second);
1127 item[
"inbound"] =
"yes";
1129 item[
"fixed"] =
"yes";
1131 item[
"reserved"] =
"yes";
1210 template <
class Checker>
1211 template <
class FwdIter>
1216 boost::asio::ip::tcp::endpoint
const& remote_address)
1224 JLOG(m_journal.trace()) <<
beast::leftw(18) <<
"Logic add " << n
1225 <<
" redirect IPs from " << remote_address;
Tests remote listening sockets to make sure they are connectible.
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a statically configured address to the cache.
std::vector< std::shared_ptr< Source > > m_sources
bool inbound() const override
Returns true if this is an inbound connection.
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
Stream trace() const
Severity stream access functions.
Receives handouts for making automatic connections.
The Logic for maintaining the list of Slot addresses.
std::set< PublicKey > keys_
SlotImp::ptr new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
void on_failure(SlotImp::ptr const &slot)
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
constexpr std::chrono::seconds secondsPerMessage(151)
clock_type::time_point m_whenBroadcast
Receives handouts for redirecting a connection.
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
std::map< beast::IP::Endpoint, Fixed > fixed_
Address const & address() const
Returns the address portion of this endpoint.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool >>::type
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
constexpr std::uint32_t numberOfEndpointsMax
Counts const & counts() const
void periodicActivity()
Stores the cache in the persistent database on a timer.
Abstract persistence for PeerFinder data.
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
int inboundSlots() const
Returns the total number of inbound slots.
void load()
Load the persisted data from the Store into the container.
virtual time_point now() const =0
Returns the current time.
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
int out_max() const
Returns the total number of outbound slots.
Manages the count of available connections for the various slots.
std::vector< Endpoint > & list()
map_type::size_type size() const
Returns the number of entries in the cache.
bool wantIncoming
true if we want to accept incoming connections.
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
iterator find(K const &k)
boost::asio::ip::address Address
void addStaticSource(std::shared_ptr< Source > const &source)
std::recursive_mutex lock_
Stores IP addresses useful for gaining initial connections.
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
void onWrite(beast::PropertyStream::Map &map)
int out_active() const
Returns the number of outbound peers assigned an open slot.
const_iterator end() const
void shuffle()
Shuffle each hop list.
reverse_iterator rbegin()
std::multiset< beast::IP::Address > connectedAddresses_
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
beast::xor_shift_engine & default_prng()
Return the default random engine.
void stop()
Stop the logic.
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
Port port() const
Returns the port number on the endpoint.
void onConfig(Config const &config)
Called when the config is set or changed.
bool fixed(beast::IP::Address const &address) const
The Livecache holds the short-lived relayed Endpoint messages.
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
A generic endpoint for log messages.
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
T forward_as_tuple(T... args)
constexpr std::chrono::seconds recentAttemptDuration(60)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
std::shared_ptr< Source > fetchSource_
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
void addSource(std::shared_ptr< Source > const &source)
boost::asio::ip::address_v6 AddressV6
boost::system::error_code error
void fetch(std::shared_ptr< Source > const &source)
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
static std::string stateString(Slot::State state)
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
void remove(SlotImp::ptr const &slot)
class ripple::PeerFinder::Livecache::hops_t hops
Left justifies a field at the specified width.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::uint16_t listeningPort
The listening port number.
Associative container where each element is also indexed by time.
std::size_t fixed_active() const
Returns the number of active fixed connections.
bool fixed(beast::IP::Endpoint const &endpoint) const
bool try_insert(beast::IP::Endpoint const &endpoint)
std::size_t attempts() const
Returns the number of outbound connection attempts.
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
void set_listening_port(std::uint16_t port)
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
void expire()
Erase entries whose time has expired.
SlotImp::ptr new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
bool is_valid_address(beast::IP::Endpoint const &address)
A version-independent IP address and port combination.
std::shared_ptr< SlotImp > ptr
bool autoConnect
true if we want to establish connections automatically.
constexpr std::uint32_t maxHops
PeerFinder configuration settings.
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
void on_closed(SlotImp::ptr const &slot)
State state() const override
Returns the state of the connection.
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Describes a connectible peer address along with some metadata.
void config(Config const &c)
bool reserved() const override
Returns true if this is a reserved connection.
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
typename Clock::time_point time_point
ConnectHandouts::Squelches m_squelches
void onWrite(beast::PropertyStream::Map &map)
Write the configuration into a property stream.
beast::IP::Endpoint address
int ipLimit
Limit how many incoming connections we allow per IP.
bool connectivityCheckInProgress
bool fixed() const override
Returns true if this is a fixed connection.
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
int addBootcacheAddresses(IPAddresses const &list)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
Result
Possible results from activating a slot.