20 #include <ripple/app/ledger/AccountStateSF.h>
21 #include <ripple/app/ledger/InboundLedger.h>
22 #include <ripple/app/ledger/InboundLedgers.h>
23 #include <ripple/app/ledger/LedgerMaster.h>
24 #include <ripple/app/ledger/TransactionStateSF.h>
25 #include <ripple/app/main/Application.h>
26 #include <ripple/app/misc/NetworkOPs.h>
27 #include <ripple/basics/Log.h>
28 #include <ripple/core/JobQueue.h>
29 #include <ripple/nodestore/DatabaseShard.h>
30 #include <ripple/overlay/Overlay.h>
31 #include <ripple/protocol/HashPrefix.h>
32 #include <ripple/protocol/jss.h>
33 #include <ripple/resource/Fees.h>
34 #include <ripple/shamap/SHAMapNodeID.h>
36 #include <boost/iterator/function_output_iterator.hpp>
43 using namespace std::chrono_literals;
89 app.journal(
"InboundLedger"))
93 , mHaveTransactions(
false)
98 , mReceiveDispatched(
false)
99 , mPeerSet(std::move(peerSet))
101 JLOG(journal_.trace()) <<
"Acquiring ledger " << hash_;
123 <<
"Acquiring shard with no shard store available";
137 else if (shardStore &&
mSeq >= shardStore->earliestLedgerSeq())
139 if (
auto l = shardStore->fetchLedger(
hash_,
mSeq))
156 JLOG(
journal_.
debug()) <<
"Acquiring ledger we already have in "
157 <<
" local store. " <<
hash_;
176 auto const& peerIds =
mPeerSet->getPeerIds();
177 return std::count_if(peerIds.begin(), peerIds.end(), [
this](
auto id) {
178 return (app_.overlay().findPeerByShortID(id) != nullptr);
188 if ((seq != 0) && (
mSeq == 0))
222 if (entry.second->type() == protocol::liAS_NODE)
228 <<
"Acquire " <<
hash_ <<
" abort "
253 for (
auto const& n : mn)
311 auto makeLedger = [&,
this](
Blob const& data) {
312 JLOG(
journal_.
trace()) <<
"Ledger header found in fetch pack";
313 mLedger = std::make_shared<Ledger>(
324 <<
" cannot be a ledger";
333 JLOG(
journal_.
trace()) <<
"Ledger header found in local store";
335 makeLedger(nodeObject->getData());
340 auto& dstDB{
mLedger->stateMap().family().db()};
343 Blob blob{nodeObject->getData()};
355 JLOG(
journal_.
trace()) <<
"Ledger header found in fetch pack";
362 mLedger->stateMap().family().db().store(
375 if (
mLedger->info().txHash.isZero())
384 if (
mLedger->txMap().fetchRoot(
398 if (
mLedger->info().accountHash.isZero())
401 <<
"We are acquiring a ledger with a zero account hash";
407 if (
mLedger->stateMap().fetchRoot(
408 SHAMapHash{mLedger->info().accountHash}, &filter))
467 <<
"No progress(" << pc <<
") for ledger " <<
hash_;
487 [
this](
auto peer) {
return peer->hasLedger(
hash_,
mSeq); },
543 if (self->complete_ && !self->failed_)
545 self->app_.getLedgerMaster().checkAccept(self->getLedger());
546 self->app_.getLedgerMaster().tryAdvance();
549 self->app_.getInboundLedgers().logFailure(
550 self->hash_, self->mSeq);
564 <<
"Trigger on ledger: " <<
hash_ << (
complete_ ?
" completed" :
"")
572 stream <<
"Trigger acquiring ledger " <<
hash_ <<
" from " << peer;
574 stream <<
"Trigger acquiring ledger " <<
hash_;
595 protocol::TMGetLedger tmGL;
601 tmGL.set_querytype(protocol::qtINDIRECT);
610 protocol::TMGetObjectByHash tmBH;
611 bool typeSet =
false;
612 tmBH.set_query(
true);
614 for (
auto const& p : need)
620 tmBH.set_type(p.first);
624 if (p.first == tmBH.type())
626 protocol::TMIndexedObject* io = tmBH.add_objects();
627 io->set_hash(p.second.begin(), p.second.size());
629 io->set_ledgerseq(
mSeq);
634 std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
635 auto const& peerIds =
mPeerSet->getPeerIds();
637 peerIds.begin(), peerIds.end(), [
this, &packet](
auto id) {
638 if (auto p = app_.overlay().findPeerByShortID(id))
648 <<
"getNeededHashes says acquire is complete";
659 if (!mHaveHeader && !failed_)
661 tmGL.set_itype(protocol::liBASE);
663 tmGL.set_ledgerseq(mSeq);
664 JLOG(journal_.trace()) <<
"Sending header request to "
665 << (peer ?
"selected peer" :
"all peers");
666 mPeerSet->sendRequest(tmGL, peer);
671 tmGL.set_ledgerseq(mLedger->info().seq);
673 if (reason != TriggerReason::reply)
676 tmGL.set_querydepth(0);
678 else if (peer && peer->isHighLatency())
681 tmGL.set_querydepth(2);
684 tmGL.set_querydepth(1);
688 if (mHaveHeader && !mHaveState && !failed_)
692 if (!mLedger->stateMap().isValid())
696 else if (mLedger->stateMap().getHash().isZero())
699 tmGL.set_itype(protocol::liAS_NODE);
700 *tmGL.add_nodeids() = SHAMapNodeID().getRawString();
701 JLOG(journal_.trace()) <<
"Sending AS root request to "
702 << (peer ?
"selected peer" :
"all peers");
703 mPeerSet->sendRequest(tmGL, peer);
708 AccountStateSF filter(
709 mLedger->stateMap().family().db(), app_.getLedgerMaster());
718 if (!failed_ && !complete_ && !mHaveState)
722 if (!mLedger->stateMap().isValid())
728 if (mHaveTransactions)
734 filterNodes(nodes, reason);
738 tmGL.set_itype(protocol::liAS_NODE);
739 for (
auto const&
id : nodes)
741 *(tmGL.add_nodeids()) =
id.first.getRawString();
744 JLOG(journal_.trace())
745 <<
"Sending AS node request (" << nodes.size()
747 << (peer ?
"selected peer" :
"all peers");
748 mPeerSet->sendRequest(tmGL, peer);
753 JLOG(journal_.trace()) <<
"All AS nodes filtered";
760 if (mHaveHeader && !mHaveTransactions && !failed_)
764 if (!mLedger->txMap().isValid())
768 else if (mLedger->txMap().getHash().isZero())
771 tmGL.set_itype(protocol::liTX_NODE);
772 *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
773 JLOG(journal_.trace()) <<
"Sending TX root request to "
774 << (peer ?
"selected peer" :
"all peers");
775 mPeerSet->sendRequest(tmGL, peer);
780 TransactionStateSF filter(
781 mLedger->txMap().family().db(), app_.getLedgerMaster());
788 if (!mLedger->txMap().isValid())
792 mHaveTransactions =
true;
800 filterNodes(nodes, reason);
804 tmGL.set_itype(protocol::liTX_NODE);
805 for (
auto const& n : nodes)
807 *(tmGL.add_nodeids()) = n.first.getRawString();
809 JLOG(journal_.trace())
810 <<
"Sending TX node request (" << nodes.size()
811 <<
") to " << (peer ?
"selected peer" :
"all peers");
812 mPeerSet->sendRequest(tmGL, peer);
817 JLOG(journal_.trace()) <<
"All TX nodes filtered";
823 if (complete_ || failed_)
825 JLOG(journal_.debug())
826 <<
"Done:" << (complete_ ?
" complete" :
"")
827 << (failed_ ?
" failed " :
" ") << mLedger->info().seq;
834 InboundLedger::filterNodes(
841 nodes.begin(), nodes.end(), [
this](
auto const& item) {
842 return mRecentNodes.count(item.second) == 0;
848 if (dup == nodes.begin())
850 JLOG(journal_.trace()) <<
"filterNodes: all duplicates";
852 if (reason != TriggerReason::timeout)
860 JLOG(journal_.trace()) <<
"filterNodes: pruning duplicates";
862 nodes.erase(dup, nodes.end());
868 if (nodes.size() > limit)
871 for (
auto const& n : nodes)
872 mRecentNodes.insert(n.second);
883 JLOG(journal_.trace()) <<
"got header acquiring ledger " << hash_;
885 if (complete_ || failed_ || mHaveHeader)
888 auto* f = mReason == Reason::SHARD ? app_.getShardFamily()
889 : &app_.getNodeFamily();
890 mLedger = std::make_shared<Ledger>(
892 if (mLedger->info().hash != hash_ ||
893 (mSeq != 0 && mSeq != mLedger->info().seq))
895 JLOG(journal_.warn())
896 <<
"Acquire hash mismatch: " << mLedger->info().hash
902 mSeq = mLedger->info().seq;
903 mLedger->stateMap().setLedgerSeq(mSeq);
904 mLedger->txMap().setLedgerSeq(mSeq);
908 s.
add32(HashPrefix::ledgerMaster);
909 s.
addRaw(data.data(), data.size());
912 if (mLedger->info().txHash.isZero())
913 mHaveTransactions =
true;
915 if (mLedger->info().accountHash.isZero())
918 mLedger->txMap().setSynching();
919 mLedger->stateMap().setSynching();
928 InboundLedger::receiveNode(protocol::TMLedgerData& packet,
SHAMapAddNode& san)
932 JLOG(journal_.warn()) <<
"Missing ledger header";
936 if (packet.type() == protocol::liTX_NODE)
938 if (mHaveTransactions || failed_)
944 else if (mHaveState || failed_)
950 auto [map, rootHash, filter] = [&]()
952 if (packet.type() == protocol::liTX_NODE)
956 std::make_unique<TransactionStateSF>(
957 mLedger->txMap().family().db(), app_.getLedgerMaster())};
961 std::make_unique<AccountStateSF>(
962 mLedger->stateMap().family().db(), app_.getLedgerMaster())};
967 auto const f = filter.get();
969 for (
auto const& node : packet.nodes())
976 if (nodeID->isRoot())
978 san += map.addRootNode(rootHash,
makeSlice(node.nodedata()), f);
982 san += map.addKnownNode(*nodeID,
makeSlice(node.nodedata()), f);
987 JLOG(journal_.warn()) <<
"Received bad node data";
994 JLOG(journal_.error()) <<
"Received bad node data: " << e.
what();
999 if (!map.isSynching())
1001 if (packet.type() == protocol::liTX_NODE)
1002 mHaveTransactions =
true;
1006 if (mHaveTransactions && mHaveState)
1020 if (failed_ || mHaveState)
1033 mLedger->stateMap().family().db(), app_.getLedgerMaster());
1034 san += mLedger->stateMap().addRootNode(
1035 SHAMapHash{mLedger->info().accountHash}, data, &filter);
1045 if (failed_ || mHaveTransactions)
1058 mLedger->txMap().family().db(), app_.getLedgerMaster());
1059 san += mLedger->txMap().addRootNode(
1060 SHAMapHash{mLedger->info().txHash}, data, &filter);
1065 InboundLedger::getNeededHashes()
1079 mLedger->stateMap().family().db(), app_.getLedgerMaster());
1080 for (
auto const& h : neededStateHashes(4, &filter))
1087 if (!mHaveTransactions)
1090 mLedger->txMap().family().db(), app_.getLedgerMaster());
1091 for (
auto const& h : neededTxHashes(4, &filter))
1094 protocol::TMGetObjectByHash::otTRANSACTION_NODE, h));
1105 InboundLedger::gotData(
1114 mReceivedData.emplace_back(peer, data);
1116 if (mReceiveDispatched)
1119 mReceiveDispatched =
true;
1132 InboundLedger::processData(
1134 protocol::TMLedgerData& packet)
1136 if (packet.type() == protocol::liBASE)
1138 if (packet.nodes().empty())
1140 JLOG(journal_.warn()) << peer->id() <<
": empty header data";
1141 peer->charge(Resource::feeInvalidRequest);
1153 if (!takeHeader(packet.nodes(0).nodedata()))
1155 JLOG(journal_.warn()) <<
"Got invalid header data";
1156 peer->charge(Resource::feeInvalidRequest);
1163 if (!mHaveState && (packet.nodes().size() > 1) &&
1164 !takeAsRootNode(
makeSlice(packet.nodes(1).nodedata()), san))
1166 JLOG(journal_.warn()) <<
"Included AS root invalid";
1169 if (!mHaveTransactions && (packet.nodes().size() > 2) &&
1170 !takeTxRootNode(
makeSlice(packet.nodes(2).nodedata()), san))
1172 JLOG(journal_.warn()) <<
"Included TX root invalid";
1177 JLOG(journal_.warn())
1178 <<
"Included AS/TX root invalid: " << ex.
what();
1179 peer->charge(Resource::feeBadData);
1190 if ((packet.type() == protocol::liTX_NODE) ||
1191 (packet.type() == protocol::liAS_NODE))
1193 std::string type = packet.type() == protocol::liTX_NODE ?
"liTX_NODE: "
1196 if (packet.nodes().empty())
1198 JLOG(journal_.info()) << peer->id() <<
": response with no nodes";
1199 peer->charge(Resource::feeInvalidRequest);
1206 for (
auto const& node : packet.nodes())
1208 if (!node.has_nodeid() || !node.has_nodedata())
1210 JLOG(journal_.warn()) <<
"Got bad node";
1211 peer->charge(Resource::feeInvalidRequest);
1217 receiveNode(packet, san);
1219 JLOG(journal_.debug())
1221 << ((packet.type() == protocol::liTX_NODE) ?
"TX" :
"AS")
1222 <<
" node stats: " << san.
get();
1249 maxCount =
std::max(maxCount, dataCount);
1250 auto i = counts.
find(peer);
1251 if (i == counts.
end())
1253 counts.
emplace(std::move(peer), dataCount);
1256 i->second =
std::max(i->second, dataCount);
1265 auto const thresh = maxCount / 2;
1266 auto i = counts.
begin();
1267 while (i != counts.
end())
1269 if (i->second < thresh)
1270 i = counts.
erase(i);
1285 auto outFunc = [&f](
auto&& v) { f(v.first); };
1300 boost::make_function_output_iterator(outFunc),
1312 InboundLedger::runData()
1317 decltype(mReceivedData) data;
1331 if (mReceivedData.empty())
1333 mReceiveDispatched =
false;
1337 data.swap(mReceivedData);
1340 for (
auto& entry : data)
1342 if (
auto peer = entry.first.lock())
1344 int count = processData(peer, *(entry.second));
1345 dataCounts.
update(std::move(peer), count);
1354 trigger(peer, TriggerReason::reply);
1359 InboundLedger::getJson(
int)
1365 ret[jss::hash] = to_string(hash_);
1368 ret[jss::complete] =
true;
1371 ret[jss::failed] =
true;
1373 if (!complete_ && !failed_)
1374 ret[jss::peers] =
static_cast<int>(mPeerSet->getPeerIds().size());
1376 ret[jss::have_header] = mHaveHeader;
1380 ret[jss::have_state] = mHaveState;
1381 ret[jss::have_transactions] = mHaveTransactions;
1384 ret[jss::timeouts] = timeouts_;
1386 if (mHaveHeader && !mHaveState)
1389 for (
auto const& h : neededStateHashes(16,
nullptr))
1393 ret[jss::needed_state_hashes] = hv;
1396 if (mHaveHeader && !mHaveTransactions)
1399 for (
auto const& h : neededTxHashes(16,
nullptr))
1403 ret[jss::needed_transaction_hashes] = hv;