rippled
LedgerReplayTask.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerReplayTask.h>
22 #include <ripple/app/ledger/LedgerReplayer.h>
23 #include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
24 #include <ripple/app/ledger/impl/SkipListAcquire.h>
25 #include <ripple/core/JobQueue.h>
26 
27 namespace ripple {
28 
31  uint256 const& finishLedgerHash,
32  std::uint32_t totalNumLedgers)
33  : reason_(r), finishHash_(finishLedgerHash), totalLedgers_(totalNumLedgers)
34 {
35  assert(finishLedgerHash.isNonZero() && totalNumLedgers > 0);
36 }
37 
// TaskParameter::update -- fills in the fields that were not set by the
// constructor, using a skip list obtained for the finish ledger.
//
// hash  : hash the skip list belongs to; must equal finishHash_
// seq   : sequence number stored as finishSeq_
// sList : skip list hashes; copied into skipList_
//
// Returns true when the parameter was filled in, false when the input was
// rejected (nothing is modified in that case).
38 bool
40  uint256 const& hash,
41  std::uint32_t seq,
42  std::vector<uint256> const& sList)
43 {
// Reject when the hash does not match, when sList plus the finish ledger
// (appended below) holds fewer than totalLedgers_ entries, or when the
// parameter has already been filled once.
44  if (finishHash_ != hash || sList.size() + 1 < totalLedgers_ || full_)
45  return false;
46 
47  finishSeq_ = seq;
48  skipList_ = sList;
// The finish ledger's own hash becomes the last entry of skipList_.
49  skipList_.emplace_back(finishHash_);
// The start ledger is the entry totalLedgers_ positions from the end.
50  startHash_ = skipList_[skipList_.size() - totalLedgers_];
51  assert(startHash_.isNonZero());
52  startSeq_ = finishSeq_ - totalLedgers_ + 1;
// Marks the parameter as complete; a later update() call returns false.
53  full_ = true;
54  return true;
55 }
56 
// TaskParameter::canMergeInto -- checks whether this task can be merged into
// an existing task.
//
// existingTask : parameter of the task that is already present
//
// Returns true only when both parameters have the same reason_ and one of the
// two conditions commented below holds; otherwise false.
57 bool
59  TaskParameter const& existingTask) const
60 {
61  if (reason_ == existingTask.reason_)
62  {
// Case 1: same finish ledger, and this task asks for no more ledgers than
// the existing one.
63  if (finishHash_ == existingTask.finishHash_ &&
64  totalLedgers_ <= existingTask.totalLedgers_)
65  {
66  return true;
67  }
68 
// Case 2: only evaluated once the existing task's skip list is filled in
// (full_); looks for this task's finish hash inside that list.
69  if (existingTask.full_)
70  {
71  auto const& exList = existingTask.skipList_;
72  if (auto i = std::find(exList.begin(), exList.end(), finishHash_);
73  i != exList.end())
74  {
// (exList.end() - i) - 1 is the number of entries after the match, so the
// comparison requires existingTask.totalLedgers_ to be at least
// totalLedgers_ plus that count.
75  return existingTask.totalLedgers_ >=
76  totalLedgers_ + (exList.end() - i) - 1;
77  }
78  }
79  }
80 
81  return false;
82 }
83 
85  Application& app,
86  InboundLedgers& inboundLedgers,
87  LedgerReplayer& replayer,
88  std::shared_ptr<SkipListAcquire>& skipListAcquirer,
89  TaskParameter&& parameter)
91  app,
92  parameter.finishHash_,
93  LedgerReplayParameters::TASK_TIMEOUT,
95  "LedgerReplayTask",
97  app.journal("LedgerReplayTask"))
98  , inboundLedgers_(inboundLedgers)
99  , replayer_(replayer)
100  , parameter_(parameter)
101  , maxTimeouts_(std::max(
103  parameter.totalLedgers_ *
105  , skipListAcquirer_(skipListAcquirer)
106 {
107  JLOG(journal_.trace()) << "Create " << hash_;
108 }
109 
111 {
112  JLOG(journal_.trace()) << "Destroy " << hash_;
113 }
114 
// LedgerReplayTask::init -- starts the task: registers a data callback on
// skipListAcquirer_, then, under mtx_, calls trigger() and setTimer() unless
// the task is already done.
115 void
117 {
118  JLOG(journal_.debug()) << "Task start " << hash_;
119 
// The callback only acts if wptr.lock() still yields an object.
121  skipListAcquirer_->addDataCallback([wptr](bool good, uint256 const& hash) {
122  if (auto sptr = wptr.lock(); sptr)
123  {
// good == false: cancel the task.
124  if (!good)
125  {
126  sptr->cancel();
127  }
128  else
129  {
// good == true: read the acquirer's data and pass its ledgerSeq and skipList
// to updateSkipList() together with the reported hash.
130  auto const skipListData = sptr->skipListAcquirer_->getData();
131  sptr->updateSkipList(
132  hash, skipListData->ledgerSeq, skipListData->skipList);
133  }
134  }
135  });
136 
// mtx_ is taken only after the callback has been registered.
137  ScopedLockType sl(mtx_);
138  if (!isDone())
139  {
140  trigger(sl);
141  setTimer(sl);
142  }
143 }
144 
145 void
146 LedgerReplayTask::trigger(ScopedLockType& sl)
147 {
148  JLOG(journal_.trace()) << "trigger " << hash_;
149  if (!parameter_.full_)
150  return;
151 
152  if (!parent_)
153  {
154  parent_ = app_.getLedgerMaster().getLedgerByHash(parameter_.startHash_);
155  if (!parent_)
156  {
157  parent_ = inboundLedgers_.acquire(
158  parameter_.startHash_,
159  parameter_.startSeq_,
160  InboundLedger::Reason::GENERIC);
161  }
162  if (parent_)
163  {
164  JLOG(journal_.trace())
165  << "Got start ledger " << parameter_.startHash_ << " for task "
166  << hash_;
167  }
168  }
169 
170  tryAdvance(sl);
171 }
172 
173 void
174 LedgerReplayTask::deltaReady(uint256 const& deltaHash)
175 {
176  JLOG(journal_.trace()) << "Delta " << deltaHash << " ready for task "
177  << hash_;
178  ScopedLockType sl(mtx_);
179  if (!isDone())
180  tryAdvance(sl);
181 }
182 
183 void
184 LedgerReplayTask::tryAdvance(ScopedLockType& sl)
185 {
186  JLOG(journal_.trace()) << "tryAdvance task " << hash_
187  << (parameter_.full_ ? ", full parameter"
188  : ", waiting to fill parameter")
189  << ", deltaIndex=" << deltaToBuild_
190  << ", totalDeltas=" << deltas_.size() << ", parent "
191  << (parent_ ? parent_->info().hash : uint256());
192 
193  bool shouldTry = parent_ && parameter_.full_ &&
194  parameter_.totalLedgers_ - 1 == deltas_.size();
195  if (!shouldTry)
196  return;
197 
198  try
199  {
200  for (; deltaToBuild_ < deltas_.size(); ++deltaToBuild_)
201  {
202  auto& delta = deltas_[deltaToBuild_];
203  assert(parent_->seq() + 1 == delta->ledgerSeq_);
204  if (auto l = delta->tryBuild(parent_); l)
205  {
206  JLOG(journal_.debug())
207  << "Task " << hash_ << " got ledger " << l->info().hash
208  << " deltaIndex=" << deltaToBuild_
209  << " totalDeltas=" << deltas_.size();
210  parent_ = l;
211  }
212  else
213  return;
214  }
215 
216  complete_ = true;
217  JLOG(journal_.info()) << "Completed " << hash_;
218  }
219  catch (std::runtime_error const&)
220  {
221  failed_ = true;
222  }
223 }
224 
225 void
226 LedgerReplayTask::updateSkipList(
227  uint256 const& hash,
228  std::uint32_t seq,
229  std::vector<uint256> const& sList)
230 {
231  {
232  ScopedLockType sl(mtx_);
233  if (isDone())
234  return;
235  if (!parameter_.update(hash, seq, sList))
236  {
237  JLOG(journal_.error()) << "Parameter update failed " << hash_;
238  failed_ = true;
239  return;
240  }
241  }
242 
243  replayer_.createDeltas(shared_from_this());
244  ScopedLockType sl(mtx_);
245  if (!isDone())
246  trigger(sl);
247 }
248 
249 void
250 LedgerReplayTask::onTimer(bool progress, ScopedLockType& sl)
251 {
252  JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
253  if (timeouts_ > maxTimeouts_)
254  {
255  failed_ = true;
256  JLOG(journal_.debug())
257  << "LedgerReplayTask Failed, too many timeouts " << hash_;
258  }
259  else
260  {
261  trigger(sl);
262  }
263 }
264 
266 LedgerReplayTask::pmDowncast()
267 {
268  return shared_from_this();
269 }
270 
271 void
272 LedgerReplayTask::addDelta(std::shared_ptr<LedgerDeltaAcquire> const& delta)
273 {
274  std::weak_ptr<LedgerReplayTask> wptr = shared_from_this();
275  delta->addDataCallback(
276  parameter_.reason_, [wptr](bool good, uint256 const& hash) {
277  if (auto sptr = wptr.lock(); sptr)
278  {
279  if (!good)
280  sptr->cancel();
281  else
282  sptr->deltaReady(hash);
283  }
284  });
285 
286  ScopedLockType sl(mtx_);
287  if (!isDone())
288  {
289  JLOG(journal_.trace())
290  << "addDelta task " << hash_ << " deltaIndex=" << deltaToBuild_
291  << " totalDeltas=" << deltas_.size();
292  assert(
293  deltas_.empty() ||
294  deltas_.back()->ledgerSeq_ + 1 == delta->ledgerSeq_);
295  deltas_.push_back(delta);
296  }
297 }
298 
299 bool
300 LedgerReplayTask::finished() const
301 {
302  ScopedLockType sl(mtx_);
303  return isDone();
304 }
305 
306 } // namespace ripple
ripple::Application
Definition: Application.h:115
std::weak_ptr::lock
T lock(T... args)
ripple::LedgerReplayTask::TaskParameter::reason_
InboundLedger::Reason reason_
Definition: LedgerReplayTask.h:50
std::shared_ptr
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:537
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::jtREPLAY_TASK
@ jtREPLAY_TASK
Definition: Job.h:62
std::vector
STL class.
std::find
T find(T... args)
std::vector::size
T size(T... args)
ripple::LedgerReplayTask::TaskParameter::full_
bool full_
Definition: LedgerReplayTask.h:59
ripple::LedgerReplayTask::LedgerReplayTask
LedgerReplayTask(Application &app, InboundLedgers &inboundLedgers, LedgerReplayer &replayer, std::shared_ptr< SkipListAcquire > &skipListAcquirer, TaskParameter &&parameter)
Constructor.
Definition: LedgerReplayTask.cpp:84
ripple::TimeoutCounter
This class is an "active" object.
Definition: TimeoutCounter.h:66
ripple::LedgerReplayTask::TaskParameter::update
bool update(uint256 const &hash, std::uint32_t seq, std::vector< uint256 > const &sList)
fill all the fields that were not filled during construction
Definition: LedgerReplayTask.cpp:39
ripple::LedgerReplayParameters::MAX_QUEUED_TASKS
constexpr std::uint32_t MAX_QUEUED_TASKS
Definition: LedgerReplayer.h:66
ripple::base_uint
Integers of any length that is a multiple of 32-bits.
Definition: base_uint.h:82
ripple::LedgerReplayTask::TaskParameter::TaskParameter
TaskParameter(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
constructor
Definition: LedgerReplayTask.cpp:29
std::enable_shared_from_this< LedgerReplayTask >::shared_from_this
T shared_from_this(T... args)
std::unique_lock< std::recursive_mutex >
std::runtime_error
STL class.
std::uint32_t
ripple::LedgerReplayParameters::TASK_MAX_TIMEOUTS_MULTIPLIER
constexpr std::uint32_t TASK_MAX_TIMEOUTS_MULTIPLIER
Definition: LedgerReplayer.h:45
ripple::LedgerReplayTask::~LedgerReplayTask
~LedgerReplayTask()
Definition: LedgerReplayTask.cpp:110
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:33
ripple::LedgerReplayTask::init
void init()
Start the task.
Definition: LedgerReplayTask.cpp:116
ripple::LedgerReplayTask::TaskParameter
Definition: LedgerReplayTask.h:46
std::weak_ptr
STL class.
ripple::LedgerReplayTask::TaskParameter::skipList_
std::vector< uint256 > skipList_
Definition: LedgerReplayTask.h:56
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::TimeoutCounter::hash_
const uint256 hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
Definition: TimeoutCounter.h:129
ripple::LedgerReplayTask::skipListAcquirer_
std::shared_ptr< SkipListAcquire > skipListAcquirer_
Definition: LedgerReplayTask.h:176
ripple::LedgerReplayParameters::TASK_MAX_TIMEOUTS_MINIMUM
constexpr std::uint32_t TASK_MAX_TIMEOUTS_MINIMUM
Definition: LedgerReplayer.h:46
ripple::LedgerReplayer
Manages the lifetime of ledger replay tasks.
Definition: LedgerReplayer.h:72
ripple::LedgerReplayTask::TaskParameter::finishHash_
uint256 finishHash_
Definition: LedgerReplayTask.h:51
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::LedgerReplayTask::TaskParameter::totalLedgers_
std::uint32_t totalLedgers_
Definition: LedgerReplayTask.h:52
ripple::InboundLedger::Reason
Reason
Definition: InboundLedger.h:43
std::max
T max(T... args)
ripple::LedgerReplayTask::TaskParameter::canMergeInto
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
Definition: LedgerReplayTask.cpp:58
ripple::TimeoutCounter::journal_
beast::Journal journal_
Definition: TimeoutCounter.h:124