20 #ifndef RIPPLE_CONSENSUS_CONSENSUS_H_INCLUDED
21 #define RIPPLE_CONSENSUS_CONSENSUS_H_INCLUDED
23 #include <ripple/basics/Log.h>
24 #include <ripple/basics/chrono.h>
25 #include <ripple/beast/utility/Journal.h>
26 #include <ripple/consensus/ConsensusParms.h>
27 #include <ripple/consensus/ConsensusProposal.h>
28 #include <ripple/consensus/ConsensusTypes.h>
29 #include <ripple/consensus/DisputedTx.h>
30 #include <ripple/consensus/LedgerTiming.h>
31 #include <ripple/json/json_writer.h>
32 #include <boost/logic/tribool.hpp>
66 ConsensusParms
const& parms,
91 ConsensusParms
const& parms,
283 template <
class Adaptor>
289 using Tx_t =
typename TxSet_t::Tx;
293 typename Ledger_t::ID,
294 typename TxSet_t::ID>;
317 a.onModeChange(
mode_, mode);
402 std::optional<
std::chrono::milliseconds> consensusDelay);
// Constructor excerpt (the signature line is not part of this excerpt).
// The member-initializer list stores the adaptor, clock and journal
// arguments in adaptor_, clock_ and j_.
604 template <
class Adaptor>
609 : adaptor_(adaptor), clock_(clock), j_{journal}
// Debug-level trace emitted when the object is constructed.
611 JLOG(j_.
debug()) <<
"Creating consensus object";
// Excerpt of the member function that begins a round (its signature line is
// not shown here; prevLedgerID is the only parameter visible). It records
// timing state, picks a starting ConsensusMode and hands off to
// startRoundInternal.
614 template <
class Adaptor>
618 typename Ledger_t::ID
const& prevLedgerID,
// Timing seeded from the configured idle interval and the supplied ledger's
// close time. NOTE(review): the condition selecting between this pair and the
// rawCloseTimes_.self assignment below is not visible in this excerpt.
626 prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
627 prevCloseTime_ = prevLedger.closeTime();
632 prevCloseTime_ = rawCloseTimes_.self;
// Drop the stored recent positions of every node listed in nowUntrusted.
635 for (
NodeID_t const& n : nowUntrusted)
636 recentPeerPositions_.erase(n);
// The `proposing` flag selects between proposing and observing mode.
639 proposing ? ConsensusMode::proposing : ConsensusMode::observing;
// The supplied ledger does not match the requested prevLedgerID.
642 if (prevLedger.id() != prevLedgerID)
// Ask the adaptor for the requested ledger; when it is returned, use it
// in place of the supplied one.
645 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID))
647 prevLedger = *newLedger;
// Start in wrongLedger mode and log both ledger ids.
// NOTE(review): presumably the branch taken when acquireLedger returns
// nothing; the `else` is not visible here.
651 startMode = ConsensusMode::wrongLedger;
653 <<
"Entering consensus with: " << previousLedger_.id();
654 JLOG(j_.
info()) <<
"Correct LCL is: " << prevLedgerID;
658 startRoundInternal(now, prevLedgerID, prevLedger, startMode);
// Excerpt of the member function that resets per-round state (signature line
// not shown; prevLedgerID is the only parameter visible).
// NOTE(review): looks like the startRoundInternal that other code in this
// file calls - confirm against the full file.
660 template <
class Adaptor>
664 typename Ledger_t::ID
const& prevLedgerID,
// Every round starts in the open phase.
668 phase_ = ConsensusPhase::open;
669 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::open";
// mode_.set is given the adaptor as well as the new mode.
670 mode_.set(mode, adaptor_);
// Adopt the supplied ledger as the one this round builds on.
672 prevLedgerID_ = prevLedgerID;
673 previousLedger_ = prevLedger;
// Reset convergence progress, close-time agreement, the open timer, peer
// positions and the recorded raw close times.
675 convergePercent_ = 0;
676 haveCloseTimeConsensus_ =
false;
677 openTime_.reset(clock_.now());
678 currPeerPositions_.clear();
680 rawCloseTimes_.peers.clear();
681 rawCloseTimes_.self = {};
// Arguments of a call whose callee and assignment target are not shown:
// previous resolution, previous close agreement, and the next sequence
// number (previous seq + 1).
685 previousLedger_.closeTimeResolution(),
686 previousLedger_.closeAgree(),
687 previousLedger_.seq() +
typename Ledger_t::Seq{1});
// True when more than half of the previous round's proposer count already
// has a position this round; the guarded statement is not shown.
690 if (currPeerPositions_.size() > (prevProposers_ / 2))
// Excerpt of the entry point for a newly received peer position (signature
// line not shown). The position is stored in the per-peer history and then
// passed to peerProposalInternal, whose result is returned.
698 template <
class Adaptor>
704 auto const& peerID = newPeerPos.proposal().nodeID();
// Stored positions for this peer, looked up by node id.
708 auto& props = recentPeerPositions_[peerID];
// Size threshold on the per-peer history; the statement it guards is not
// visible in this excerpt.
710 if (props.size() >= 10)
713 props.push_back(newPeerPos);
715 return peerProposalInternal(now, newPeerPos);
// Excerpt of the member function that applies one peer position to the
// current round (signature line and several return statements are not shown).
718 template <
class Adaptor>
// Checked first: the accepted phase (guarded statement not shown).
725 if (phase_ == ConsensusPhase::accepted)
730 auto const& newPeerProp = newPeerPos.proposal();
// The proposal was made against a different previous ledger than ours.
732 if (newPeerProp.prevLedger() != prevLedgerID_)
734 JLOG(j_.
debug()) <<
"Got proposal for " << newPeerProp.prevLedger()
735 <<
" but we are on " << prevLedgerID_;
739 auto const& peerID = newPeerProp.nodeID();
// The peer is already recorded in deadNodes_.
741 if (deadNodes_.find(peerID) != deadNodes_.end())
743 JLOG(j_.
info()) <<
"Position from dead node: " << peerID;
749 auto peerPosIt = currPeerPositions_.find(peerID);
// When we already hold a position from this peer, compare sequence numbers:
// the new proposal is not newer if its proposeSeq is <= the stored one.
751 if (peerPosIt != currPeerPositions_.end())
753 if (newPeerProp.proposeSeq() <=
754 peerPosIt->second.proposal().proposeSeq())
// Bow-out handling: withdraw the peer's vote from every dispute, remove its
// current position if present, and record it in deadNodes_.
760 if (newPeerProp.isBowOut())
762 JLOG(j_.
info()) <<
"Peer " << peerID <<
" bows out";
// NOTE(review): result_ is dereferenced here; any guard on result_ is not
// visible in this excerpt.
765 for (
auto& it : result_->disputes)
766 it.second.unVote(peerID);
768 if (peerPosIt != currPeerPositions_.end())
769 currPeerPositions_.erase(peerID);
770 deadNodes_.insert(peerID);
// Store the position: overwrite the existing entry, or emplace a new one
// (the `else` joining the two statements is not shown).
775 if (peerPosIt != currPeerPositions_.end())
776 peerPosIt->second = newPeerPos;
778 currPeerPositions_.emplace(peerID, newPeerPos);
// For an initial proposal, count the peer's reported close time.
781 if (newPeerProp.isInitial())
784 JLOG(j_.
trace()) <<
"Peer reports close time as "
785 << newPeerProp.closeTime().time_since_epoch().count();
786 ++rawCloseTimes_.peers[newPeerProp.closeTime()];
789 JLOG(j_.
trace()) <<
"Processing peer proposal " << newPeerProp.proposeSeq()
790 <<
"/" << newPeerProp.position();
// Look for the proposed transaction set among the sets already acquired.
793 auto const ait = acquired_.find(newPeerProp.position());
794 if (ait == acquired_.end())
// Not held yet: ask the adaptor; if it returns the set, process it through
// gotTxSet, otherwise only the debug message below is logged.
799 if (
auto set = adaptor_.acquireTxSet(newPeerProp.position()))
800 gotTxSet(now_, *
set);
802 JLOG(j_.
debug()) <<
"Don't have tx set for peer";
// Set already held: update disputes with this peer's set (the condition on
// this branch is not shown).
806 updateDisputes(newPeerProp.nodeID(), ait->second);
// Excerpt of a member function that branches on phase_ (signature line and
// the statements under each branch are not shown). The accepted phase is
// tested first, then open and establish as mutually exclusive alternatives.
813 template <
class Adaptor>
818 if (phase_ == ConsensusPhase::accepted)
826 if (phase_ == ConsensusPhase::open)
830 else if (phase_ == ConsensusPhase::establish)
// Excerpt of the member function that handles a transaction set (txSet)
// becoming available (signature line not shown).
836 template <
class Adaptor>
// Checked first: the accepted phase (guarded statement not shown).
843 if (phase_ == ConsensusPhase::accepted)
848 auto id = txSet.id();
// emplace(...).second is false when a set with this id was already stored.
852 if (!acquired_.emplace(
id, txSet).second)
857 JLOG(j_.
debug()) <<
"Not creating disputes: no position yet.";
// Asserts that the newly stored set is not our own current position.
863 assert(
id != result_->position.position());
// Update disputes for every peer whose current position is this set.
865 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
867 if (peerPos.proposal().position() ==
id)
869 updateDisputes(nodeId, txSet);
// Logged when no peer was proposing the set (the guarding condition and log
// level are not shown).
877 <<
"By the time we got " <<
id <<
" no peers were proposing it";
// Excerpt of a member function that forces the round into the accepted phase
// without real agreement (signature line not shown).
882 template <
class Adaptor>
// Needed for the 100ms literal below.
888 using namespace std::chrono_literals;
889 JLOG(j_.
info()) <<
"Simulating consensus";
// Advance the round timer by consensusDelay, or by 100ms when it is empty.
892 result_->roundTime.tick(consensusDelay.
value_or(100ms));
// Record the current peer count and round time as both this round's result
// and the "previous round" values.
893 result_->proposers = prevProposers_ = currPeerPositions_.size();
894 prevRoundTime_ = result_->roundTime.read();
895 phase_ = ConsensusPhase::accepted;
// Notify the adaptor; the arguments of this call are not shown.
896 adaptor_.onForceAccept(
903 JLOG(j_.
info()) <<
"Simulation complete";
// Excerpt of a member function that fills a JSON value `ret` with the current
// state (signature line, the declarations of ret/ppj/acq/dsj/ctj/dnj and
// several enclosing conditions are not shown).
906 template <
class Adaptor>
915 ret[
"proposing"] = (mode_.get() == ConsensusMode::proposing);
916 ret[
"proposers"] =
static_cast<int>(currPeerPositions_.size());
// "synched" is true in every mode except wrongLedger; close_granularity is
// written next to the true case.
918 if (mode_.get() != ConsensusMode::wrongLedger)
920 ret[
"synched"] =
true;
923 ret[
"close_granularity"] =
static_cast<Int
>(closeResolution_.count());
926 ret[
"synched"] =
false;
928 ret[
"phase"] = to_string(phase_);
// When `full` is false, "disputes" holds only the number of disputes.
930 if (result_ && !result_->disputes.empty() && !full)
931 ret[
"disputes"] =
static_cast<Int
>(result_->disputes.size());
934 ret[
"our_position"] = result_->position.getJson();
// Round time count; the key this value is assigned to is not shown.
940 static_cast<Int
>(result_->roundTime.read().count());
941 ret[
"converge_percent"] = convergePercent_;
942 ret[
"close_resolution"] =
static_cast<Int
>(closeResolution_.count());
943 ret[
"have_time_consensus"] = haveCloseTimeConsensus_;
944 ret[
"previous_proposers"] =
static_cast<Int
>(prevProposers_);
945 ret[
"previous_mseconds"] =
static_cast<Int
>(prevRoundTime_.count());
// Peer positions as an object keyed by the node id's string form.
947 if (!currPeerPositions_.empty())
951 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
953 ppj[to_string(nodeId)] = peerPos.getJson();
955 ret[
"peer_positions"] = std::move(ppj);
// Ids of the acquired transaction sets, appended one by one.
958 if (!acquired_.empty())
961 for (
auto const& at : acquired_)
963 acq.
append(to_string(at.first));
965 ret[
"acquired"] = std::move(acq);
// Per-transaction dispute details keyed by transaction id. This writes the
// same "disputes" key as the count above; the enclosing condition that
// separates the two cases is not shown in this excerpt.
968 if (result_ && !result_->disputes.empty())
971 for (
auto const& [txId, dispute] : result_->disputes)
973 dsj[to_string(txId)] = dispute.getJson();
975 ret[
"disputes"] = std::move(dsj);
// Peer close times; the loop body that fills ctj is not shown.
978 if (!rawCloseTimes_.peers.empty())
981 for (
auto const& ct : rawCloseTimes_.peers)
986 ret[
"close_times"] = std::move(ctj);
// Nodes recorded in deadNodes_, appended one by one.
989 if (!deadNodes_.empty())
992 for (
auto const& dn : deadNodes_)
994 dnj.
append(to_string(dn));
996 ret[
"dead_nodes"] = std::move(dnj);
// Excerpt of the member function that reacts to lgrId being the ledger we
// should be working from (signature line not shown).
1004 template <
class Adaptor>
// Precondition: either the tracked id or the ledger we hold differs from
// lgrId.
1008 assert(lgrId != prevLedgerID_ || previousLedger_.id() != lgrId);
// The tracked id changes: adopt lgrId, clear disputes, comparisons, peer
// positions and peer close times, then replay stored proposals.
1014 if (prevLedgerID_ != lgrId)
1016 prevLedgerID_ = lgrId;
1021 result_->disputes.clear();
1022 result_->compares.clear();
1025 currPeerPositions_.clear();
1026 rawCloseTimes_.peers.clear();
1030 playbackProposals();
// The ledger we hold already matches the tracked id (guarded statement not
// shown).
1033 if (previousLedger_.id() == prevLedgerID_)
// Ask the adaptor for the tracked ledger; on success log it and pass it on
// with ConsensusMode::switchedLedger (the callee name is not shown).
1037 if (
auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
1039 JLOG(j_.
info()) <<
"Have the consensus ledger " << prevLedgerID_;
1041 now_, lgrId, *newLedger, ConsensusMode::switchedLedger);
// Switch to wrongLedger mode.
// NOTE(review): presumably the branch for a failed acquire; the `else` is
// not visible here.
1045 mode_.set(ConsensusMode::wrongLedger, adaptor_);
// Excerpt of the member function that compares our tracked previous-ledger id
// with the id returned by the adaptor (signature line and the declaration of
// netLgr are not shown).
1049 template <
class Adaptor>
// Arguments of the adaptor query whose result is netLgr.
1054 adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
// The adaptor reports a different ledger: log the change, then switch.
1056 if (netLgr != prevLedgerID_)
// Note that both the "during" text and "status=" print to_string(phase_).
1058 JLOG(j_.
warn()) <<
"View of consensus changed during "
1059 << to_string(phase_) <<
" status=" << to_string(phase_)
1061 <<
" mode=" << to_string(mode_.get());
1062 JLOG(j_.
warn()) << prevLedgerID_ <<
" to " << netLgr;
1064 JLOG(j_.
debug()) <<
"State on consensus change "
1066 handleWrongLedger(netLgr);
// Ids agree, but the ledger we hold is not the one identified by
// prevLedgerID_.
1068 else if (previousLedger_.id() != prevLedgerID_)
1069 handleWrongLedger(netLgr);
// Excerpt of the member function that replays stored peer positions
// (signature line not shown). Every stored position whose proposal targets
// prevLedgerID_ is passed to peerProposalInternal; positions for which that
// call returns true are shared through the adaptor.
1072 template <
class Adaptor>
1076 for (
auto const& it : recentPeerPositions_)
1078 for (
auto const& pos : it.second)
1080 if (pos.proposal().prevLedger() == prevLedgerID_)
1082 if (peerProposalInternal(now_, pos))
1083 adaptor_.share(pos);
// Excerpt of a member function that gathers timing inputs (signature line,
// the declaration of sinceClose and the code that uses these values are not
// shown).
1089 template <
class Adaptor>
1096 bool anyTransactions = adaptor_.hasOpenTransactions();
1097 auto proposersClosed = currPeerPositions_.size();
1098 auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
1100 openTime_.tick(clock_.now());
// The previous ledger's close time is used only if we are not in wrongLedger
// mode, the ledger reports closeAgree(), and its close time is not exactly
// parentCloseTime + 1s.
1105 bool previousCloseCorrect =
1106 (mode_.get() != ConsensusMode::wrongLedger) &&
1107 previousLedger_.closeAgree() &&
1108 (previousLedger_.closeTime() !=
1109 (previousLedger_.parentCloseTime() + 1s));
// The alternative used when previousCloseCorrect is false is not shown.
1111 auto lastCloseTime = previousCloseCorrect
1112 ? previousLedger_.closeTime()
// sinceClose is signed: the smaller time is always subtracted from the
// larger, and the result is negated when lastCloseTime is later than now_.
1115 if (now_ >= lastCloseTime)
1116 sinceClose = duration_cast<milliseconds>(now_ - lastCloseTime);
1118 sinceClose = -duration_cast<milliseconds>(lastCloseTime - now_);
// Idle interval: the larger of the configured idle interval and twice the
// previous ledger's close time resolution.
1121 auto const idleInterval = std::max<milliseconds>(
1122 adaptor_.parms().ledgerIDLE_INTERVAL,
1123 2 * previousLedger_.closeTimeResolution());
// Excerpt of the member function that decides whether to pause (signature
// line, the declarations of ahead/laggards/offline/vars/maxPausePhase, the
// branching structure and the return statements are not shown).
1142 template <
class Adaptor>
1146 auto const& parms = adaptor_.parms();
// Working sequence minus the validated index; std::min caps the subtrahend at
// the working sequence, so the difference cannot go below zero.
1148 previousLedger_.seq() -
1149 std::min(adaptor_.getValidLedgerIndex(), previousLedger_.seq()));
1150 auto [quorum, trustedKeys] = adaptor_.getQuorumKeys();
1151 std::size_t const totalValidators = trustedKeys.size();
1153 adaptor_.laggards(previousLedger_.seq(), trustedKeys);
// Diagnostic text appended to each of the log messages below.
1157 vars <<
" (working seq: " << previousLedger_.seq() <<
", "
1158 <<
"validated seq: " << adaptor_.getValidLedgerIndex() <<
", "
1159 <<
"am validator: " << adaptor_.validator() <<
", "
1160 <<
"have validated: " << adaptor_.haveValidated() <<
", "
1161 <<
"roundTime: " << result_->roundTime.
read().count() <<
", "
1162 <<
"max consensus time: " << parms.ledgerMAX_CONSENSUS.count() <<
", "
1163 <<
"validators: " << totalValidators <<
", "
1164 <<
"laggards: " << laggards <<
", "
1165 <<
"offline: " << offline <<
", "
1166 <<
"quorum: " << quorum <<
")";
// Early exit: any of these conditions logs "not pausing (early)". Note that
// the test on totalValidators precedes the divisions by it further down.
1168 if (!ahead || !laggards || !totalValidators || !adaptor_.validator() ||
1169 !adaptor_.haveValidated() ||
1170 result_->roundTime.read() > parms.ledgerMAX_CONSENSUS)
1172 j_.
debug() <<
"not pausing (early)" << vars.
str();
1176 bool willPause =
false;
// phase cycles through 0..maxPausePhase as ahead grows; ahead is non-zero
// here because of the early-exit test above.
1212 std::size_t const phase = (ahead - 1) % (maxPausePhase + 1);
// Test: laggards plus offline exceed the validators that may be missing
// while still reaching quorum (totalValidators - quorum).
1221 if (laggards + offline > totalValidators - quorum)
// Ratio test: the fraction of validators that are neither lagging nor
// offline is compared with quorumRatio plus a phase-scaled share of the
// remaining (1 - quorumRatio).
1236 float const nonLaggards = totalValidators - (laggards + offline);
1237 float const quorumRatio =
1238 static_cast<float>(quorum) / totalValidators;
1239 float const allowedDissent = 1.0f - quorumRatio;
1240 float const phaseFactor =
static_cast<float>(phase) / maxPausePhase;
1242 if (nonLaggards / totalValidators <
1243 quorumRatio + (allowedDissent * phaseFactor))
// Outcome logging: warn level for pausing, debug level otherwise.
1250 j_.
warn() <<
"pausing" << vars.
str();
1252 j_.
debug() <<
"not pausing" << vars.
str();
// Excerpt of the member function that drives the round toward the accepted
// phase (signature line and the early-return statements are not shown).
1256 template <
class Adaptor>
// Refresh the round timer and the proposer count for this pass.
1266 result_->roundTime.tick(clock_.now());
1267 result_->proposers = currPeerPositions_.size();
// Elapsed round time scaled by 100; the divisor is on a line not shown.
1269 convergePercent_ = result_->roundTime.read() * 100 /
1276 updateOurPositions();
// shouldPause() is evaluated first; haveConsensus() is only called when it
// returns false (short-circuit ||).
1279 if (shouldPause() || !haveConsensus())
1282 if (!haveCloseTimeConsensus_)
1284 JLOG(j_.
info()) <<
"We have TX consensus but not CT consensus";
// Record this round's participant count and duration as the "previous round"
// values, notify the adaptor, and move to the accepted phase.
1288 JLOG(j_.
info()) <<
"Converge cutoff (" << currPeerPositions_.size()
1289 <<
" participants)";
1290 adaptor_.updateOperatingMode(currPeerPositions_.size());
1291 prevProposers_ = currPeerPositions_.size();
1292 prevRoundTime_ = result_->roundTime.read();
1293 phase_ = ConsensusPhase::accepted;
1294 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::accepted";
// Excerpt of the member function that moves the round into the establish
// phase and creates our initial result (signature line not shown).
1304 template <
class Adaptor>
1311 phase_ = ConsensusPhase::establish;
1312 JLOG(j_.
debug()) <<
"transitioned to ConsensusPhase::establish";
// Our own close time for this round is the current now_.
1313 rawCloseTimes_.self = now_;
// Build result_ from the adaptor's onClose value and restart the round timer.
1315 result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
1316 result_->roundTime.reset(clock_.now());
// Share our transaction set only if it was not already stored in acquired_
// (emplace(...).second is true for a new entry).
1319 if (acquired_.emplace(result_->txns.id(), result_->txns).second)
1320 adaptor_.share(result_->txns);
// Only a proposing node publishes its position.
1322 if (mode_.get() == ConsensusMode::proposing)
1323 adaptor_.propose(result_->position);
// Create disputes against every peer position whose transaction set we
// already hold.
1326 for (
auto const& pit : currPeerPositions_)
1328 auto const& pos = pit.second.proposal().position();
1329 auto const it = acquired_.find(pos);
1330 if (it != acquired_.end())
1332 createDisputes(it->second);
// Body excerpt of a helper whose signature line is not shown. Computes
// (participants * percent + percent / 2) / 100 using integer arithmetic.
1352 int result = ((participants * percent) + (percent / 2)) / 100;
// A computed value of exactly 0 is replaced by 1.
1354 return (result == 0) ? 1 : result;
// Excerpt of the member function that revises our transaction set and close
// time from the current peer positions (signature line and the declarations
// of peerCutoff, ourCutoff, closeTimeVotes, mutableSet, ourNewSet,
// consensusCloseTime, neededWeight and threshVote are not shown).
1357 template <
class Adaptor>
// Walk the peer positions with an explicit iterator because stale entries are
// erased during the walk.
1372 auto it = currPeerPositions_.
begin();
1373 while (it != currPeerPositions_.end())
1375 Proposal_t const& peerProp = it->second.proposal();
1376 if (peerProp.
isStale(peerCutoff))
// Stale proposal: withdraw the peer's votes and erase its position; erase()
// returns the iterator used to continue the walk.
1380 JLOG(j_.
warn()) <<
"Removing stale proposal from " << peerID;
1381 for (
auto& dt : result_->disputes)
1382 dt.second.unVote(peerID);
1383 it = currPeerPositions_.erase(it);
// Count the peer's close time after passing it through asCloseTime.
// NOTE(review): presumably the non-stale branch; the `else` is not visible.
1388 ++closeTimeVotes[asCloseTime(peerProp.
closeTime())];
// Re-evaluate our vote on each disputed transaction; the remaining arguments
// of updateVote are on lines not shown.
1400 for (
auto& [txId, dispute] : result_->disputes)
1404 if (dispute.updateVote(
1406 mode_.get() == ConsensusMode::proposing,
// mutableSet is built from our current transactions, then the disputed
// transaction is inserted or erased according to our vote (the condition
// guarding the emplace is not shown).
1410 mutableSet.
emplace(result_->txns);
1412 if (dispute.getOurVote())
1415 mutableSet->insert(dispute.tx());
1420 mutableSet->erase(txId);
// The edited set becomes our candidate new set.
1426 ourNewSet.
emplace(std::move(*mutableSet));
// Close-time agreement is recomputed from scratch on every call.
1430 haveCloseTimeConsensus_ =
false;
// With no peer positions, our own close time is taken as agreed.
1432 if (currPeerPositions_.empty())
1435 haveCloseTimeConsensus_ =
true;
1436 consensusCloseTime = asCloseTime(result_->position.closeTime());
// Count peers, and add our own close-time vote when proposing.
1451 int participants = currPeerPositions_.size();
1452 if (mode_.get() == ConsensusMode::proposing)
1454 ++closeTimeVotes[asCloseTime(result_->position.closeTime())];
// The initializer of threshConsensus is on a line not shown.
1462 int const threshConsensus =
1465 JLOG(j_.
info()) <<
"Proposers:" << currPeerPositions_.size()
1466 <<
" nw:" << neededWeight <<
" thrV:" << threshVote
1467 <<
" thrC:" << threshConsensus;
// Examine each candidate close time t together with its vote count v.
1469 for (
auto const& [t, v] : closeTimeVotes)
1473 <<
static_cast<std::uint32_t>(previousLedger_.seq()) + 1 <<
": "
1474 << t.time_since_epoch().count() <<
" has " << v <<
", "
1475 << threshVote <<
" required";
// A close time with at least threshVote votes becomes the candidate; reaching
// threshConsensus marks close-time agreement.
1477 if (v >= threshVote)
1480 consensusCloseTime = t;
1483 if (threshVote >= threshConsensus)
1484 haveCloseTimeConsensus_ =
true;
1488 if (!haveCloseTimeConsensus_)
1491 <<
"No CT consensus:"
1492 <<
" Proposers:" << currPeerPositions_.size()
1493 <<
" Mode:" << to_string(mode_.get())
1494 <<
" Thresh:" << threshConsensus
// Tail of a condition whose first part is not shown: the agreed close time
// differs from ours, or our position is stale. In that case the current
// transactions are re-used as the new set.
1500 ((consensusCloseTime != asCloseTime(result_->position.closeTime())) ||
1501 result_->position.isStale(ourCutoff)))
1504 ourNewSet.
emplace(result_->txns);
// Adopt the new set and change our position to it.
1509 auto newID = ourNewSet->id();
1511 result_->txns = std::move(*ourNewSet);
1513 JLOG(j_.
info()) <<
"Position change: CTime "
1515 <<
", tx " << newID;
1517 result_->position.changePosition(newID, consensusCloseTime, now_);
// Only when the new set was not already stored in acquired_: share it (unless
// our position is a bow-out) and update disputes for peers (the per-peer
// condition is not shown).
1521 if (acquired_.emplace(newID, result_->txns).second)
1523 if (!result_->position.isBowOut())
1524 adaptor_.share(result_->txns);
1526 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1530 updateDisputes(nodeId, result_->txns);
// Publish the changed position only while proposing and not bowed out.
1535 if (!result_->position.isBowOut() &&
1536 (mode_.get() == ConsensusMode::proposing))
1537 adaptor_.propose(result_->position);
// Excerpt of the member function that compares peer positions with ours and
// evaluates the resulting state (signature line, the counter updates, the
// call that sets result_->state and the return statements are not shown).
1541 template <
class Adaptor>
1549 int agree = 0, disagree = 0;
1551 auto ourPosition = result_->position.position();
// Compare every peer's proposed set with our own.
1554 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1556 Proposal_t const& peerProp = peerPos.proposal();
1557 if (peerProp.
position() == ourPosition)
// Logs a peer together with its position.
// NOTE(review): presumably the disagreeing branch; the `else` is not
// visible here.
1563 JLOG(j_.
debug()) << nodeId <<
" has " << peerProp.
position();
1567 auto currentFinished =
1568 adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
1570 JLOG(j_.
debug()) <<
"Checking for TX consensus: agree=" << agree
1571 <<
", disagree=" << disagree;
// Two arguments of a call whose callee and other arguments are not shown.
1580 result_->roundTime.read(),
1582 mode_.get() == ConsensusMode::proposing,
// Branch on the resulting state; MovedOn is logged at error level.
1585 if (result_->state == ConsensusState::No)
1590 if (result_->state == ConsensusState::MovedOn)
1592 JLOG(j_.
error()) <<
"Unable to reach consensus";
// Excerpt of the member function that stops proposing (signature line not
// shown). Acts only while in proposing mode.
1599 template <
class Adaptor>
1603 if (mode_.get() == ConsensusMode::proposing)
// If we hold a position that is not already a bow-out, mark it as one and
// publish it.
1605 if (result_ && !result_->position.isBowOut())
1607 result_->position.bowOut(now_);
1608 adaptor_.propose(result_->position);
// Continue the round in observing mode.
1611 mode_.set(ConsensusMode::observing, adaptor_);
1612 JLOG(j_.
info()) <<
"Bowing out of consensus";
// Excerpt of the member function that creates dispute records for the
// transactions on which our set and the set `o` differ (signature line, the
// declarations of dc and dtx, and the early returns are not shown).
1616 template <
class Adaptor>
// emplace(...).second is false when this set id was already recorded in
// compares.
1624 if (!result_->compares.emplace(o.id()).second)
// The other set has the same id as ours.
1628 if (result_->txns.id() == o.id())
1631 JLOG(j_.
debug()) <<
"createDisputes " << result_->txns.id() <<
" to "
// Each entry pairs a transaction id with a flag, named inThisSet here.
1634 auto differences = result_->txns.compare(o);
1638 for (
auto const& [txId, inThisSet] : differences)
// Condition of a check whose opening line is not shown: the transaction is
// found in exactly one of the two sets, consistent with inThisSet.
1643 (inThisSet && result_->txns.find(txId) && !o.find(txId)) ||
1644 (!inThisSet && !result_->txns.find(txId) && o.find(txId)));
// Take the transaction from whichever set is selected by inThisSet.
1646 Tx_t tx = inThisSet ? result_->txns.find(txId) : o.find(txId);
1647 auto txID = tx.id();
// A dispute for this transaction already exists.
1649 if (result_->disputes.find(txID) != result_->disputes.end())
1652 JLOG(j_.
debug()) <<
"Transaction " << txID <<
" is disputed";
// Two arguments used when building dtx: whether our set contains the
// transaction, and the larger of the previous and current proposer counts.
1656 result_->txns.exists(txID),
1657 std::max(prevProposers_, currPeerPositions_.size()),
// Record a vote for every peer whose proposed set we hold: the vote is
// whether that set contains the transaction.
1661 for (
auto const& [nodeId, peerPos] : currPeerPositions_)
1663 Proposal_t const& peerProp = peerPos.proposal();
1664 auto const cit = acquired_.find(peerProp.
position());
1665 if (cit != acquired_.end())
1666 dtx.setVote(nodeId, cit->second.exists(txID));
// Share the disputed transaction, then store the dispute.
1668 adaptor_.share(dtx.tx());
1670 result_->disputes.emplace(txID, std::move(dtx));
1672 JLOG(j_.
debug()) << dc <<
" differences found";
// Excerpt of the member function that records the votes of `node` implied by
// its transaction set `other` (signature line not shown).
1675 template <
class Adaptor>
// Create disputes against this set first if its id is not yet in compares.
1684 if (result_->compares.find(other.id()) == result_->compares.end())
1685 createDisputes(other);
// For every dispute, the node's vote is whether `other` contains the
// disputed transaction.
1687 for (
auto& it : result_->disputes)
1689 auto& d = it.second;
1690 d.setVote(node, other.exists(d.tx().id()));
1694 template <
class Adaptor>