rippled
LedgerDeltaAcquire.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/BuildLedger.h>
21 #include <ripple/app/ledger/InboundLedger.h>
22 #include <ripple/app/ledger/LedgerReplay.h>
23 #include <ripple/app/ledger/LedgerReplayer.h>
24 #include <ripple/app/ledger/impl/LedgerDeltaAcquire.h>
25 #include <ripple/app/main/Application.h>
26 #include <ripple/core/JobQueue.h>
27 #include <ripple/overlay/PeerSet.h>
28 
29 namespace ripple {
30 
// Constructor. The symbol index of this listing gives the signature as
//   LedgerDeltaAcquire(Application& app, InboundLedgers& inboundLedgers,
//                      uint256 const& ledgerHash, std::uint32_t ledgerSeq,
//                      std::unique_ptr<PeerSet> peerSet)
// NOTE(review): listing lines 31, 36-37, 41 and 43 (the name line, the last
// parameter, the opening of the first initializer and two of its arguments)
// are absent from this extract.
32  Application& app,
33  InboundLedgers& inboundLedgers,
34  uint256 const& ledgerHash,
35  std::uint32_t ledgerSeq,
// Arguments of the first member/base initializer; presumably the
// TimeoutCounter base, whose members app_, hash_ and journal_ are used
// throughout this file -- TODO confirm against the full source.
38  app,
39  ledgerHash,
40  LedgerReplayParameters::SUB_TASK_TIMEOUT,
42  "LedgerReplayDelta",
44  app.journal("LedgerReplayDelta"))
// Remaining members are initialised from the constructor arguments; the
// PeerSet is moved in.
45  , inboundLedgers_(inboundLedgers)
46  , ledgerSeq_(ledgerSeq)
47  , peerSet_(std::move(peerSet))
48 {
// The body only emits a trace-level log line with the hash and sequence.
49  JLOG(journal_.trace()) << "Create " << hash_ << " Seq " << ledgerSeq;
50 }
51 
// Destructor body (the symbol index declares it as
// `~LedgerDeltaAcquire() override`; its signature line, listing line 52, is
// absent from this extract). It only emits a trace-level log line.
53 {
54  JLOG(journal_.trace()) << "Destroy " << hash_;
55 }
56 
// Start the LedgerDeltaAcquire task. The symbol index of this listing gives
// the signature as `void init(int numPeers)`.
// NOTE(review): the line naming the function (listing line 58) is absent from
// this extract; only the return type and the body are shown.
57 void
59 {
    // Hold the object's mutex for the whole call.
60  ScopedLockType sl(mtx_);
    // Do nothing if the task is already done.
61  if (!isDone())
62  {
    // Run a first round, passing numPeers as the peer limit, then arm the
    // timer.
63  trigger(numPeers, sl);
64  setTimer(sl);
65  }
66 }
67 
// Trigger another round. The symbol index of this listing gives the signature
// as `void trigger(std::size_t limit, ScopedLockType& sl)`.
// NOTE(review): listing lines 69, 71, 100, 103-104 and 112-113 are absent from
// this extract, so the name line, the statement preceding the fullLedger_
// test, the threshold operand, one statement before `fallBack_ = true`, and
// the statement guarded by the final `if` are not visible here.
68 void
70 {
    // If a full ledger is available, mark the task complete, notify the
    // registered callbacks and stop. fullLedger_ is presumably refreshed on
    // the missing line 71 (the index lists LedgerMaster::getLedgerByHash)
    // -- TODO confirm.
72  if (fullLedger_)
73  {
74  complete_ = true;
75  JLOG(journal_.trace()) << "existing ledger " << hash_;
76  notify(sl);
77  return;
78  }
79 
    // While not in fall-back mode, ask the PeerSet to add peers.
80  if (!fallBack_)
81  {
82  peerSet_->addPeers(
83  limit,
    // First callable: true for a peer that supports the LedgerReplay feature
    // and reports having this ledger.
84  [this](auto peer) {
85  return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
86  peer->hasLedger(hash_, ledgerSeq_);
87  },
    // Second callable: presumably invoked for each peer that was added
    // -- TODO confirm against PeerSet::addPeers.
88  [this](auto peer) {
89  if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
90  {
    // Send a replay-delta request carrying the target ledger hash.
91  JLOG(journal_.trace())
92  << "Add a peer " << peer->id() << " for " << hash_;
93  protocol::TMReplayDeltaRequest request;
94  request.set_ledgerhash(hash_.data(), hash_.size());
95  peerSet_->sendRequest(request, peer);
96  }
97  else
98  {
    // Count peers lacking the feature; once the count reaches the threshold
    // on the missing line 100 (the index lists
    // LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT), switch to fall-back
    // mode.
99  if (++noFeaturePeerCount >=
101  {
102  JLOG(journal_.debug()) << "Fall back for " << hash_;
105  fallBack_ = true;
106  }
107  }
108  });
109  }
110 
    // The statement guarded by this `if` is on the missing lines 112-113; the
    // index lists InboundLedgers::acquire and InboundLedger::Reason::GENERIC,
    // so it presumably acquires the ledger through inboundLedgers_
    // -- TODO confirm.
111  if (fallBack_)
114 }
115 
// Hook called from invokeOnTimer() (brief from the symbol index, which
// declares it as `void onTimer(bool progress, ScopedLockType& peerSetLock)
// override`).
// NOTE(review): listing lines 117 and 120 (the name line and the `if`
// condition) are absent from this extract. The body refers to the lock as
// `sl`, so the definition evidently names that parameter differently from the
// declaration quoted in the index.
116 void
118 {
119  JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
    // Branch taken when the missing condition on line 120 holds; judging by
    // the log text and the index entry SUB_TASK_MAX_TIMEOUTS it is presumably
    // a timeout-count limit -- TODO confirm. The task is marked failed and the
    // callbacks are notified.
121  {
122  failed_ = true;
123  JLOG(journal_.debug()) << "too many timeouts " << hash_;
124  notify(sl);
125  }
126  else
127  {
    // Otherwise run another round with a peer limit of 1.
128  trigger(1, sl);
129  }
130 }
131 
// Return a weak pointer to this (brief from the symbol index, which declares
// it as `std::weak_ptr<TimeoutCounter> pmDowncast() override`).
// NOTE(review): the return-type and name lines (listing lines 132-133) are
// absent from this extract.
134 {
    // shared_from_this() yields a shared_ptr; per the declared return type it
    // is converted to a weak_ptr on return.
135  return shared_from_this();
136 }
137 
// Process the data extracted from a peer's reply. The symbol index of this
// listing gives the signature as
//   void processData(LedgerInfo const& info,
//       std::map<std::uint32_t, std::shared_ptr<STTx const>>&& orderedTxns)
// NOTE(review): the name line and the second parameter line (listing lines
// 139 and 141) are absent from this extract.
138 void
140  LedgerInfo const& info,
142 {
    // Hold the object's mutex for the whole call.
143  ScopedLockType sl(mtx_);
144  JLOG(journal_.trace()) << "got data for " << hash_;
    // Ignore data that arrives after the task has finished.
145  if (isDone())
146  return;
147 
    // Only accept data whose sequence matches the ledger being acquired.
148  if (info.seq == ledgerSeq_)
149  {
150  // create a temporary ledger for building a LedgerReplay object later
151  replayTemp_ =
152  std::make_shared<Ledger>(info, app_.config(), app_.getNodeFamily());
    // NOTE(review): std::make_shared reports failure by throwing rather than
    // returning null, so this test should always be true.
153  if (replayTemp_)
154  {
    // Success: keep the transactions, mark complete and notify the callbacks.
155  complete_ = true;
156  orderedTxns_ = std::move(orderedTxns);
157  JLOG(journal_.debug()) << "ready to replay " << hash_;
158  notify(sl);
159  return;
160  }
161  }
162 
    // Failure path. It is also reached when info.seq differs from ledgerSeq_,
    // in which case the message below does not describe the actual cause.
163  failed_ = true;
164  JLOG(journal_.error())
165  << "failed to create a (info only) ledger from verified data " << hash_;
166  notify(sl);
167 }
168 
// Add a reason and a callback to the LedgerDeltaAcquire subtask. The symbol
// index of this listing gives the signature as
//   void addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB&& cb)
// NOTE(review): the name line (listing line 170) is absent from this extract.
169 void
171  InboundLedger::Reason reason,
172  OnDeltaDataCB&& cb)
173 {
    // Hold the object's mutex for the whole call.
174  ScopedLockType sl(mtx_);
    // Always queue the callback, even if the reason is already known.
175  dataReadyCallbacks_.emplace_back(std::move(cb));
    // Record the reason if it is new.
176  if (reasons_.count(reason) == 0)
177  {
178  reasons_.emplace(reason);
    // If the full ledger is already available, process it for this new
    // reason specifically.
179  if (fullLedger_)
180  onLedgerBuilt(sl, reason);
181  }
182 
    // If the task has already finished, call the callbacks right away rather
    // than leaving the one just queued waiting.
183  if (isDone())
184  {
185  JLOG(journal_.debug())
186  << "task added to a finished LedgerDeltaAcquire " << hash_;
187  notify(sl);
188  }
189 }
190 
// Try to build the ledger if not already. The symbol index of this listing
// gives the signature as
//   std::shared_ptr<Ledger const>
//   tryBuild(std::shared_ptr<Ledger const> const& parent)
// Returns the full ledger when available or successfully built, an empty
// pointer when the task has failed or has no replay data yet, and throws
// std::runtime_error (via Throw) when the built ledger is missing or its hash
// does not match hash_.
// NOTE(review): the return-type and name lines (listing lines 191-192) are
// absent from this extract.
193 {
    // Hold the object's mutex for the whole call.
194  ScopedLockType sl(mtx_);
195 
    // Already built (or otherwise available): return it as is.
196  if (fullLedger_)
197  return fullLedger_;
198 
    // Not buildable: the task failed, has not completed, or holds no
    // temporary ledger.
199  if (failed_ || !complete_ || !replayTemp_)
200  return {};
201 
    // The supplied parent must immediately precede the ledger being built,
    // by sequence and by hash. These checks are asserts only.
202  assert(parent->seq() + 1 == replayTemp_->seq());
203  assert(parent->info().hash == replayTemp_->info().parentHash);
204  // build ledger
    // orderedTxns_ is moved into the replay data here.
205  LedgerReplay replayData(parent, replayTemp_, std::move(orderedTxns_));
206  fullLedger_ = buildLedger(replayData, tapNONE, app_, journal_);
207  if (fullLedger_ && fullLedger_->info().hash == hash_)
208  {
209  JLOG(journal_.info()) << "Built " << hash_;
210  onLedgerBuilt(sl);
211  return fullLedger_;
212  }
213  else
214  {
    // NOTE(review): fullLedger_ is not reset on this path. If buildLedger
    // returned a ledger with a different hash, it stays stored and a later
    // call would return it from the first `if (fullLedger_)` above -- worth
    // confirming whether callers can reach that.
215  failed_ = true;
216  complete_ = false;
217  JLOG(journal_.error()) << "tryBuild failed " << hash_ << " with parent "
218  << parent->info().hash;
219  Throw<std::runtime_error>("Cannot replay ledger");
220  }
221 }
222 
// Process a newly built ledger, such as store it. The symbol index of this
// listing gives the signature as
//   void onLedgerBuilt(ScopedLockType& sl,
//                      std::optional<InboundLedger::Reason> reason = {})
// NOTE(review): listing lines 224, 226, 231, 240-241 and 248 are absent from
// this extract: the name line, the second parameter, the declaration of the
// local `reasons`, the opening of the call that receives the lambda, and the
// case label inside the switch.
223 void
225  ScopedLockType& sl,
227 {
228  JLOG(journal_.debug()) << "onLedgerBuilt " << hash_
229  << (reason ? " for a new reason" : "");
230 
    // Tail of the declaration of the local `reasons`, initialised from every
    // reason recorded so far (its first line is missing).
232  reasons_.begin(), reasons_.end());
233  bool firstTime = true;
    // When a specific reason is supplied, handle only that one and do not
    // treat this as the first notification.
