rippled
LedgerReplayer.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/LedgerReplayer.h>
21 #include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
22 #include <ripple/app/ledger/impl/SkipListAcquire.h>
23 #include <ripple/core/JobQueue.h>
24 
25 namespace ripple {
26 
// LedgerReplayer constructor. The cross-reference index gives the signature
// as LedgerReplayer(Application&, InboundLedgers&,
// std::unique_ptr<PeerSetBuilder>) at listing line 27; that line is absent
// from this extract, so the parameter list below starts mid-declaration.
//
// Binds the application and inbound-ledgers references, moves the peer-set
// builder into the member, and obtains the journal named "LedgerReplayer"
// from the application. The body itself is empty.
28  Application& app,
29  InboundLedgers& inboundLedgers,
30  std::unique_ptr<PeerSetBuilder> peerSetBuilder)
31  : app_(app)
32  , inboundLedgers_(inboundLedgers)
33  , peerSetBuilder_(std::move(peerSetBuilder))
34  , j_(app.journal("LedgerReplayer"))
35 {
36 }
37 
// Body of ~LedgerReplayer() (the index places its definition at listing
// line 38, which is absent from this extract). The visible statement empties
// tasks_, releasing this object's shared_ptr references to the tasks.
// NOTE(review): listing line 40 is also missing; presumably it takes a
// lock on mtx_ before the clear, as stop() does -- confirm against upstream.
39 {
41  tasks_.clear();
42 }
43 
// LedgerReplayer::replay -- "Replay a range of ledgers." Signature per the
// cross-reference index:
//   replay(InboundLedger::Reason r, uint256 const& finishLedgerHash,
//          std::uint32_t totalNumLedgers)
//
// Flow visible in this listing:
//   1. Assert the arguments are sane and build `parameter` from them.
//   2. Inside a braced scope: return if the application is stopping, return
//      on the "Too many replay tasks" branch, or return if the request can
//      be merged into a task already in tasks_.
//   3. Otherwise reuse a still-alive SkipListAcquire registered under the
//      finish hash, or create one and register it in skipLists_.
//   4. Create the LedgerReplayTask and append it to tasks_.
//   5. After the scope, init the skip list (only if new) and then the task.
//
// NOTE(review): listing lines 45-46, 54, 57-58, 61, 64 and 93 are absent
// from this extract -- presumably the function name and first parameter, the
// declarations of `parameter`, `task` and `skipList`, a lock on mtx_, the
// task-count check against MAX_TASKS, and one SkipListAcquire constructor
// argument. Confirm against the upstream file.
44 void
47  uint256 const& finishLedgerHash,
48  std::uint32_t totalNumLedgers)
49 {
// Precondition check only (assert): non-zero finish hash and a ledger count
// in the range 1..MAX_TASK_SIZE.
50  assert(
51  finishLedgerHash.isNonZero() && totalNumLedgers > 0 &&
52  totalNumLedgers <= LedgerReplayParameters::MAX_TASK_SIZE);
53 
55  r, finishLedgerHash, totalNumLedgers);
56 
59  bool newSkipList = false;
60  {
62  if (app_.isStopping())
63  return;
65  {
66  JLOG(j_.info()) << "Too many replay tasks, dropping new task "
67  << parameter.finishHash_;
68  return;
69  }
70 
// A request that an existing task already covers needs no task of its own.
71  for (auto const& t : tasks_)
72  {
73  if (parameter.canMergeInto(t->getTaskParameter()))
74  {
75  JLOG(j_.info()) << "Task " << parameter.finishHash_ << " with "
76  << totalNumLedgers
77  << " ledgers merged into an existing task.";
78  return;
79  }
80  }
81  JLOG(j_.info()) << "Replay " << totalNumLedgers
82  << " ledgers. Finish ledger hash "
83  << parameter.finishHash_;
84 
// skipLists_ maps hash -> weak_ptr<SkipListAcquire>, so an entry that is
// found may already have expired; lock() then yields a null pointer.
85  auto i = skipLists_.find(parameter.finishHash_);
86  if (i != skipLists_.end())
87  skipList = i->second.lock();
88 
89  if (!skipList) // cannot find, or found but cannot lock
90  {
91  skipList = std::make_shared<SkipListAcquire>(
92  app_,
94  parameter.finishHash_,
95  peerSetBuilder_->build());
96  skipLists_[parameter.finishHash_] = skipList;
97  newSkipList = true;
98  }
99 
100  task = std::make_shared<LedgerReplayTask>(
101  app_, inboundLedgers_, *this, skipList, std::move(parameter));
102  tasks_.push_back(task);
103  }
104 
// The init calls happen after the braced scope above has closed. A reused
// skip list is not initialised again here.
105  if (newSkipList)
106  skipList->init(1);
107  // task init after skipList init, could save a timeout
108  task->init();
109 }
110 
// LedgerReplayer::createDeltas(std::shared_ptr<LedgerReplayTask> task) --
// "Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task."
// (signature and description per the cross-reference index).
//
// Nothing is created unless the task covers more than one ledger. In that
// case the task's start hash is located in its skip list, and one delta is
// attached to the task for each sequence number from startSeq_ + 1 through
// finishSeq_, paired with the successive skip-list entries after the start
// hash. A delta already registered in deltas_ and still alive is reused;
// otherwise a new one is created, registered, and initialised.
//
// NOTE(review): listing lines 112, 142, 145 and 156 are absent from this
// extract -- presumably the function name/parameter, the declaration of
// `delta`, a lock on mtx_, and one LedgerDeltaAcquire constructor argument.
// Confirm against the upstream file.
111 void
113 {
114  {
115  // TODO for use cases like Consensus (i.e. totalLedgers = 1 or small):
116  // check if the last closed or validated ledger l the local node has
117  // is in the skip list and is an ancestor of parameter.startLedger
118  // that has to be downloaded, if so expand the task to start with l.
119  }
120 
121  auto const& parameter = task->getTaskParameter();
122  JLOG(j_.trace()) << "Creating " << parameter.totalLedgers_ - 1 << " deltas";
123  if (parameter.totalLedgers_ > 1)
124  {
125  auto skipListItem = std::find(
126  parameter.skipList_.begin(),
127  parameter.skipList_.end(),
128  parameter.startHash_);
// Fails when the start hash is missing or is the last entry. On success the
// pre-increment in the second operand has already advanced the iterator to
// the entry that follows the start hash.
129  if (skipListItem == parameter.skipList_.end() ||
130  ++skipListItem == parameter.skipList_.end())
131  {
132  JLOG(j_.error()) << "Task parameter error when creating deltas "
133  << parameter.finishHash_;
134  return;
135  }
136 
// Sequence number and skip-list iterator advance in lockstep; the loop ends
// when either runs out.
137  for (std::uint32_t seq = parameter.startSeq_ + 1;
138  seq <= parameter.finishSeq_ &&
139  skipListItem != parameter.skipList_.end();
140  ++seq, ++skipListItem)
141  {
143  bool newDelta = false;
144  {
146  if (app_.isStopping())
147  return;
// deltas_ maps hash -> weak_ptr<LedgerDeltaAcquire>; an expired entry is
// handled like a missing one and overwritten below.
148  auto i = deltas_.find(*skipListItem);
149  if (i != deltas_.end())
150  delta = i->second.lock();
151 
152  if (!delta) // cannot find, or found but cannot lock
153  {
154  delta = std::make_shared<LedgerDeltaAcquire>(
155  app_,
157  *skipListItem,
158  seq,
159  peerSetBuilder_->build());
160  deltas_[*skipListItem] = delta;
161  newDelta = true;
162  }
163  }
164 
// Attach the delta to the task first; only a newly created delta is
// initialised.
165  task->addDelta(delta);
166  if (newDelta)
167  delta->init(1);
168  }
169  }
170 }
171 
// LedgerReplayer::gotSkipList -- "Process a skip list (extracted from a
// TMProofPathResponse message)" per the cross-reference index. The index
// names the second parameter `data`; this definition names it `item`.
//
// Looks up info.hash in skipLists_. Returns without doing anything if there
// is no entry. If the entry's weak_ptr has expired, the entry is erased and
// the function returns. Otherwise info.seq and the item are handed to the
// SkipListAcquire after the braced lookup scope has closed.
//
// NOTE(review): listing lines 173 and 179 are absent from this extract --
// presumably the function name and a lock on mtx_ at the top of the inner
// scope. Confirm against the upstream file.
172 void
174  LedgerInfo const& info,
175  boost::intrusive_ptr<SHAMapItem const> const& item)
176 {
177  std::shared_ptr<SkipListAcquire> skipList = {};
178  {
180  auto i = skipLists_.find(info.hash);
181  if (i == skipLists_.end())
182  return;
183  skipList = i->second.lock();
184  if (!skipList)
185  {
// Expired entry: drop it from the map.
186  skipLists_.erase(i);
187  return;
188  }
189  }
190 
// Every null case returned above, so this check always passes here.
191  if (skipList)
192  skipList->processData(info.seq, item);
193 }
194 
// LedgerReplayer::gotReplayDelta -- "Process a ledger delta (extracted from
// a TMReplayDeltaResponse message)". Signature per the cross-reference index:
//   gotReplayDelta(LedgerInfo const& info,
//                  std::map<std::uint32_t, std::shared_ptr<STTx const>>&& txns)
//
// Same shape as the skip-list handler, but against deltas_: returns if
// info.hash has no entry, erases the entry and returns if its weak_ptr has
// expired, and otherwise passes info and the moved transaction map to the
// LedgerDeltaAcquire after the braced lookup scope has closed.
//
// NOTE(review): listing lines 196, 198, 200 and 202 are absent from this
// extract -- presumably the function name, the `txns` parameter, the
// declaration of `delta`, and a lock on mtx_. Confirm against upstream.
195 void
197  LedgerInfo const& info,
199 {
201  {
203  auto i = deltas_.find(info.hash);
204  if (i == deltas_.end())
205  return;
206  delta = i->second.lock();
207  if (!delta)
208  {
// Expired entry: drop it from the map.
209  deltas_.erase(i);
210  return;
211  }
212  }
213 
// Every null case returned above, so this check always passes here.
214  if (delta)
215  delta->processData(info, std::move(txns));
216 }
217 
// LedgerReplayer::sweep() -- "Remove completed tasks." (per the
// cross-reference index).
//
// Records a steady_clock start time, then inside a braced scope: logs the
// current sizes of tasks_, skipLists_ and deltas_; erases every task whose
// finished() returns true (logging each one); and erases every skipLists_ /
// deltas_ entry whose weak_ptr can no longer be locked. After the scope it
// logs a duration in milliseconds.
//
// NOTE(review): listing lines 219, 223, 229 and 259 are absent from this
// extract -- presumably the function name, a lock on mtx_, the
// `std::remove_if(` call head (std::remove_if appears in the index), and the
// `now() - start` operand of the duration_cast. Confirm against upstream.
218 void
220 {
221  auto const start = std::chrono::steady_clock::now();
222  {
224  JLOG(j_.debug()) << "Sweeping, LedgerReplayer has " << tasks_.size()
225  << " tasks, " << skipLists_.size()
226  << " skipLists, and " << deltas_.size() << " deltas.";
227 
// The predicate selects finished tasks; tasks_.erase then removes everything
// from the returned position up to tasks_.end().
228  tasks_.erase(
230  tasks_.begin(),
231  tasks_.end(),
232  [this](auto const& t) -> bool {
233  if (t->finished())
234  {
235  JLOG(j_.debug()) << "Sweep task "
236  << t->getTaskParameter().finishHash_;
237  return true;
238  }
239  return false;
240  }),
241  tasks_.end());
242 
// Generic over both maps: erase(it) returns the next iterator, so the loop
// only increments when nothing was erased.
243  auto removeCannotLocked = [](auto& subTasks) {
244  for (auto it = subTasks.begin(); it != subTasks.end();)
245  {
246  if (auto item = it->second.lock(); !item)
247  {
248  it = subTasks.erase(it);
249  }
250  else
251  ++it;
252  }
253  };
254  removeCannotLocked(skipLists_);
255  removeCannotLocked(deltas_);
256  }
257  JLOG(j_.debug()) << " LedgerReplayer sweep lock duration "
258  << std::chrono::duration_cast<std::chrono::milliseconds>(
260  .count()
261  << "ms";
262 }
263 
// LedgerReplayer::stop() -- cancels and forgets all outstanding work.
//
// Logs "Stopping...", then while holding a std::lock_guard on mtx_:
// calls cancel() on each task and clears tasks_; calls cancel() on every
// skip list and delta whose weak_ptr can still be locked; and clears both
// maps. Logs "Stopped" once the lock scope has closed.
//
// NOTE(review): listing line 270 is absent from this extract -- presumably
// the `std::for_each(` call head for the argument list on line 271, matching
// the two std::for_each calls below. Confirm against the upstream file.
264 void
265 LedgerReplayer::stop()
266 {
267  JLOG(j_.info()) << "Stopping...";
268  {
269  std::lock_guard<std::mutex> lock(mtx_);
271  tasks_.begin(), tasks_.end(), [](auto& i) { i->cancel(); });
272  tasks_.clear();
// Shared by both weak_ptr maps: expired entries are skipped because there is
// nothing left to cancel.
273  auto lockAndCancel = [](auto& i) {
274  if (auto sptr = i.second.lock(); sptr)
275  {
276  sptr->cancel();
277  }
278  };
279  std::for_each(skipLists_.begin(), skipLists_.end(), lockAndCancel);
280  skipLists_.clear();
281  std::for_each(deltas_.begin(), deltas_.end(), lockAndCancel);
282  deltas_.clear();
283  }
284 
285  JLOG(j_.info()) << "Stopped";
286 }
287 
288 } // namespace ripple
ripple::Application
Definition: Application.h:115
ripple::LedgerReplayer::skipLists_
hash_map< uint256, std::weak_ptr< SkipListAcquire > > skipLists_
Definition: LedgerReplayer.h:132
std::for_each
T for_each(T... args)
std::shared_ptr
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:537
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::LedgerReplayParameters::MAX_TASKS
constexpr std::uint32_t MAX_TASKS
Definition: LedgerReplayer.h:60
ripple::LedgerInfo::hash
uint256 hash
Definition: ReadView.h:91
std::find
T find(T... args)
std::lock_guard
STL class.
ripple::Application::isStopping
virtual bool isStopping() const =0
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:83
ripple::LedgerReplayParameters::MAX_TASK_SIZE
constexpr std::uint32_t MAX_TASK_SIZE
Definition: LedgerReplayer.h:63
ripple::LedgerReplayer::deltas_
hash_map< uint256, std::weak_ptr< LedgerDeltaAcquire > > deltas_
Definition: LedgerReplayer.h:131
ripple::LedgerReplayer::gotSkipList
void gotSkipList(LedgerInfo const &info, boost::intrusive_ptr< SHAMapItem const > const &data)
Process a skip list (extracted from a TMProofPathResponse message)
Definition: LedgerReplayer.cpp:173
ripple::LedgerReplayer::createDeltas
void createDeltas(std::shared_ptr< LedgerReplayTask > task)
Create LedgerDeltaAcquire subtasks for the LedgerReplayTask task.
Definition: LedgerReplayer.cpp:112
ripple::base_uint< 256 >
ripple::LedgerReplayer::inboundLedgers_
InboundLedgers & inboundLedgers_
Definition: LedgerReplayer.h:135
ripple::LedgerReplayer::~LedgerReplayer
~LedgerReplayer()
Definition: LedgerReplayer.cpp:38
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::uint32_t
ripple::LedgerReplayer::gotReplayDelta
void gotReplayDelta(LedgerInfo const &info, std::map< std::uint32_t, std::shared_ptr< STTx const >> &&txns)
Process a ledger delta (extracted from a TMReplayDeltaResponse message)
Definition: LedgerReplayer.cpp:196
ripple::LedgerReplayer::replay
void replay(InboundLedger::Reason r, uint256 const &finishLedgerHash, std::uint32_t totalNumLedgers)
Replay a range of ledgers.
Definition: LedgerReplayer.cpp:45
std::remove_if
T remove_if(T... args)
std::map
STL class.
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:33
ripple::LedgerReplayTask::TaskParameter
Definition: LedgerReplayTask.h:46
ripple::LedgerReplayer::tasks_
std::vector< std::shared_ptr< LedgerReplayTask > > tasks_
Definition: LedgerReplayer.h:130
ripple::LedgerReplayer::sweep
void sweep()
Remove completed tasks.
Definition: LedgerReplayer.cpp:219
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::LedgerReplayer::LedgerReplayer
LedgerReplayer(Application &app, InboundLedgers &inboundLedgers, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
Definition: LedgerReplayer.cpp:27
std
STL namespace.
ripple::LedgerReplayer::j_
beast::Journal j_
Definition: LedgerReplayer.h:137
ripple::LedgerReplayTask::TaskParameter::finishHash_
uint256 finishHash_
Definition: LedgerReplayTask.h:51
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::LedgerReplayer::app_
Application & app_
Definition: LedgerReplayer.h:134
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:75
ripple::InboundLedger::Reason
Reason
Definition: InboundLedger.h:43
ripple::LedgerReplayer::peerSetBuilder_
std::unique_ptr< PeerSetBuilder > peerSetBuilder_
Definition: LedgerReplayer.h:136
std::unique_ptr
STL class.
ripple::LedgerReplayTask::TaskParameter::canMergeInto
bool canMergeInto(TaskParameter const &existingTask) const
check if this task can be merged into an existing task
Definition: LedgerReplayTask.cpp:58
ripple::LedgerReplayer::mtx_
std::mutex mtx_
Definition: LedgerReplayer.h:129
std::chrono::steady_clock::now
T now(T... args)