20 #include <ripple/app/ledger/LedgerReplayer.h>
21 #include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
22 #include <ripple/app/ledger/impl/SkipListAcquire.h>
23 #include <ripple/core/JobQueue.h>
32 , inboundLedgers_(inboundLedgers)
33 , peerSetBuilder_(
std::move(peerSetBuilder))
34 , j_(app.journal(
"LedgerReplayer"))
47 uint256 const& finishLedgerHash,
51 finishLedgerHash.
isNonZero() && totalNumLedgers > 0 &&
55 r, finishLedgerHash, totalNumLedgers);
59 bool newSkipList =
false;
66 JLOG(
j_.
info()) <<
"Too many replay tasks, dropping new task "
71 for (
auto const& t :
tasks_)
77 <<
" ledgers merged into an existing task.";
81 JLOG(
j_.
info()) <<
"Replay " << totalNumLedgers
82 <<
" ledgers. Finish ledger hash "
87 skipList = i->second.lock();
91 skipList = std::make_shared<SkipListAcquire>(
100 task = std::make_shared<LedgerReplayTask>(
121 auto const& parameter = task->getTaskParameter();
122 JLOG(
j_.
trace()) <<
"Creating " << parameter.totalLedgers_ - 1 <<
" deltas";
123 if (parameter.totalLedgers_ > 1)
126 parameter.skipList_.begin(),
127 parameter.skipList_.end(),
128 parameter.startHash_);
129 if (skipListItem == parameter.skipList_.end() ||
130 ++skipListItem == parameter.skipList_.end())
132 JLOG(
j_.
error()) <<
"Task parameter error when creating deltas "
133 << parameter.finishHash_;
138 seq <= parameter.finishSeq_ &&
139 skipListItem != parameter.skipList_.end();
140 ++seq, ++skipListItem)
143 bool newDelta =
false;
148 auto i =
deltas_.find(*skipListItem);
150 delta = i->second.lock();
154 delta = std::make_shared<LedgerDeltaAcquire>(
160 deltas_[*skipListItem] = delta;
165 task->addDelta(delta);
175 boost::intrusive_ptr<SHAMapItem const>
const& item)
183 skipList = i->second.lock();
192 skipList->processData(info.
seq, item);
206 delta = i->second.lock();
215 delta->processData(info, std::move(txns));
224 JLOG(
j_.
debug()) <<
"Sweeping, LedgerReplayer has " <<
tasks_.size()
226 <<
" skipLists, and " <<
deltas_.size() <<
" deltas.";
232 [
this](
auto const& t) ->
bool {
235 JLOG(j_.debug()) <<
"Sweep task "
236 << t->getTaskParameter().finishHash_;
243 auto removeCannotLocked = [](
auto& subTasks) {
244 for (
auto it = subTasks.begin(); it != subTasks.end();)
246 if (
auto item = it->second.lock(); !item)
248 it = subTasks.erase(it);
257 JLOG(j_.
debug()) <<
" LedgerReplayer sweep lock duration "
258 << std::chrono::duration_cast<std::chrono::milliseconds>(
265 LedgerReplayer::stop()
267 JLOG(j_.
info()) <<
"Stopping...";
271 tasks_.begin(), tasks_.end(), [](
auto& i) { i->cancel(); });
273 auto lockAndCancel = [](
auto& i) {
274 if (
auto sptr = i.second.lock(); sptr)
279 std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
281 std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
285 JLOG(j_.
info()) <<
"Stopped";