rippled
SkipListAcquire.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedger.h>
21 #include <ripple/app/ledger/LedgerReplayer.h>
22 #include <ripple/app/ledger/impl/SkipListAcquire.h>
23 #include <ripple/app/main/Application.h>
24 #include <ripple/core/JobQueue.h>
25 #include <ripple/overlay/PeerSet.h>
26 
27 namespace ripple {
28 
30  Application& app,
31  InboundLedgers& inboundLedgers,
32  uint256 const& ledgerHash,
35  app,
36  ledgerHash,
37  LedgerReplayParameters::SUB_TASK_TIMEOUT,
39  "SkipListAcquire",
41  app.journal("LedgerReplaySkipList"))
42  , inboundLedgers_(inboundLedgers)
43  , peerSet_(std::move(peerSet))
44 {
45  JLOG(journal_.trace()) << "Create " << hash_;
46 }
47 
49 {
50  JLOG(journal_.trace()) << "Destroy " << hash_;
51 }
52 
53 void
54 SkipListAcquire::init(int numPeers)
55 {
56  ScopedLockType sl(mtx_);
57  if (!isDone())
58  {
59  trigger(numPeers, sl);
60  setTimer(sl);
61  }
62 }
63 
64 void
66 {
67  if (auto const l = app_.getLedgerMaster().getLedgerByHash(hash_); l)
68  {
69  JLOG(journal_.trace()) << "existing ledger " << hash_;
70  retrieveSkipList(l, sl);
71  return;
72  }
73 
74  if (!fallBack_)
75  {
76  peerSet_->addPeers(
77  limit,
78  [this](auto peer) {
79  return peer->supportsFeature(ProtocolFeature::LedgerReplay) &&
80  peer->hasLedger(hash_, 0);
81  },
82  [this](auto peer) {
83  if (peer->supportsFeature(ProtocolFeature::LedgerReplay))
84  {
85  JLOG(journal_.trace())
86  << "Add a peer " << peer->id() << " for " << hash_;
87  protocol::TMProofPathRequest request;
88  request.set_ledgerhash(hash_.data(), hash_.size());
89  request.set_key(
90  keylet::skip().key.data(), keylet::skip().key.size());
91  request.set_type(
92  protocol::TMLedgerMapType::lmACCOUNT_STATE);
93  peerSet_->sendRequest(request, peer);
94  }
95  else
96  {
97  JLOG(journal_.trace()) << "Add a no feature peer "
98  << peer->id() << " for " << hash_;
99  if (++noFeaturePeerCount_ >=
101  {
102  JLOG(journal_.debug()) << "Fall back for " << hash_;
105  fallBack_ = true;
106  }
107  }
108  });
109  }
110 
111  if (fallBack_)
113 }
114 
115 void
117 {
118  JLOG(journal_.trace()) << "mTimeouts=" << timeouts_ << " for " << hash_;
120  {
121  failed_ = true;
122  JLOG(journal_.debug()) << "too many timeouts " << hash_;
123  notify(sl);
124  }
125  else
126  {
127  trigger(1, sl);
128  }
129 }
130 
133 {
134  return shared_from_this();
135 }
136 
137 void
139  std::uint32_t ledgerSeq,
140  boost::intrusive_ptr<SHAMapItem const> const& item)
141 {
142  assert(ledgerSeq != 0 && item);
143  ScopedLockType sl(mtx_);
144  if (isDone())
145  return;
146 
147  JLOG(journal_.trace()) << "got data for " << hash_;
148  try
149  {
150  if (auto sle =
151  std::make_shared<SLE>(SerialIter{item->slice()}, item->key());
152  sle)
153  {
154  if (auto const& skipList = sle->getFieldV256(sfHashes).value();
155  !skipList.empty())
156  onSkipListAcquired(skipList, ledgerSeq, sl);
157  return;
158  }
159  }
160  catch (...)
161  {
162  }
163 
164  failed_ = true;
165  JLOG(journal_.error()) << "failed to retrieve Skip list from verified data "
166  << hash_;
167  notify(sl);
168 }
169 
170 void
172 {
173  ScopedLockType sl(mtx_);
174  dataReadyCallbacks_.emplace_back(std::move(cb));
175  if (isDone())
176  {
177  JLOG(journal_.debug())
178  << "task added to a finished SkipListAcquire " << hash_;
179  notify(sl);
180  }
181 }
182 
185 {
186  ScopedLockType sl(mtx_);
187  return data_;
188 }
189 
190 void
192  std::shared_ptr<Ledger const> const& ledger,
193  ScopedLockType& sl)
194 {
195  if (auto const hashIndex = ledger->read(keylet::skip());
196  hashIndex && hashIndex->isFieldPresent(sfHashes))
197  {
198  auto const& slist = hashIndex->getFieldV256(sfHashes).value();
199  if (!slist.empty())
200  {
201  onSkipListAcquired(slist, ledger->seq(), sl);
202  return;
203  }
204  }
205 
206  failed_ = true;
207  JLOG(journal_.error()) << "failed to retrieve Skip list from a ledger "
208  << hash_;
209  notify(sl);
210 }
211 
212 void
214  std::vector<uint256> const& skipList,
215  std::uint32_t ledgerSeq,
216  ScopedLockType& sl)
217 {
218  complete_ = true;
219  data_ = std::make_shared<SkipListData>(ledgerSeq, skipList);
220  JLOG(journal_.debug()) << "Skip list acquired " << hash_;
221  notify(sl);
222 }
223 
224 void
226 {
227  assert(isDone());
230  auto const good = !failed_;
231  sl.unlock();
232 
233  for (auto& cb : toCall)
234  {
235  cb(good, hash_);
236  }
237 
238  sl.lock();
239 }
240 
241 } // namespace ripple
ripple::SkipListAcquire::~SkipListAcquire
~SkipListAcquire() override
Definition: SkipListAcquire.cpp:48
ripple::Application
Definition: Application.h:115
std::unique_lock::lock
T lock(T... args)
ripple::LedgerReplayParameters::SUB_TASK_MAX_TIMEOUTS
constexpr std::uint32_t SUB_TASK_MAX_TIMEOUTS
Definition: LedgerReplayer.h:51
std::shared_ptr
STL class.
ripple::SkipListAcquire::retrieveSkipList
void retrieveSkipList(std::shared_ptr< Ledger const > const &ledger, ScopedLockType &sl)
Retrieve the skip list from the ledger.
Definition: SkipListAcquire.cpp:191
ripple::LedgerReplayParameters::SUB_TASK_FALLBACK_TIMEOUT
constexpr auto SUB_TASK_FALLBACK_TIMEOUT
Definition: LedgerReplayer.h:57
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::TimeoutCounter::setTimer
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
Definition: TimeoutCounter.cpp:50
ripple::SkipListAcquire::inboundLedgers_
InboundLedgers & inboundLedgers_
Definition: SkipListAcquire.h:156
ripple::jtREPLAY_TASK
@ jtREPLAY_TASK
Definition: Job.h:62
ripple::InboundLedger::Reason::GENERIC
@ GENERIC
std::vector
STL class.
ripple::SkipListAcquire::trigger
void trigger(std::size_t limit, ScopedLockType &sl)
Trigger another round.
Definition: SkipListAcquire.cpp:65
ripple::SkipListAcquire::SkipListAcquire
SkipListAcquire(Application &app, InboundLedgers &inboundLedgers, uint256 const &ledgerHash, std::unique_ptr< PeerSet > peerSet)
Constructor.
Definition: SkipListAcquire.cpp:29
ripple::ProtocolFeature::LedgerReplay
@ LedgerReplay
ripple::keylet::skip
Keylet const & skip() noexcept
The index of the "short" skip list.
Definition: Indexes.cpp:145
ripple::LedgerMaster::getLedgerByHash
std::shared_ptr< Ledger const > getLedgerByHash(uint256 const &hash)
Definition: LedgerMaster.cpp:1854
ripple::SkipListAcquire::noFeaturePeerCount_
std::uint32_t noFeaturePeerCount_
Definition: SkipListAcquire.h:160
std::function
ripple::SkipListAcquire::onSkipListAcquired
void onSkipListAcquired(std::vector< uint256 > const &skipList, std::uint32_t ledgerSeq, ScopedLockType &sl)
Process the skip list.
Definition: SkipListAcquire.cpp:213
ripple::TimeoutCounter
This class is an "active" object.
Definition: TimeoutCounter.h:66
ripple::base_uint::data
pointer data()
Definition: base_uint.h:122
ripple::TimeoutCounter::mtx_
std::recursive_mutex mtx_
Definition: TimeoutCounter.h:125
ripple::base_uint::size
constexpr static std::size_t size()
Definition: base_uint.h:519
std::unique_lock::unlock
T unlock(T... args)
ripple::LedgerReplayParameters::MAX_QUEUED_TASKS
constexpr std::uint32_t MAX_QUEUED_TASKS
Definition: LedgerReplayer.h:66
ripple::Keylet::key
uint256 key
Definition: Keylet.h:40
ripple::base_uint< 256 >
std::enable_shared_from_this< SkipListAcquire >::shared_from_this
T shared_from_this(T... args)
ripple::TimeoutCounter::app_
Application & app_
Definition: TimeoutCounter.h:123
ripple::SkipListAcquire::fallBack_
bool fallBack_
Definition: SkipListAcquire.h:161
ripple::SkipListAcquire::pmDowncast
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
Definition: SkipListAcquire.cpp:132
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::InboundLedgers::acquire
virtual std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason)=0
ripple::TimeoutCounter::failed_
bool failed_
Definition: TimeoutCounter.h:132
ripple::SkipListAcquire::init
void init(int numPeers)
Start the SkipListAcquire task.
Definition: SkipListAcquire.cpp:54
std::unique_lock< std::recursive_mutex >
ripple::SkipListAcquire::onTimer
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
Definition: SkipListAcquire.cpp:116
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::SerialIter
Definition: Serializer.h:310
std::uint32_t
ripple::TimeoutCounter::isDone
bool isDone() const
Definition: TimeoutCounter.h:116
ripple::SkipListAcquire::dataReadyCallbacks_
std::vector< OnSkipListDataCB > dataReadyCallbacks_
Definition: SkipListAcquire.h:158
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:33
std::swap
T swap(T... args)
std::weak_ptr
STL class.
ripple::sfHashes
const SF_VECTOR256 sfHashes
ripple::SkipListAcquire::data_
std::shared_ptr< SkipListData const > data_
Definition: SkipListAcquire.h:159
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::SkipListAcquire::peerSet_
std::unique_ptr< PeerSet > peerSet_
Definition: SkipListAcquire.h:157
ripple::SkipListAcquire::addDataCallback
void addDataCallback(OnSkipListDataCB &&cb)
Add a callback that will be called when the skipList is ready or failed.
Definition: SkipListAcquire.cpp:171
ripple::TimeoutCounter::hash_
const uint256 hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
Definition: TimeoutCounter.h:129
ripple::SkipListAcquire::notify
void notify(ScopedLockType &sl)
Call the OnSkipListDataCB callbacks.
Definition: SkipListAcquire.cpp:225
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::LedgerReplayParameters::MAX_NO_FEATURE_PEER_COUNT
constexpr auto MAX_NO_FEATURE_PEER_COUNT
Definition: LedgerReplayer.h:55
ripple::SkipListAcquire::processData
void processData(std::uint32_t ledgerSeq, boost::intrusive_ptr< SHAMapItem const > const &item)
Process the data extracted from a peer's reply.
Definition: SkipListAcquire.cpp:138
ripple::TimeoutCounter::timerInterval_
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
Definition: TimeoutCounter.h:136
ripple::TimeoutCounter::complete_
bool complete_
Definition: TimeoutCounter.h:131
ripple::TimeoutCounter::timeouts_
int timeouts_
Definition: TimeoutCounter.h:130
std::unique_ptr
STL class.
ripple::TimeoutCounter::journal_
beast::Journal journal_
Definition: TimeoutCounter.h:124
ripple::SkipListAcquire::getData
std::shared_ptr< SkipListData const > getData() const
Definition: SkipListAcquire.cpp:184