20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerReplayTask.h>
22 #include <ripple/app/ledger/LedgerReplayer.h>
23 #include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
24 #include <ripple/app/ledger/impl/SkipListAcquire.h>
25 #include <ripple/core/JobQueue.h>
31 uint256 const& finishLedgerHash,
33 : reason_(r), finishHash_(finishLedgerHash), totalLedgers_(totalNumLedgers)
35 assert(finishLedgerHash.
isNonZero() && totalNumLedgers > 0);
44 if (finishHash_ != hash || sList.
size() + 1 < totalLedgers_ || full_)
50 startHash_ = skipList_[skipList_.size() - totalLedgers_];
51 assert(startHash_.isNonZero());
52 startSeq_ = finishSeq_ - totalLedgers_ + 1;
61 if (reason_ == existingTask.
reason_)
69 if (existingTask.
full_)
71 auto const& exList = existingTask.
skipList_;
72 if (
auto i =
std::find(exList.begin(), exList.end(), finishHash_);
76 totalLedgers_ + (exList.end() - i) - 1;
92 parameter.finishHash_,
93 LedgerReplayParameters::TASK_TIMEOUT,
97 app.journal(
"LedgerReplayTask"))
98 , inboundLedgers_(inboundLedgers)
100 , parameter_(parameter)
103 parameter.totalLedgers_ *
105 , skipListAcquirer_(skipListAcquirer)
107 JLOG(journal_.trace()) <<
"Create " << hash_;
122 if (
auto sptr = wptr.
lock(); sptr)
130 auto const skipListData = sptr->skipListAcquirer_->getData();
131 sptr->updateSkipList(
132 hash, skipListData->ledgerSeq, skipListData->skipList);
137 ScopedLockType sl(mtx_);
148 JLOG(journal_.trace()) <<
"trigger " << hash_;
149 if (!parameter_.full_)
154 parent_ = app_.getLedgerMaster().getLedgerByHash(parameter_.startHash_);
157 parent_ = inboundLedgers_.acquire(
158 parameter_.startHash_,
159 parameter_.startSeq_,
160 InboundLedger::Reason::GENERIC);
164 JLOG(journal_.trace())
165 <<
"Got start ledger " << parameter_.startHash_ <<
" for task "
174 LedgerReplayTask::deltaReady(
uint256 const& deltaHash)
176 JLOG(journal_.trace()) <<
"Delta " << deltaHash <<
" ready for task "
186 JLOG(journal_.trace()) <<
"tryAdvance task " << hash_
187 << (parameter_.full_ ?
", full parameter"
188 :
", waiting to fill parameter")
189 <<
", deltaIndex=" << deltaToBuild_
190 <<
", totalDeltas=" << deltas_.size() <<
", parent "
191 << (parent_ ? parent_->info().hash :
uint256());
193 bool shouldTry = parent_ && parameter_.full_ &&
194 parameter_.totalLedgers_ - 1 == deltas_.size();
200 for (; deltaToBuild_ < deltas_.size(); ++deltaToBuild_)
202 auto& delta = deltas_[deltaToBuild_];
203 assert(parent_->seq() + 1 == delta->ledgerSeq_);
204 if (
auto l = delta->tryBuild(parent_); l)
206 JLOG(journal_.debug())
207 <<
"Task " << hash_ <<
" got ledger " << l->info().hash
208 <<
" deltaIndex=" << deltaToBuild_
209 <<
" totalDeltas=" << deltas_.size();
217 JLOG(journal_.info()) <<
"Completed " << hash_;
226 LedgerReplayTask::updateSkipList(
235 if (!parameter_.update(hash, seq, sList))
237 JLOG(journal_.error()) <<
"Parameter update failed " << hash_;
243 replayer_.createDeltas(shared_from_this());
252 JLOG(journal_.trace()) <<
"mTimeouts=" << timeouts_ <<
" for " << hash_;
253 if (timeouts_ > maxTimeouts_)
256 JLOG(journal_.debug())
257 <<
"LedgerReplayTask Failed, too many timeouts " << hash_;
266 LedgerReplayTask::pmDowncast()
268 return shared_from_this();
275 delta->addDataCallback(
276 parameter_.reason_, [wptr](
bool good,
uint256 const& hash) {
277 if (auto sptr = wptr.lock(); sptr)
282 sptr->deltaReady(hash);
286 ScopedLockType sl(mtx_);
289 JLOG(journal_.trace())
290 <<
"addDelta task " << hash_ <<
" deltaIndex=" << deltaToBuild_
291 <<
" totalDeltas=" << deltas_.size();
294 deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_);
295 deltas_.push_back(delta);
300 LedgerReplayTask::finished()
const