234  if (reason) // small chance
235  {
236  reasons.clear();
237  reasons.push_back(*reason);
238  firstTime = false;
239  }
    // Arguments of a call whose first lines are missing; the index lists
    // JobQueue::addJob and jtREPLAY_TASK, so the lambda is presumably queued
    // as a job -- TODO confirm. The lambda copies `reasons` and `firstTime`
    // and a shared pointer to the ledger, and takes the Application by
    // reference.
242  "onLedgerBuilt",
243  [=, ledger = this->fullLedger_, &app = this->app_]() {
244  for (auto reason : reasons)
245  {
246  switch (reason)
247  {
    // The case label for this branch is on the missing line 248 (the index
    // lists InboundLedger::Reason::GENERIC -- TODO confirm). The ledger is
    // stored via LedgerMaster.
249  app.getLedgerMaster().storeLedger(ledger);
250  break;
251  default:
252  // TODO for other use cases
253  break;
254  }
255  }
256 
    // Only the first notification asks LedgerMaster to try advancing.
257  if (firstTime)
258  app.getLedgerMaster().tryAdvance();
259  });
260 }
261 
// Call the OnDeltaDataCB callbacks. The symbol index of this listing gives the
// signature as `void notify(ScopedLockType& sl)`.
// NOTE(review): listing lines 263 and 266-267 (the name line and the
// declaration of the local `toCall`) are absent from this extract. The index
// lists std::swap and dataReadyCallbacks_, so the pending callbacks are
// presumably swapped into `toCall` -- TODO confirm.
262 void
264 {
    // May only be called once the task is done.
265  assert(isDone());
    // Capture the outcome before the lock is released.
268  auto const good = !failed_;
    // Release the lock while the callbacks run.
269  sl.unlock();
270 
271  for (auto& cb : toCall)
272  {
273  cb(good, hash_);
274  }
275 
    // Re-acquire the lock so the caller's lock is held again on return.
276  sl.lock();
277 }
278 
279 } // namespace ripple
ripple::LedgerDeltaAcquire::dataReadyCallbacks_
std::vector< OnDeltaDataCB > dataReadyCallbacks_
Definition: LedgerDeltaAcquire.h:155
ripple::Application
Definition: Application.h:115
ripple::Application::getNodeFamily
virtual Family & getNodeFamily()=0
ripple::LedgerDeltaAcquire::notify
void notify(ScopedLockType &sl)
Call the OnDeltaDataCB callbacks.
Definition: LedgerDeltaAcquire.cpp:263
std::unique_lock::lock
T lock(T... args)
ripple::LedgerReplayParameters::SUB_TASK_MAX_TIMEOUTS
constexpr std::uint32_t SUB_TASK_MAX_TIMEOUTS
Definition: LedgerReplayer.h:51
std::shared_ptr
STL class.
ripple::LedgerDeltaAcquire::ledgerSeq_
const std::uint32_t ledgerSeq_
Definition: LedgerDeltaAcquire.h:150
ripple::LedgerDeltaAcquire::orderedTxns_
std::map< std::uint32_t, std::shared_ptr< STTx const > > orderedTxns_
Definition: LedgerDeltaAcquire.h:154
ripple::LedgerDeltaAcquire::processData
void processData(LedgerInfo const &info, std::map< std::uint32_t, std::shared_ptr< STTx const >> &&orderedTxns)
Process the data extracted from a peer's reply.
Definition: LedgerDeltaAcquire.cpp:139
ripple::LedgerReplayParameters::SUB_TASK_FALLBACK_TIMEOUT
constexpr auto SUB_TASK_FALLBACK_TIMEOUT
Definition: LedgerReplayer.h:57
ripple::LedgerDeltaAcquire::~LedgerDeltaAcquire
~LedgerDeltaAcquire() override
Definition: LedgerDeltaAcquire.cpp:52
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::TimeoutCounter::setTimer
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
Definition: TimeoutCounter.cpp:50
ripple::jtREPLAY_TASK
@ jtREPLAY_TASK
Definition: Job.h:62
ripple::LedgerDeltaAcquire::peerSet_
std::unique_ptr< PeerSet > peerSet_
Definition: LedgerDeltaAcquire.h:151
ripple::InboundLedger::Reason::GENERIC
@ GENERIC
std::vector
STL class.
ripple::LedgerDeltaAcquire::init
void init(int numPeers)
Start the LedgerDeltaAcquire task.
Definition: LedgerDeltaAcquire.cpp:58
ripple::ProtocolFeature::LedgerReplay
@ LedgerReplay
std::set::emplace
T emplace(T... args)
ripple::LedgerMaster::getLedgerByHash
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
Definition: LedgerMaster.cpp:1854
ripple::LedgerDeltaAcquire::fullLedger_
std::shared_ptr< Ledger const > fullLedger_
Definition: LedgerDeltaAcquire.h:153
ripple::LedgerDeltaAcquire::inboundLedgers_
InboundLedgers & inboundLedgers_
Definition: LedgerDeltaAcquire.h:149
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
std::function
ripple::LedgerInfo::seq
LedgerIndex seq
Definition: ReadView.h:83
ripple::tapNONE
@ tapNONE
Definition: ApplyView.h:30
ripple::TimeoutCounter
This class is an "active" object.
Definition: TimeoutCounter.h:66
ripple::base_uint::data
pointer data()
Definition: base_uint.h:122
ripple::TimeoutCounter::mtx_
std::recursive_mutex mtx_
Definition: TimeoutCounter.h:125
ripple::base_uint::size
constexpr static std::size_t size()
Definition: base_uint.h:519
std::unique_lock::unlock
T unlock(T... args)
ripple::LedgerReplayParameters::MAX_QUEUED_TASKS
constexpr std::uint32_t MAX_QUEUED_TASKS
Definition: LedgerReplayer.h:66
ripple::base_uint< 256 >
ripple::LedgerDeltaAcquire::replayTemp_
std::shared_ptr< Ledger const > replayTemp_
Definition: LedgerDeltaAcquire.h:152
std::enable_shared_from_this< LedgerDeltaAcquire >::shared_from_this
T shared_from_this(T... args)
ripple::TimeoutCounter::app_
Application & app_
Definition: TimeoutCounter.h:123
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::InboundLedgers::acquire
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
ripple::TimeoutCounter::failed_
bool failed_
Definition: TimeoutCounter.h:132
ripple::LedgerDeltaAcquire::reasons_
std::set< InboundLedger::Reason > reasons_
Definition: LedgerDeltaAcquire.h:156
ripple::Application::config
virtual Config & config()=0
std::unique_lock< std::recursive_mutex >
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::LedgerDeltaAcquire::trigger
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
Definition: LedgerDeltaAcquire.cpp:69
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::uint32_t
std::map
STL class.
ripple::TimeoutCounter::isDone
bool isDone() const
Definition: TimeoutCounter.h:116
ripple::LedgerDeltaAcquire::addDataCallback
void addDataCallback(InboundLedger::Reason reason, OnDeltaDataCB &&cb)
Add a reason and a callback to the LedgerDeltaAcquire subtask.
Definition: LedgerDeltaAcquire.cpp:170
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:33
std::swap
T swap(T... args)
ripple::LedgerDeltaAcquire::pmDowncast
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
Definition: LedgerDeltaAcquire.cpp:133
std::weak_ptr
STL class.
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::TimeoutCounter::hash_
const uint256 hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
Definition: TimeoutCounter.h:129
ripple::LedgerDeltaAcquire::noFeaturePeerCount
std::uint32_t noFeaturePeerCount
Definition: LedgerDeltaAcquire.h:157
std::set::begin
T begin(T... args)
ripple::LedgerDeltaAcquire::fallBack_
bool fallBack_
Definition: LedgerDeltaAcquire.h:158
ripple::buildLedger
std::shared_ptr< Ledger > buildLedger(std::shared_ptr< Ledger const > const &parent, NetClock::time_point closeTime, const bool closeTimeCorrect, NetClock::duration closeResolution, Application &app, CanonicalTXSet &txns, std::set< TxID > &failedTxs, beast::Journal j)
Build a new ledger by applying consensus transactions.
Definition: BuildLedger.cpp:178
std::set::count
T count(T... args)
ripple::LedgerDeltaAcquire::tryBuild
std::shared_ptr< Ledger const > tryBuild(std::shared_ptr< Ledger const > const &parent)
Try to build the ledger if not already.
Definition: LedgerDeltaAcquire.cpp:192
std::optional
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT
constexpr auto MAX_NO_FEATURE_PEER_COUNT
Definition: LedgerReplayer.h:55
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:75
std::set::end
T end(T... args)
ripple::InboundLedger::Reason
Reason
Definition: InboundLedger.h:43
ripple::TimeoutCounter::timerInterval_
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
Definition: TimeoutCounter.h:136
ripple::TimeoutCounter::complete_
bool complete_
Definition: TimeoutCounter.h:131
ripple::TimeoutCounter::timeouts_
int timeouts_
Definition: TimeoutCounter.h:130
std::unique_ptr
STL class.
ripple::LedgerDeltaAcquire::LedgerDeltaAcquire
LedgerDeltaAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::uint32_t ledgerSeq, std::unique_ptr< PeerSet > peerSet)
Constructor.
Definition: LedgerDeltaAcquire.cpp:31
ripple::LedgerDeltaAcquire::onTimer
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
Definition: LedgerDeltaAcquire.cpp:117
ripple::TimeoutCounter::journal_
beast::Journal journal_
Definition: TimeoutCounter.h:124
ripple::LedgerDeltaAcquire::onLedgerBuilt
void onLedgerBuilt(ScopedLockType &sl, std::optional< InboundLedger::Reason > reason={})
Process a newly built ledger, such as store it.
Definition: LedgerDeltaAcquire.cpp:224
ripple::LedgerReplay
Definition: LedgerReplay.h:33