20 #include <ripple/app/consensus/RCLValidations.h>
21 #include <ripple/app/ledger/InboundLedgers.h>
22 #include <ripple/app/ledger/InboundTransactions.h>
23 #include <ripple/app/ledger/LedgerMaster.h>
24 #include <ripple/app/ledger/TransactionMaster.h>
25 #include <ripple/app/misc/HashRouter.h>
26 #include <ripple/app/misc/LoadFeeTrack.h>
27 #include <ripple/app/misc/NetworkOPs.h>
28 #include <ripple/app/misc/Transaction.h>
29 #include <ripple/app/misc/ValidatorList.h>
30 #include <ripple/app/tx/apply.h>
31 #include <ripple/basics/UptimeClock.h>
32 #include <ripple/basics/base64.h>
33 #include <ripple/basics/random.h>
34 #include <ripple/basics/safe_cast.h>
35 #include <ripple/beast/core/LexicalCast.h>
36 #include <ripple/beast/core/SemanticVersion.h>
37 #include <ripple/nodestore/DatabaseShard.h>
38 #include <ripple/overlay/Cluster.h>
39 #include <ripple/overlay/impl/PeerImp.h>
40 #include <ripple/overlay/impl/Tuning.h>
41 #include <ripple/overlay/predicates.h>
42 #include <ripple/protocol/digest.h>
44 #include <boost/algorithm/string.hpp>
45 #include <boost/algorithm/string/predicate.hpp>
46 #include <boost/beast/core/ostream.hpp>
54 using namespace std::chrono_literals;
79 , sink_(app_.journal(
"Peer"), makePrefix(id))
80 , p_sink_(app_.journal(
"Protocol"), makePrefix(id))
83 , stream_ptr_(
std::move(stream_ptr))
84 , socket_(stream_ptr_->next_layer().socket())
85 , stream_(*stream_ptr_)
86 , strand_(socket_.get_executor())
88 , remote_address_(slot->remote_endpoint())
92 , tracking_(Tracking::unknown)
93 , trackingTime_(clock_type::now())
94 , publicKey_(publicKey)
95 , lastPingTime_(clock_type::now())
96 , creationTime_(clock_type::now())
97 , squelch_(app_.journal(
"Squelch"))
101 , request_(std::move(request))
103 , compressionEnabled_(
108 app_.config().COMPRESSION)
114 app_.config().TX_REDUCE_RELAY_ENABLE))
118 app_.config().VP_REDUCE_RELAY_ENABLE))
122 app_.config().LEDGER_REPLAY))
123 , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
125 JLOG(journal_.info()) <<
"compression enabled "
126 << (compressionEnabled_ == Compressed::On)
127 <<
" vp reduce-relay enabled "
128 << vpReduceRelayEnabled_
129 <<
" tx reduce-relay enabled "
130 << txReduceRelayEnabled_ <<
" on " << remote_address_
136 const bool inCluster{
cluster()};
159 if (!
strand_.running_in_this_thread())
162 auto parseLedgerHash =
176 if (
auto const iter =
headers_.find(
"Closed-Ledger");
179 closed = parseLedgerHash(iter->value().to_string());
182 fail(
"Malformed handshake data (1)");
185 if (
auto const iter =
headers_.find(
"Previous-Ledger");
188 previous = parseLedgerHash(iter->value().to_string());
191 fail(
"Malformed handshake data (2)");
194 if (previous && !closed)
195 fail(
"Malformed handshake data (3)");
217 if (!
strand_.running_in_this_thread())
243 if (!
strand_.running_in_this_thread())
250 auto validator = m->getValidatorKey();
251 if (validator && !
squelch_.expireSquelch(*validator))
255 safe_cast<TrafficCount::category>(m->getCategory()),
273 <<
" sendq: " << sendq_size;
281 boost::asio::async_write(
290 std::placeholders::_1,
291 std::placeholders::_2)));
297 if (!
strand_.running_in_this_thread())
303 protocol::TMHaveTransactions ht;
305 ht.add_hashes(hash.data(), hash.size());
309 send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
316 if (!
strand_.running_in_this_thread())
333 if (!
strand_.running_in_this_thread())
338 auto removed =
txQueue_.erase(hash);
350 fail(
"charge: Resources");
359 auto const iter =
headers_.find(
"Crawl");
362 return boost::iequals(iter->value(),
"public");
375 return headers_[
"User-Agent"].to_string();
376 return headers_[
"Server"].to_string();
388 ret[jss::inbound] =
true;
392 ret[jss::cluster] =
true;
400 ret[jss::server_domain] =
domain();
403 ret[jss::network_id] = nid;
408 ret[jss::version] = version;
419 std::chrono::duration_cast<std::chrono::seconds>(
uptime()).count());
424 if ((minSeq != 0) || (maxSeq != 0))
425 ret[jss::complete_ledgers] =
431 ret[jss::track] =
"diverged";
435 ret[jss::track] =
"unknown";
444 protocol::TMStatusChange last_status;
451 if (closedLedgerHash != beast::zero)
452 ret[jss::ledger] =
to_string(closedLedgerHash);
454 if (last_status.has_newstatus())
456 switch (last_status.newstatus())
458 case protocol::nsCONNECTING:
459 ret[jss::status] =
"connecting";
462 case protocol::nsCONNECTED:
463 ret[jss::status] =
"connected";
466 case protocol::nsMONITORING:
467 ret[jss::status] =
"monitoring";
470 case protocol::nsVALIDATING:
471 ret[jss::status] =
"validating";
474 case protocol::nsSHUTTING:
475 ret[jss::status] =
"shutting";
480 <<
"Unknown status: " << last_status.newstatus();
485 ret[jss::metrics][jss::total_bytes_recv] =
487 ret[jss::metrics][jss::total_bytes_sent] =
489 ret[jss::metrics][jss::avg_bps_recv] =
491 ret[jss::metrics][jss::avg_bps_sent] =
534 return boost::icl::contains(it->second.finalized(), shardIndex);
580 assert(
strand_.running_in_this_thread());
602 if (!
strand_.running_in_this_thread())
613 <<
" failed: " << reason;
621 assert(
strand_.running_in_this_thread());
641 assert(
strand_.running_in_this_thread());
648 stream_.async_shutdown(bind_executor(
658 timer_.expires_from_now(peerTimerInterval, ec);
665 timer_.async_wait(bind_executor(
695 if (ec == boost::asio::error::operation_aborted)
707 fail(
"Large send queue");
713 clock_type::duration duration;
734 fail(
"Ping Timeout");
741 protocol::TMPing message;
742 message.set_type(protocol::TMPing::ptPING);
745 send(std::make_shared<Message>(message, protocol::mtPING));
757 JLOG(
journal_.
error()) <<
"onShutdown: expected error condition";
760 if (ec != boost::asio::error::eof)
761 return fail(
"onShutdown", ec);
778 return fail(
"makeSharedValue: Unexpected failure");
799 auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
812 boost::asio::async_write(
814 write_buffer->data(),
815 boost::asio::transfer_all(),
822 if (ec == boost::asio::error::operation_aborted)
825 return fail(
"onWriteResponse", ec);
826 if (write_buffer->size() == bytes_transferred)
828 return fail(
"Failed to write header");
842 return headers_[
"Server-Domain"].to_string();
884 protocol::TMGetPeerShardInfoV2 tmGPS;
886 send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2));
897 if (ec == boost::asio::error::operation_aborted)
899 if (ec == boost::asio::error::eof)
905 return fail(
"onReadMessage", ec);
908 if (bytes_transferred > 0)
909 stream <<
"onReadMessage: " << bytes_transferred <<
" bytes";
911 stream <<
"onReadMessage";
914 metrics_.recv.add_message(bytes_transferred);
926 return fail(
"onReadMessage", ec);
931 if (bytes_consumed == 0)
944 std::placeholders::_1,
945 std::placeholders::_2)));
953 if (ec == boost::asio::error::operation_aborted)
956 return fail(
"onWriteMessage", ec);
959 if (bytes_transferred > 0)
960 stream <<
"onWriteMessage: " << bytes_transferred <<
" bytes";
962 stream <<
"onWriteMessage";
965 metrics_.sent.add_message(bytes_transferred);
972 return boost::asio::async_write(
981 std::placeholders::_1,
982 std::placeholders::_2)));
987 return stream_.async_shutdown(bind_executor(
992 std::placeholders::_1)));
1022 if ((type == MessageType::mtTRANSACTION ||
1023 type == MessageType::mtHAVE_TRANSACTIONS ||
1024 type == MessageType::mtTRANSACTIONS ||
1026 category == TrafficCount::category::get_transactions ||
1028 category == TrafficCount::category::ld_tsc_get ||
1029 category == TrafficCount::category::ld_tsc_share ||
1031 category == TrafficCount::category::gl_tsc_share ||
1032 category == TrafficCount::category::gl_tsc_get) &&
1036 static_cast<MessageType
>(type),
static_cast<std::uint64_t>(size));
1038 JLOG(
journal_.
trace()) <<
"onMessageBegin: " << type <<
" " << size <<
" "
1039 << uncompressed_size <<
" " << isCompressed;
1054 auto const s = m->list_size();
1074 if (m->type() == protocol::TMPing::ptPING)
1078 m->set_type(protocol::TMPing::ptPONG);
1079 send(std::make_shared<Message>(*m, protocol::mtPING));
1083 if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1093 auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1118 for (
int i = 0; i < m->clusternodes().size(); ++i)
1120 protocol::TMClusterNode
const& node = m->clusternodes(i);
1123 if (node.has_nodename())
1124 name = node.nodename();
1126 auto const publicKey =
1133 auto const reportTime =
1137 *publicKey,
name, node.nodeload(), reportTime);
1141 int loadSources = m->loadsources().size();
1142 if (loadSources != 0)
1145 gossip.
items.reserve(loadSources);
1146 for (
int i = 0; i < m->loadsources().size(); ++i)
1148 protocol::TMLoadSource
const& node = m->loadsources(i);
1153 gossip.
items.push_back(item);
1166 if (status.getReportTime() >= thresh)
1167 fees.push_back(status.getLoadFee());
1172 auto const index = fees.size() / 2;
1174 clusterFee = fees[index];
1202 return badData(
"Invalid relays");
1211 auto const peerChainSz{m->peerchain_size()};
1212 if (peerChainSz > 0)
1215 return badData(
"Invalid peer chain size");
1218 return badData(
"Invalid relays and peer chain size");
1220 for (
int i = 0; i < peerChainSz; ++i)
1222 auto const slice{
makeSlice(m->peerchain(i).publickey())};
1226 return badData(
"Invalid peer public key");
1229 if (!pubKeyChain.
emplace(slice).second)
1230 return badData(
"Invalid peer public key");
1237 auto reply{shardStore->getShardInfo()->makeMessage(
app_)};
1238 if (peerChainSz > 0)
1239 *(reply.mutable_peerchain()) = m->peerchain();
1240 send(std::make_shared<Message>(reply, protocol::mtPEER_SHARD_INFO_V2));
1243 if (m->relays() == 0)
1247 if (peerChainSz == 0)
1254 m->set_relays(m->relays() - 1);
1256 std::make_shared<Message>(*m, protocol::mtGET_PEER_SHARD_INFO_V2),
1258 return pubKeyChain.
find(peer->getNodePublic()) != pubKeyChain.
end();
1270 if (curLedgerSeq >= db.earliestLedgerSeq())
1271 return db.seqToShardIndex(curLedgerSeq);
1272 return std::nullopt;
1287 auto const timestamp{
1290 if (timestamp > (now + 5s))
1291 return badData(
"Invalid timestamp");
1294 using namespace std::chrono_literals;
1295 if (timestamp < (now - 5min))
1296 return badData(
"Stale timestamp");
1298 s.
add32(m->timestamp());
1303 auto const numIncomplete{m->incomplete_size()};
1304 if (numIncomplete > 0)
1306 if (latestShardIndex && numIncomplete > *latestShardIndex)
1307 return badData(
"Invalid number of incomplete shards");
1310 for (
int i = 0; i < numIncomplete; ++i)
1312 auto const& incomplete{m->incomplete(i)};
1313 auto const shardIndex{incomplete.shardindex()};
1316 if (shardIndex < earliestShardIndex ||
1317 (latestShardIndex && shardIndex > latestShardIndex))
1319 return badData(
"Invalid incomplete shard index");
1321 s.
add32(shardIndex);
1324 auto const state{
static_cast<ShardState>(incomplete.state())};
1336 return badData(
"Invalid incomplete shard state");
1338 s.
add32(incomplete.state());
1342 if (incomplete.has_progress())
1344 progress = incomplete.progress();
1345 if (progress < 1 || progress > 100)
1346 return badData(
"Invalid incomplete shard progress");
1351 if (!
shardInfo.update(shardIndex, state, progress))
1352 return badData(
"Invalid duplicate incomplete shards");
1357 if (m->has_finalized())
1359 auto const& str{m->finalized()};
1361 return badData(
"Invalid finalized shards");
1363 if (!
shardInfo.setFinalizedFromString(str))
1364 return badData(
"Invalid finalized shard indexes");
1367 auto const numFinalized{boost::icl::length(
finalized)};
1368 if (numFinalized == 0 ||
1369 boost::icl::first(
finalized) < earliestShardIndex ||
1370 (latestShardIndex &&
1371 boost::icl::last(
finalized) > latestShardIndex))
1373 return badData(
"Invalid finalized shard indexes");
1376 if (latestShardIndex &&
1377 (numFinalized + numIncomplete) > *latestShardIndex)
1379 return badData(
"Invalid number of finalized and incomplete shards");
1382 s.
addRaw(str.data(), str.size());
1388 return badData(
"Invalid public key");
1393 return badData(
"Invalid public key");
1397 return badData(
"Invalid signature");
1400 auto const peerChainSz{m->peerchain_size()};
1401 if (peerChainSz > 0)
1405 return badData(
"Invalid peer chain size");
1413 for (
int i = 0; i < peerChainSz; ++i)
1416 slice =
makeSlice(m->peerchain(i).publickey());
1418 return badData(
"Invalid peer public key");
1421 if (!pubKeyChain.
emplace(slice).second)
1422 return badData(
"Invalid peer public key");
1427 makeSlice(m->peerchain(peerChainSz - 1).publickey()));
1430 m->mutable_peerchain()->RemoveLast();
1432 std::make_shared<Message>(*m, protocol::mtPEER_SHARD_INFO_V2));
1434 <<
"Relayed TMPeerShardInfoV2 from peer IP "
1436 << peer->getRemoteAddress().to_string();
1446 <<
"Consumed TMPeerShardInfoV2 originating from public key "
1449 << (
shardInfo.incomplete().empty() ?
"empty"
1459 else if (
shardInfo.msgTimestamp() > it->second.msgTimestamp())
1464 if (peerChainSz == 0)
1478 if (m->endpoints_v2().size() >= 1024)
1485 endpoints.
reserve(m->endpoints_v2().size());
1487 for (
auto const& tm : m->endpoints_v2())
1494 << tm.endpoint() <<
"}";
1511 if (!endpoints.
empty())
1534 <<
"Need network ledger";
1542 auto stx = std::make_shared<STTx const>(sit);
1543 uint256 txID = stx->getTransactionID();
1567 bool checkSignature =
true;
1570 if (!m->has_deferred() || !m->deferred())
1574 flags |= SF_TRUSTED;
1581 checkSignature =
false;
1588 <<
"No new transactions until synchronized";
1601 "recvTransaction->checkTransaction",
1606 if (
auto peer = weak.lock())
1607 peer->checkTransaction(flags, checkSignature, stx);
1614 <<
"Transaction invalid: " <<
strHex(m->rawtransaction())
1615 <<
". Exception: " << ex.
what();
1626 auto const itype{m->itype()};
1629 if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1630 return badData(
"Invalid ledger info type");
1635 return std::nullopt;
1638 if (itype == protocol::liTS_CANDIDATE)
1640 if (!m->has_ledgerhash())
1641 return badData(
"Invalid TX candidate set, missing TX set hash");
1644 !m->has_ledgerhash() && !m->has_ledgerseq() &&
1645 !(ltype && *ltype == protocol::ltCLOSED))
1647 return badData(
"Invalid request");
1651 if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1652 return badData(
"Invalid ledger type");
1656 return badData(
"Invalid ledger hash");
1659 if (m->has_ledgerseq())
1661 auto const ledgerSeq{m->ledgerseq()};
1671 using namespace std::chrono_literals;
1681 if (itype != protocol::liBASE)
1683 if (m->nodeids_size() <= 0)
1684 return badData(
"Invalid ledger node IDs");
1686 for (
auto const& nodeId : m->nodeids())
1689 return badData(
"Invalid SHAMap node ID");
1694 if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1695 return badData(
"Invalid query type");
1698 if (m->has_querydepth())
1701 itype == protocol::liBASE)
1703 return badData(
"Invalid query depth");
1710 if (
auto peer = weak.
lock())
1711 peer->processLedgerRequest(m);
1729 if (
auto peer = weak.
lock())
1732 peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1733 if (reply.has_error())
1735 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1736 peer->charge(Resource::feeInvalidRequest);
1738 peer->charge(Resource::feeRequestNoReply);
1742 peer->send(std::make_shared<Message>(
1743 reply, protocol::mtPROOF_PATH_RESPONSE));
1778 if (
auto peer = weak.
lock())
1781 peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1782 if (reply.has_error())
1784 if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1785 peer->charge(Resource::feeInvalidRequest);
1787 peer->charge(Resource::feeRequestNoReply);
1791 peer->send(std::make_shared<Message>(
1792 reply, protocol::mtREPLAY_DELTA_RESPONSE));
1823 return badData(
"Invalid ledger hash");
1827 auto const ledgerSeq{m->ledgerseq()};
1828 if (m->type() == protocol::liTS_CANDIDATE)
1847 using namespace std::chrono_literals;
1858 if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1859 return badData(
"Invalid ledger info type");
1862 if (m->has_error() &&
1863 (m->error() < protocol::reNO_LEDGER ||
1864 m->error() > protocol::reBAD_REQUEST))
1866 return badData(
"Invalid reply error");
1873 "Invalid Ledger/TXset nodes " +
std::to_string(m->nodes_size()));
1877 if (m->has_requestcookie())
1881 m->clear_requestcookie();
1882 peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1886 JLOG(
p_journal_.
info()) <<
"Unable to route TX/ledger data reply";
1891 uint256 const ledgerHash{m->ledgerhash()};
1894 if (m->type() == protocol::liTS_CANDIDATE)
1898 jtTXN_DATA,
"recvPeerData", [weak, ledgerHash, m]() {
1899 if (
auto peer = weak.lock())
1901 peer->app_.getInboundTransactions().gotData(
1902 ledgerHash, peer, m);
1915 protocol::TMProposeSet&
set = *m;
1921 if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1949 uint256 const proposeHash{
set.currenttxhash()};
1950 uint256 const prevLedger{
set.previousledger()};
1962 if (
auto [added, relayed] =
1971 suppression, publicKey,
id_, protocol::mtPROPOSE_LEDGER);
1981 <<
"Proposal: Dropping untrusted (peer divergence)";
1993 <<
"Proposal: " << (isTrusted ?
"trusted" :
"untrusted");
2010 "recvPropose->checkPropose",
2012 if (
auto peer = weak.lock())
2013 peer->checkPropose(isTrusted, m,
proposal);
2022 if (!m->has_networktime())
2027 if (!
last_status_.has_newstatus() || m->has_newstatus())
2032 protocol::NodeStatus status =
last_status_.newstatus();
2034 m->set_newstatus(status);
2038 if (m->newevent() == protocol::neLOST_SYNC)
2040 bool outOfSync{
false};
2061 bool const peerChangedLedgers{
2068 if (peerChangedLedgers)
2079 if (m->has_ledgerhashprevious() &&
2090 if (peerChangedLedgers)
2100 if (m->has_firstseq() && m->has_lastseq())
2111 if (m->has_ledgerseq() &&
2121 if (m->has_newstatus())
2123 switch (m->newstatus())
2125 case protocol::nsCONNECTING:
2126 j[jss::status] =
"CONNECTING";
2128 case protocol::nsCONNECTED:
2129 j[jss::status] =
"CONNECTED";
2131 case protocol::nsMONITORING:
2132 j[jss::status] =
"MONITORING";
2134 case protocol::nsVALIDATING:
2135 j[jss::status] =
"VALIDATING";
2137 case protocol::nsSHUTTING:
2138 j[jss::status] =
"SHUTTING";
2143 if (m->has_newevent())
2145 switch (m->newevent())
2147 case protocol::neCLOSING_LEDGER:
2148 j[jss::action] =
"CLOSING_LEDGER";
2150 case protocol::neACCEPTED_LEDGER:
2151 j[jss::action] =
"ACCEPTED_LEDGER";
2153 case protocol::neSWITCHED_LEDGER:
2154 j[jss::action] =
"SWITCHED_LEDGER";
2156 case protocol::neLOST_SYNC:
2157 j[jss::action] =
"LOST_SYNC";
2162 if (m->has_ledgerseq())
2164 j[jss::ledger_index] = m->ledgerseq();
2167 if (m->has_ledgerhash())
2169 uint256 closedLedgerHash{};
2171 std::lock_guard sl(recentLock_);
2172 closedLedgerHash = closedLedgerHash_;
2174 j[jss::ledger_hash] =
to_string(closedLedgerHash);
2177 if (m->has_networktime())
2179 j[jss::date] = Json::UInt(m->networktime());
2182 if (m->has_firstseq() && m->has_lastseq())
2184 j[jss::ledger_index_min] = Json::UInt(m->firstseq());
2185 j[jss::ledger_index_max] = Json::UInt(m->lastseq());
2201 serverSeq = maxLedger_;
2207 checkTracking(serverSeq, validationSeq);
2216 if (diff < Tuning::convergedLedgerLimit)
2219 tracking_ = Tracking::converged;
2222 if ((diff > Tuning::divergedLedgerLimit) &&
2223 (tracking_.load() != Tracking::diverged))
2228 tracking_ = Tracking::diverged;
2229 trackingTime_ = clock_type::now();
2238 fee_ = Resource::feeInvalidRequest;
2242 uint256 const hash{m->hash()};
2244 if (m->status() == protocol::tsHAVE)
2248 if (
std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2249 recentTxSets_.end())
2251 fee_ = Resource::feeUnwantedData;
2255 recentTxSets_.push_back(hash);
2260 PeerImp::onValidatorListMessage(
2270 JLOG(p_journal_.warn()) <<
"Ignored malformed " << messageType
2271 <<
" from peer " << remote_address_;
2273 fee_ = Resource::feeHighBurdenPeer;
2279 JLOG(p_journal_.debug())
2280 <<
"Received " << messageType <<
" from " << remote_address_.to_string()
2281 <<
" (" << id_ <<
")";
2283 if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2285 JLOG(p_journal_.debug())
2286 << messageType <<
": received duplicate " << messageType;
2290 fee_ = Resource::feeUnwantedData;
2294 auto const applyResult = app_.validators().applyListsAndBroadcast(
2298 remote_address_.to_string(),
2301 app_.getHashRouter(),
2304 JLOG(p_journal_.debug())
2305 <<
"Processed " << messageType <<
" version " << version <<
" from "
2306 << (applyResult.publisherKey ?
strHex(*applyResult.publisherKey)
2307 :
"unknown or invalid publisher")
2308 <<
" from " << remote_address_.to_string() <<
" (" << id_
2309 <<
") with best result " << to_string(applyResult.bestDisposition());
2312 switch (applyResult.bestDisposition())
2315 case ListDisposition::accepted:
2317 case ListDisposition::expired:
2319 case ListDisposition::pending: {
2322 assert(applyResult.publisherKey);
2323 auto const& pubKey = *applyResult.publisherKey;
2325 if (
auto const iter = publisherListSequences_.find(pubKey);
2326 iter != publisherListSequences_.end())
2328 assert(iter->second < applyResult.sequence);
2331 publisherListSequences_[pubKey] = applyResult.sequence;
2334 case ListDisposition::same_sequence:
2335 case ListDisposition::known_sequence:
2339 assert(applyResult.sequence && applyResult.publisherKey);
2341 publisherListSequences_[*applyResult.publisherKey] <=
2342 applyResult.sequence);
2347 case ListDisposition::stale:
2348 case ListDisposition::untrusted:
2349 case ListDisposition::invalid:
2350 case ListDisposition::unsupported_version:
2357 switch (applyResult.worstDisposition())
2359 case ListDisposition::accepted:
2360 case ListDisposition::expired:
2361 case ListDisposition::pending:
2364 case ListDisposition::same_sequence:
2365 case ListDisposition::known_sequence:
2369 fee_ = Resource::feeUnwantedData;
2371 case ListDisposition::stale:
2374 fee_ = Resource::feeBadData;
2376 case ListDisposition::untrusted:
2380 fee_ = Resource::feeUnwantedData;
2382 case ListDisposition::invalid:
2384 fee_ = Resource::feeInvalidSignature;
2386 case ListDisposition::unsupported_version:
2389 fee_ = Resource::feeBadData;
2396 for (
auto const& [disp, count] : applyResult.dispositions)
2401 case ListDisposition::accepted:
2402 JLOG(p_journal_.debug())
2403 <<
"Applied " << count <<
" new " << messageType
2404 <<
"(s) from peer " << remote_address_;
2407 case ListDisposition::expired:
2408 JLOG(p_journal_.debug())
2409 <<
"Applied " << count <<
" expired " << messageType
2410 <<
"(s) from peer " << remote_address_;
2413 case ListDisposition::pending:
2414 JLOG(p_journal_.debug())
2415 <<
"Processed " << count <<
" future " << messageType
2416 <<
"(s) from peer " << remote_address_;
2418 case ListDisposition::same_sequence:
2419 JLOG(p_journal_.warn())
2420 <<
"Ignored " << count <<
" " << messageType
2421 <<
"(s) with current sequence from peer "
2424 case ListDisposition::known_sequence:
2425 JLOG(p_journal_.warn())
2426 <<
"Ignored " << count <<
" " << messageType
2427 <<
"(s) with future sequence from peer " << remote_address_;
2429 case ListDisposition::stale:
2430 JLOG(p_journal_.warn())
2431 <<
"Ignored " << count <<
"stale " << messageType
2432 <<
"(s) from peer " << remote_address_;
2434 case ListDisposition::untrusted:
2435 JLOG(p_journal_.warn())
2436 <<
"Ignored " << count <<
" untrusted " << messageType
2437 <<
"(s) from peer " << remote_address_;
2439 case ListDisposition::unsupported_version:
2440 JLOG(p_journal_.warn())
2441 <<
"Ignored " << count <<
"unsupported version "
2442 << messageType <<
"(s) from peer " << remote_address_;
2444 case ListDisposition::invalid:
2445 JLOG(p_journal_.warn())
2446 <<
"Ignored " << count <<
"invalid " << messageType
2447 <<
"(s) from peer " << remote_address_;
2460 if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2462 JLOG(p_journal_.debug())
2463 <<
"ValidatorList: received validator list from peer using "
2464 <<
"protocol version " << to_string(protocol_)
2465 <<
" which shouldn't support this feature.";
2466 fee_ = Resource::feeUnwantedData;
2469 onValidatorListMessage(
2473 ValidatorList::parseBlobs(*m));
2477 JLOG(p_journal_.warn()) <<
"ValidatorList: Exception, " << e.
what()
2478 <<
" from peer " << remote_address_;
2479 fee_ = Resource::feeBadData;
2489 if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2491 JLOG(p_journal_.debug())
2492 <<
"ValidatorListCollection: received validator list from peer "
2493 <<
"using protocol version " << to_string(protocol_)
2494 <<
" which shouldn't support this feature.";
2495 fee_ = Resource::feeUnwantedData;
2498 else if (m->version() < 2)
2500 JLOG(p_journal_.debug())
2501 <<
"ValidatorListCollection: received invalid validator list "
2503 << m->version() <<
" from peer using protocol version "
2504 << to_string(protocol_);
2505 fee_ = Resource::feeBadData;
2508 onValidatorListMessage(
2509 "ValidatorListCollection",
2512 ValidatorList::parseBlobs(*m));
2516 JLOG(p_journal_.warn()) <<
"ValidatorListCollection: Exception, "
2517 << e.
what() <<
" from peer " << remote_address_;
2518 fee_ = Resource::feeBadData;
2525 if (m->validation().size() < 50)
2527 JLOG(p_journal_.warn()) <<
"Validation: Too small";
2528 fee_ = Resource::feeInvalidRequest;
2534 auto const closeTime = app_.timeKeeper().closeTime();
2539 val = std::make_shared<STValidation>(
2543 app_.validatorManifests().getMasterKey(pk));
2546 val->setSeen(closeTime);
2550 app_.getValidations().parms(),
2551 app_.timeKeeper().closeTime(),
2553 val->getSeenTime()))
2555 JLOG(p_journal_.trace()) <<
"Validation: Not current";
2556 fee_ = Resource::feeUnwantedData;
2563 auto const isTrusted =
2564 app_.validators().trusted(val->getSignerPublic());
2569 if (!isTrusted && app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2574 if (
auto [added, relayed] =
2575 app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2582 if (reduceRelayReady() && relayed &&
2583 (
stopwatch().now() - *relayed) < reduce_relay::IDLED)
2584 overlay_.updateSlotAndSquelch(
2585 key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2586 JLOG(p_journal_.trace()) <<
"Validation: duplicate";
2590 if (!isTrusted && (tracking_.load() == Tracking::diverged))
2592 JLOG(p_journal_.debug())
2593 <<
"Dropping untrusted validation from diverged peer";
2595 else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2599 isTrusted ?
"Trusted validation" :
"Untrusted validation";
2604 to_string(val->getNodeID());
2611 app_.getJobQueue().addJob(
2614 [weak, val, m, key]() {
2615 if (
auto peer = weak.
lock())
2616 peer->checkValidation(val, key, m);
2621 JLOG(p_journal_.debug())
2622 <<
"Dropping untrusted validation for load";
2627 JLOG(p_journal_.warn())
2628 <<
"Exception processing validation: " << e.
what();
2629 fee_ = Resource::feeInvalidRequest;
2636 protocol::TMGetObjectByHash& packet = *m;
2638 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash " << packet.type()
2639 <<
" " << packet.objects_size();
2644 if (send_queue_.size() >= Tuning::dropSendQueue)
2646 JLOG(p_journal_.debug()) <<
"GetObject: Large send queue";
2650 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2656 if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2658 if (!txReduceRelayEnabled())
2660 JLOG(p_journal_.error())
2661 <<
"TMGetObjectByHash: tx reduce-relay is disabled";
2662 fee_ = Resource::feeInvalidRequest;
2667 app_.getJobQueue().addJob(
2669 if (
auto peer = weak.
lock())
2670 peer->doTransactions(m);
2675 fee_ = Resource::feeMediumBurdenPeer;
2677 protocol::TMGetObjectByHash reply;
2679 reply.set_query(
false);
2681 if (packet.has_seq())
2682 reply.set_seq(packet.seq());
2684 reply.set_type(packet.type());
2686 if (packet.has_ledgerhash())
2690 fee_ = Resource::feeInvalidRequest;
2694 reply.set_ledgerhash(packet.ledgerhash());
2698 for (
int i = 0; i < packet.objects_size(); ++i)
2700 auto const& obj = packet.objects(i);
2703 uint256 const hash{obj.hash()};
2706 std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2707 auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2710 if (
auto shardStore = app_.getShardStore())
2712 if (seq >= shardStore->earliestLedgerSeq())
2713 nodeObject = shardStore->fetchNodeObject(hash, seq);
2718 protocol::TMIndexedObject& newObj = *reply.add_objects();
2719 newObj.set_hash(hash.begin(), hash.size());
2721 &nodeObject->getData().front(),
2722 nodeObject->getData().size());
2724 if (obj.has_nodeid())
2725 newObj.set_index(obj.nodeid());
2726 if (obj.has_ledgerseq())
2727 newObj.set_ledgerseq(obj.ledgerseq());
2734 JLOG(p_journal_.trace()) <<
"GetObj: " << reply.objects_size() <<
" of "
2735 << packet.objects_size();
2736 send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2743 bool progress =
false;
2745 for (
int i = 0; i < packet.objects_size(); ++i)
2747 const protocol::TMIndexedObject& obj = packet.objects(i);
2751 if (obj.has_ledgerseq())
2753 if (obj.ledgerseq() != pLSeq)
2755 if (pLDo && (pLSeq != 0))
2757 JLOG(p_journal_.debug())
2758 <<
"GetObj: Full fetch pack for " << pLSeq;
2760 pLSeq = obj.ledgerseq();
2761 pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2765 JLOG(p_journal_.debug())
2766 <<
"GetObj: Late fetch pack for " << pLSeq;
2775 uint256 const hash{obj.hash()};
2777 app_.getLedgerMaster().addFetchPack(
2779 std::make_shared<Blob>(
2780 obj.data().begin(), obj.data().end()));
2785 if (pLDo && (pLSeq != 0))
2787 JLOG(p_journal_.debug())
2788 <<
"GetObj: Partial fetch pack for " << pLSeq;
2790 if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2791 app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2798 if (!txReduceRelayEnabled())
2800 JLOG(p_journal_.error())
2801 <<
"TMHaveTransactions: tx reduce-relay is disabled";
2802 fee_ = Resource::feeInvalidRequest;
2807 app_.getJobQueue().addJob(
2809 if (
auto peer = weak.
lock())
2810 peer->handleHaveTransactions(m);
2815 PeerImp::handleHaveTransactions(
2818 protocol::TMGetObjectByHash tmBH;
2819 tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2820 tmBH.set_query(
true);
2822 JLOG(p_journal_.trace())
2823 <<
"received TMHaveTransactions " << m->hashes_size();
2829 JLOG(p_journal_.error())
2830 <<
"TMHaveTransactions with invalid hash size";
2831 fee_ = Resource::feeInvalidRequest;
2837 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2839 JLOG(p_journal_.trace()) <<
"checking transaction " << (bool)txn;
2843 JLOG(p_journal_.debug()) <<
"adding transaction to request";
2845 auto obj = tmBH.add_objects();
2846 obj->set_hash(hash.
data(), hash.
size());
2853 removeTxQueue(hash);
2857 JLOG(p_journal_.trace())
2858 <<
"transaction request object is " << tmBH.objects_size();
2860 if (tmBH.objects_size() > 0)
2861 send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2867 if (!txReduceRelayEnabled())
2869 JLOG(p_journal_.error())
2870 <<
"TMTransactions: tx reduce-relay is disabled";
2871 fee_ = Resource::feeInvalidRequest;
2875 JLOG(p_journal_.trace())
2876 <<
"received TMTransactions " << m->transactions_size();
2878 overlay_.addTxMetrics(m->transactions_size());
2883 m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2890 using on_message_fn =
2892 if (!strand_.running_in_this_thread())
2896 (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2898 if (!m->has_validatorpubkey())
2900 charge(Resource::feeBadData);
2903 auto validator = m->validatorpubkey();
2907 charge(Resource::feeBadData);
2913 if (!app_.validators().listed(key))
2915 charge(Resource::feeBadData);
2916 JLOG(p_journal_.debug())
2917 <<
"onMessage: TMSquelch discarding non-validator squelch "
2923 if (key == app_.getValidationPublicKey())
2925 JLOG(p_journal_.debug())
2926 <<
"onMessage: TMSquelch discarding validator's squelch " << slice;
2931 m->has_squelchduration() ? m->squelchduration() : 0;
2933 squelch_.removeSquelch(key);
2935 charge(Resource::feeBadData);
2937 JLOG(p_journal_.debug())
2938 <<
"onMessage: TMSquelch " << slice <<
" " << id() <<
" " << duration;
2950 (void)lockedRecentLock;
2952 if (
std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2953 recentLedgers_.end())
2956 recentLedgers_.push_back(hash);
2965 if (app_.getFeeTrack().isLoadedLocal() ||
2966 (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2967 (app_.getJobQueue().getJobCount(
jtPACK) > 10))
2969 JLOG(p_journal_.info()) <<
"Too busy to make fetch pack";
2975 JLOG(p_journal_.warn()) <<
"FetchPack hash size malformed";
2976 fee_ = Resource::feeInvalidRequest;
2980 fee_ = Resource::feeHighBurdenPeer;
2982 uint256 const hash{packet->ledgerhash()};
2985 auto elapsed = UptimeClock::now();
2986 auto const pap = &app_;
2987 app_.getJobQueue().addJob(
2988 jtPACK,
"MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2989 pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2994 PeerImp::doTransactions(
2997 protocol::TMTransactions reply;
2999 JLOG(p_journal_.trace()) <<
"received TMGetObjectByHash requesting tx "
3000 << packet->objects_size();
3002 if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
3004 JLOG(p_journal_.error()) <<
"doTransactions, invalid number of hashes";
3005 fee_ = Resource::feeInvalidRequest;
3011 auto const& obj = packet->objects(i);
3015 fee_ = Resource::feeInvalidRequest;
3021 auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
3025 JLOG(p_journal_.error()) <<
"doTransactions, transaction not found "
3027 fee_ = Resource::feeInvalidRequest;
3032 auto tx = reply.add_transactions();
3033 auto sttx = txn->getSTransaction();
3035 tx->set_rawtransaction(s.
data(), s.
size());
3037 txn->getStatus() ==
INCLUDED ? protocol::tsCURRENT
3039 tx->set_receivetimestamp(
3040 app_.timeKeeper().now().time_since_epoch().count());
3041 tx->set_deferred(txn->getSubmitResult().queued);
3044 if (reply.transactions_size() > 0)
3045 send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
3049 PeerImp::checkTransaction(
3051 bool checkSignature,
3060 app_.getLedgerMaster().getValidLedgerIndex()))
3062 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3063 charge(Resource::feeUnwantedData);
3071 app_.getHashRouter(),
3073 app_.getLedgerMaster().getValidatedRules(),
3075 valid != Validity::Valid)
3077 if (!validReason.empty())
3079 JLOG(p_journal_.trace())
3080 <<
"Exception checking transaction: " << validReason;
3084 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3085 charge(Resource::feeInvalidSignature);
3092 app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
3096 auto tx = std::make_shared<Transaction>(stx, reason, app_);
3098 if (tx->getStatus() ==
INVALID)
3100 if (!reason.
empty())
3102 JLOG(p_journal_.trace())
3103 <<
"Exception checking transaction: " << reason;
3105 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3106 charge(Resource::feeInvalidSignature);
3110 bool const trusted(flags & SF_TRUSTED);
3111 app_.getOPs().processTransaction(
3112 tx, trusted,
false, NetworkOPs::FailHard::no);
3116 JLOG(p_journal_.warn())
3117 <<
"Exception in " << __func__ <<
": " << ex.
what();
3118 app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3119 charge(Resource::feeBadData);
3125 PeerImp::checkPropose(
3130 JLOG(p_journal_.trace())
3131 <<
"Checking " << (isTrusted ?
"trusted" :
"UNTRUSTED") <<
" proposal";
3137 JLOG(p_journal_.warn()) <<
"Proposal fails sig check";
3138 charge(Resource::feeInvalidSignature);
3145 relay = app_.getOPs().processTrustedProposal(peerPos);
3147 relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3155 auto haveMessage = app_.overlay().relay(
3157 if (reduceRelayReady() && !haveMessage.empty())
3158 overlay_.updateSlotAndSquelch(
3161 std::move(haveMessage),
3162 protocol::mtPROPOSE_LEDGER);
3167 PeerImp::checkValidation(
3172 if (!val->isValid())
3174 JLOG(p_journal_.debug()) <<
"Validation forwarded by peer is invalid";
3175 charge(Resource::feeInvalidSignature);
3190 overlay_.relay(*packet, key, val->getSignerPublic());
3191 if (reduceRelayReady() && !haveMessage.empty())
3193 overlay_.updateSlotAndSquelch(
3195 val->getSignerPublic(),
3196 std::move(haveMessage),
3197 protocol::mtVALIDATION);
3203 JLOG(p_journal_.trace())
3204 <<
"Exception processing validation: " << ex.
what();
3205 charge(Resource::feeInvalidRequest);
3219 if (p->hasTxSet(rootHash) && p.get() != skip)
3221 auto score = p->getScore(true);
3222 if (!ret || (score > retScore))
3247 if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3249 auto score = p->getScore(true);
3250 if (!ret || (score > retScore))
3262 PeerImp::sendLedgerBase(
3264 protocol::TMLedgerData& ledgerData)
3266 JLOG(p_journal_.trace()) <<
"sendLedgerBase: Base data";
3269 addRaw(ledger->info(), s);
3272 auto const& stateMap{ledger->stateMap()};
3273 if (stateMap.getHash() != beast::zero)
3278 stateMap.serializeRoot(
root);
3279 ledgerData.add_nodes()->set_nodedata(
3280 root.getDataPtr(),
root.getLength());
3282 if (ledger->info().txHash != beast::zero)
3284 auto const& txMap{ledger->txMap()};
3285 if (txMap.getHash() != beast::zero)
3289 txMap.serializeRoot(
root);
3290 ledgerData.add_nodes()->set_nodedata(
3291 root.getDataPtr(),
root.getLength());
3297 std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3304 JLOG(p_journal_.trace()) <<
"getLedger: Ledger";
3308 if (m->has_ledgerhash())
3311 uint256 const ledgerHash{m->ledgerhash()};
3312 ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3315 if (m->has_ledgerseq())
3318 if (
auto shards = app_.getShardStore())
3320 if (m->ledgerseq() >= shards->earliestLedgerSeq())
3323 shards->fetchLedger(ledgerHash, m->ledgerseq());
3330 JLOG(p_journal_.trace())
3331 <<
"getLedger: Don't have ledger with hash " << ledgerHash;
3333 if (m->has_querytype() && !m->has_requestcookie())
3339 m->has_ledgerseq() ? m->ledgerseq() : 0,
3342 m->set_requestcookie(
id());
3343 peer->send(std::make_shared<Message>(
3344 *m, protocol::mtGET_LEDGER));
3345 JLOG(p_journal_.debug())
3346 <<
"getLedger: Request relayed to peer";
3350 JLOG(p_journal_.trace())
3351 <<
"getLedger: Failed to find peer to relay request";
3356 else if (m->has_ledgerseq())
3359 if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3361 JLOG(p_journal_.debug())
3362 <<
"getLedger: Early ledger sequence request";
3366 ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3369 JLOG(p_journal_.debug())
3370 <<
"getLedger: Don't have ledger with sequence "
3375 else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3377 ledger = app_.getLedgerMaster().getClosedLedger();
3383 auto const ledgerSeq{ledger->info().seq};
3384 if (m->has_ledgerseq())
3386 if (ledgerSeq != m->ledgerseq())
3389 if (!m->has_requestcookie())
3390 charge(Resource::feeInvalidRequest);
3393 JLOG(p_journal_.warn())
3394 <<
"getLedger: Invalid ledger sequence " << ledgerSeq;
3397 else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3400 JLOG(p_journal_.debug())
3401 <<
"getLedger: Early ledger sequence request " << ledgerSeq;
3406 JLOG(p_journal_.debug()) <<
"getLedger: Unable to find ledger";
3415 JLOG(p_journal_.trace()) <<
"getTxSet: TX set";
3417 uint256 const txSetHash{m->ledgerhash()};
3419 app_.getInboundTransactions().getSet(txSetHash,
false)};
3422 if (m->has_querytype() && !m->has_requestcookie())
3427 m->set_requestcookie(
id());
3429 std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3430 JLOG(p_journal_.debug()) <<
"getTxSet: Request relayed";
3434 JLOG(p_journal_.debug())
3435 <<
"getTxSet: Failed to find relay peer";
3440 JLOG(p_journal_.debug()) <<
"getTxSet: Failed to find TX set";
3451 if (!m->has_requestcookie())
3452 charge(Resource::feeMediumBurdenPeer);
3456 SHAMap const* map{
nullptr};
3457 protocol::TMLedgerData ledgerData;
3458 bool fatLeaves{
true};
3459 auto const itype{m->itype()};
3461 if (itype == protocol::liTS_CANDIDATE)
3463 if (sharedMap = getTxSet(m); !sharedMap)
3465 map = sharedMap.
get();
3468 ledgerData.set_ledgerseq(0);
3469 ledgerData.set_ledgerhash(m->ledgerhash());
3470 ledgerData.set_type(protocol::liTS_CANDIDATE);
3471 if (m->has_requestcookie())
3472 ledgerData.set_requestcookie(m->requestcookie());
3479 if (send_queue_.size() >= Tuning::dropSendQueue)
3481 JLOG(p_journal_.debug())
3482 <<
"processLedgerRequest: Large send queue";
3485 if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3487 JLOG(p_journal_.debug()) <<
"processLedgerRequest: Too busy";
3491 if (ledger = getLedger(m); !ledger)
3495 auto const ledgerHash{ledger->info().hash};
3496 ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3497 ledgerData.set_ledgerseq(ledger->info().seq);
3498 ledgerData.set_type(itype);
3499 if (m->has_requestcookie())
3500 ledgerData.set_requestcookie(m->requestcookie());
3504 case protocol::liBASE:
3505 sendLedgerBase(ledger, ledgerData);
3508 case protocol::liTX_NODE:
3509 map = &ledger->txMap();
3510 JLOG(p_journal_.trace()) <<
"processLedgerRequest: TX map hash "
3511 << to_string(map->getHash());
3514 case protocol::liAS_NODE:
3515 map = &ledger->stateMap();
3516 JLOG(p_journal_.trace())
3517 <<
"processLedgerRequest: Account state map hash "
3518 << to_string(map->getHash());
3523 JLOG(p_journal_.error())
3524 <<
"processLedgerRequest: Invalid ledger info type";
3531 JLOG(p_journal_.warn()) <<
"processLedgerRequest: Unable to find map";
3536 if (m->nodeids_size() > 0)
3538 auto const queryDepth{
3539 m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3543 for (
int i = 0; i < m->nodeids_size() &&
3544 ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3550 data.reserve(Tuning::softMaxReplyNodes);
3554 if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3556 JLOG(p_journal_.trace())
3557 <<
"processLedgerRequest: getNodeFat got "
3558 << data.size() <<
" nodes";
3560 for (
auto const& d : data)
3562 protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3563 node->set_nodeid(d.first.getRawString());
3564 node->set_nodedata(d.second.data(), d.second.size());
3569 JLOG(p_journal_.warn())
3570 <<
"processLedgerRequest: getNodeFat returns false";
3578 case protocol::liBASE:
3580 info =
"Ledger base";
3583 case protocol::liTX_NODE:
3587 case protocol::liAS_NODE:
3591 case protocol::liTS_CANDIDATE:
3592 info =
"TS candidate";
3600 if (!m->has_ledgerhash())
3601 info +=
", no hash specified";
3603 JLOG(p_journal_.error())
3604 <<
"processLedgerRequest: getNodeFat with nodeId "
3605 << *shaMapNodeId <<
" and ledger info type " << info
3606 <<
" throws exception: " << e.
what();
3610 JLOG(p_journal_.info())
3611 <<
"processLedgerRequest: Got request for " << m->nodeids_size()
3612 <<
" nodes at depth " << queryDepth <<
", return "
3613 << ledgerData.nodes_size() <<
" nodes";
3616 send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3620 PeerImp::getScore(
bool haveItem)
const
3624 static const int spRandomMax = 9999;
3628 static const int spHaveItem = 10000;
3633 static const int spLatency = 30;
3636 static const int spNoLatency = 8000;
3641 score += spHaveItem;
3650 score -= latency->count() * spLatency;
3652 score -= spNoLatency;
3658 PeerImp::isHighLatency()
const
3661 return latency_ >= peerHighLatency;
3665 PeerImp::reduceRelayReady()
3667 if (!reduceRelayReady_)
3669 reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
3670 reduce_relay::WAIT_ON_BOOTUP;
3671 return vpReduceRelayEnabled_ && reduceRelayReady_;
3677 using namespace std::chrono_literals;
3680 totalBytes_ += bytes;
3681 accumBytes_ += bytes;
3682 auto const timeElapsed = clock_type::now() - intervalStart_;
3683 auto const timeElapsedInSecs =
3684 std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3686 if (timeElapsedInSecs >= 1s)
3688 auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3689 rollingAvg_.push_back(avgBytes);
3691 auto const totalBytes =
3693 rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3695 intervalStart_ = clock_type::now();
3701 PeerImp::Metrics::average_bytes()
const
3704 return rollingAvgBytes_;
3708 PeerImp::Metrics::total_bytes()
const