rippled
InboundLedgers.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/main/Application.h>
23 #include <ripple/app/misc/NetworkOPs.h>
24 #include <ripple/basics/DecayingSample.h>
25 #include <ripple/basics/Log.h>
26 #include <ripple/beast/container/aged_map.h>
27 #include <ripple/beast/core/LexicalCast.h>
28 #include <ripple/core/JobQueue.h>
29 #include <ripple/nodestore/DatabaseShard.h>
30 #include <ripple/protocol/jss.h>
31 #include <memory>
32 #include <mutex>
33 #include <vector>
34 
35 namespace ripple {
36 
38 {
39 private:
42  // measures ledgers per second, constants are important
45 
46 public:
47  // How long before we try again to acquire the same ledger
    // (five minutes, expressed as std::chrono::minutes).
    // NOTE(review): no use of this constant survives in this listing;
    // presumably it is the age used when expiring mRecentFailures - confirm.
48  static constexpr std::chrono::minutes const kReacquireInterval{5};
49 
// Constructor of InboundLedgersImp.
// NOTE(review): the line carrying the constructor name is missing from this
// listing; the parameter list and member-initializer list below are complete.
//
// app            - stored by reference in app_; also supplies the journal
//                  named "InboundLedger".
// clock          - stored by reference in m_clock; its current time
//                  initializes fetchRate_ and the clock itself is handed to
//                  mRecentFailures.
// collector      - used to create the "ledger_fetches" counter.
// peerSetBuilder - ownership is moved into mPeerSetBuilder.
51  Application& app,
52  clock_type& clock,
53  beast::insight::Collector::ptr const& collector,
54  std::unique_ptr<PeerSetBuilder> peerSetBuilder)
55  : app_(app)
56  , fetchRate_(clock.now())
57  , j_(app.journal("InboundLedger"))
58  , m_clock(clock)
59  , mRecentFailures(clock)
60  , mCounter(collector->make_counter("ledger_fetches"))
61  , mPeerSetBuilder(std::move(peerSetBuilder))
62  {
63  }
64 
// acquire(): look up, or start, the acquisition of the ledger identified by
// `hash` and return the ledger only if it is already complete.
// NOTE(review): the return-type/name line is missing from this listing; the
// cross-reference index gives the signature as
//   std::shared_ptr<Ledger const> acquire(uint256 const&, std::uint32_t,
//                                         InboundLedger::Reason) override
//
// hash   - ledger hash; asserted to be non-zero.
// seq    - ledger sequence; asserted non-zero when reason is SHARD.
// reason - why the ledger is wanted; drives the post-completion storage step.
//
// Returns an empty result when stopping, when the acquisition has failed,
// when it is not yet complete, or when a SHARD acquisition finds no shard
// store; otherwise returns inbound->getLedger().
68  uint256 const& hash,
69  std::uint32_t seq,
70  InboundLedger::Reason reason) override
71  {
72  assert(hash.isNonZero());
    // A SHARD acquisition needs both a sequence and an available shard store.
73  assert(
74  reason != InboundLedger::Reason::SHARD ||
75  (seq != 0 && app_.getShardStore()));
76 
77  // probably not the right rule
    // NOTE(review): listing numbers 78 and 80 are skipped, so only the middle
    // clause of this condition is visible; when the full condition holds the
    // function returns an empty result.
79  (reason != InboundLedger::Reason::GENERIC) &&
81  return {};
82 
83  bool isNew = true;
    // NOTE(review): the declarations of `inbound` (number 84) and of the lock
    // `sl` (number 86) are missing from this listing.
85  {
    // Refuse new work once stop() has set stopping_.
87  if (stopping_)
88  {
89  return {};
90  }
91 
    // Reuse an existing acquisition for this hash, or create and register one.
92  auto it = mLedgers.find(hash);
93  if (it != mLedgers.end())
94  {
95  isNew = false;
96  inbound = it->second;
97  }
98  else
99  {
100  inbound = std::make_shared<InboundLedger>(
101  app_,
102  hash,
103  seq,
104  reason,
105  std::ref(m_clock),
106  mPeerSetBuilder->build());
107  mLedgers.emplace(hash, inbound);
    // init() receives the lock object `sl`; only freshly created entries
    // are initialized and counted.
108  inbound->init(sl);
109  ++mCounter;
110  }
111  }
112 
113  if (inbound->isFailed())
114  return {};
115 
    // A pre-existing acquisition is told about the (possibly new) sequence.
116  if (!isNew)
117  inbound->update(seq);
118 
119  if (!inbound->isComplete())
120  return {};
121 
    // The ledger is complete: perform the reason-specific storage step.
122  if (reason == InboundLedger::Reason::HISTORY)
123  {
    // Copy into the node store only when the ledger's state map family
    // reports that it is shard backed.
124  if (inbound->getLedger()->stateMap().family().isShardBacked())
125  app_.getNodeStore().storeLedger(inbound->getLedger());
126  }
127  else if (reason == InboundLedger::Reason::SHARD)
128  {
129  auto shardStore = app_.getShardStore();
130  if (!shardStore)
131  {
132  JLOG(j_.error())
133  << "Acquiring shard with no shard store available";
134  return {};
135  }
    // Shard-backed ledgers are only marked as stored; others are copied
    // into the shard store.
136  if (inbound->getLedger()->stateMap().family().isShardBacked())
137  shardStore->setStored(inbound->getLedger());
138  else
139  shardStore->storeLedger(inbound->getLedger());
140  }
141  return inbound->getLedger();
142  }
143 
// Looks up `hash` in mLedgers while holding mLock and returns the mapped
// entry when one exists; otherwise `ret` is returned as it was declared.
// `hash` is asserted to be non-zero.
// NOTE(review): the return-type line (number 144) and the declaration of
// `ret` (number 149) are missing from this listing; the cross-reference index
// gives the return type as std::shared_ptr<InboundLedger>, so `ret` is
// presumably a null pointer when nothing is found - confirm.
145  find(uint256 const& hash) override
146  {
147  assert(hash.isNonZero());
148 
150 
    // The lock is held only for the map lookup.
151  {
152  ScopedLockType sl(mLock);
153 
154  auto it = mLedgers.find(hash);
155  if (it != mLedgers.end())
156  {
157  ret = it->second;
158  }
159  }
160 
161  return ret;
162  }
163 
164  /*
165  This gets called when
166  "We got some data from an inbound ledger"
167 
168  inboundLedgerTrigger:
169  "What do we do with this partial data?"
170  Figures out what to do with the responses to our requests for information.
171 
172  */
173  // means "We got some data from an inbound ledger"
174 
175  // VFALCO TODO Remove the dependency on the Peer object.
// gotLedgerData(): handle ledger data received for `hash`.
// NOTE(review): the name line (number 179) and the `peer` / `packet`
// parameter lines (numbers 181-182) are missing from this listing; the
// cross-reference index gives them as std::shared_ptr<Peer> peer and
// std::shared_ptr<protocol::TMLedgerData> packet.
//
// Returns true when the data belongs to a ledger that find(hash) still
// tracks, false otherwise.
178  bool
180  LedgerHash const& hash,
183  {
184  if (auto ledger = find(hash))
185  {
186  JLOG(j_.trace()) << "Got data (" << packet->nodes().size()
187  << ") for acquiring ledger: " << hash;
188 
189  // Stash the data for later processing and see if we need to
190  // dispatch
    // NOTE(review): the head of the call that schedules this job (number
    // 192) is missing; the visible arguments are the job type
    // jtLEDGER_DATA, the name "processLedgerData" and a handler that
    // calls ledger->runData(). The handler captures `ledger` by value.
191  if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
193  jtLEDGER_DATA, "processLedgerData", [ledger]() {
194  ledger->runData();
195  });
196 
197  return true;
198  }
199 
200  JLOG(j_.trace()) << "Got data for ledger " << hash
201  << " which we're no longer acquiring";
202 
203  // If it's state node data, stash it because it still might be
204  // useful.
205  if (packet->type() == protocol::liAS_NODE)
206  {
    // NOTE(review): the head of this scheduling call (number 207) is also
    // missing. The handler captures `this` and `packet` and forwards the
    // packet to gotStaleData().
208  jtLEDGER_DATA, "gotStaleData", [this, packet]() {
209  gotStaleData(packet);
210  });
211  }
212 
213  return false;
214  }
215 
// Records a failed acquisition: inserts the pair (h, seq) into
// mRecentFailures while holding mLock.
//
// h   - hash of the ledger that could not be acquired.
// seq - sequence stored alongside the hash (getInfo() uses it as the report
//       key when it is greater than 1).
216  void
217  logFailure(uint256 const& h, std::uint32_t seq) override
218  {
219  ScopedLockType sl(mLock);
220 
221  mRecentFailures.emplace(h, seq);
222  }
223 
// Returns true when hash `h` currently has an entry in mRecentFailures.
// The lookup is performed while holding mLock.
224  bool
225  isFailure(uint256 const& h) override
226  {
227  ScopedLockType sl(mLock);
228 
    // NOTE(review): listing number 229 is skipped here, so a statement may be
    // missing between taking the lock and the lookup (the cross-reference
    // index lists beast::expire) - confirm against the complete file.
230  return mRecentFailures.find(h) != mRecentFailures.end();
231  }
232 
// gotStaleData(): re-serialize the nodes of a ledger-data packet and hand
// them on, keyed by node hash.
// NOTE(review): the name/parameter line (number 240) is missing from this
// listing; the cross-reference index gives it as
//   gotStaleData(std::shared_ptr<protocol::TMLedgerData> packet_ptr) override
// and describes the packet as data for a ledger no longer being acquired.
//
// Processing stops at the first node that lacks an id or data, or that
// cannot be parsed; nodes handled before that point have already been
// passed on.
239  void
241  {
    // One Serializer is reused for every node; it is emptied before each use.
242  Serializer s;
243  try
244  {
245  for (int i = 0; i < packet_ptr->nodes().size(); ++i)
246  {
247  auto const& node = packet_ptr->nodes(i);
248 
249  if (!node.has_nodeid() || !node.has_nodedata())
250  return;
251 
    // Parse the node from its wire representation.
252  auto newNode =
253  SHAMapTreeNode::makeFromWire(makeSlice(node.nodedata()));
254 
255  if (!newNode)
256  return;
257 
    // Re-serialize in the prefixed format before handing it on.
258  s.erase();
259  newNode->serializeWithPrefix(s);
260 
    // NOTE(review): the head of this call (number 261) is missing; the
    // cross-reference index lists LedgerMaster::addFetchPack(hash, data),
    // which matches the two visible arguments - confirm.
262  newNode->getHash().as_uint256(),
263  std::make_shared<Blob>(s.begin(), s.end()));
264  }
265  }
    // Any std::exception raised above is swallowed; because the try block
    // wraps the whole loop, the remaining nodes are skipped.
266  catch (std::exception const&)
267  {
268  }
269  }
270 
// Empties mRecentFailures and, despite the name, mLedgers as well.
// Both containers are cleared while holding mLock.
271  void
272  clearFailures() override
273  {
274  ScopedLockType sl(mLock);
275 
276  mRecentFailures.clear();
277  mLedgers.clear();
278  }
279 
// Returns 60 times the value fetchRate_ reports for the current clock time.
// NOTE(review): the return-type line (number 280) is missing; the
// cross-reference index gives std::size_t and describes the result as
// historical ledger fetches per minute.
281  fetchRate() override
282  {
    // NOTE(review): listing number 283 is skipped; the cross-reference index
    // lists a fetchRateMutex_ member, so a lock is presumably taken here -
    // confirm.
284  return 60 * fetchRate_.value(m_clock.now());
285  }
286 
287  // Should only be called with an inboundledger that has
288  // a reason of history or shard
    // Adds one sample, stamped with the current clock time, to fetchRate_.
289  void
290  onLedgerFetched() override
291  {
    // NOTE(review): listing number 292 is skipped; the cross-reference index
    // lists a fetchRateMutex_ member, so a lock is presumably taken here -
    // confirm.
293  fetchRate_.add(1, m_clock.now());
294  }
295 
// getInfo(): build a report of recent failures and tracked acquisitions.
// NOTE(review): the return-type line (number 296) and the declarations of
// `ret` (number 299) and `acqs` (number 301) are missing from this listing;
// the cross-reference index gives the return type as Json::Value.
//
// Each report key is the sequence number in decimal when it is greater than
// 1, and the hash converted with to_string() otherwise.
297  getInfo() override
298  {
300 
302 
    // Under the lock: snapshot the tracked acquisitions and mark failures.
303  {
304  ScopedLockType sl(mLock);
305 
306  acqs.reserve(mLedgers.size());
307  for (auto const& it : mLedgers)
308  {
309  assert(it.second);
310  acqs.push_back(it);
311  }
    // For mRecentFailures, it.second is the sequence recorded by
    // logFailure() and it.first is the hash.
312  for (auto const& it : mRecentFailures)
313  {
314  if (it.second > 1)
315  ret[std::to_string(it.second)][jss::failed] = true;
316  else
317  ret[to_string(it.first)][jss::failed] = true;
318  }
319  }
320 
321  for (auto const& it : acqs)
322  {
323  // getJson is expensive, so call without the lock
324  std::uint32_t seq = it.second->getSeq();
325  if (seq > 1)
326  ret[std::to_string(seq)] = it.second->getJson(0);
327  else
328  ret[to_string(it.first)] = it.second->getJson(0);
329  }
330 
331  return ret;
332  }
333 
// Calls checkLocal() on every acquisition currently held in mLedgers.
// The entries are copied into `acquires` under mLock, and checkLocal() is
// then invoked on the copies after the locked scope has closed.
334  void
335  gotFetchPack() override
336  {
    // NOTE(review): the declaration of `acquires` (number 337) is missing
    // from this listing.
338  {
339  ScopedLockType sl(mLock);
340 
341  acquires.reserve(mLedgers.size());
342  for (auto const& it : mLedgers)
343  {
344  assert(it.second);
345  acquires.push_back(it.second);
346  }
347  }
348 
349  for (auto const& acquire : acquires)
350  {
351  acquire->checkLocal();
352  }
353  }
354 
// sweep(): drop acquisitions from mLedgers whose last action is more than
// one minute older than the moment the sweep started, then log how many were
// removed and how long the sweep took.
355  void
356  sweep() override
357  {
358  auto const start = m_clock.now();
359 
360  // Make a list of things to sweep, while holding the lock
    // NOTE(review): the declaration of `stuffToSweep` (number 361) is missing
    // from this listing. It is still used by the log statement after the
    // locked scope, so it is declared outside that scope.
362  std::size_t total;
363 
364  {
365  ScopedLockType sl(mLock);
366  MapType::iterator it(mLedgers.begin());
367  total = mLedgers.size();
368 
369  stuffToSweep.reserve(total);
370 
371  while (it != mLedgers.end())
372  {
373  auto const la = it->second->getLastAction();
374 
    // Last action is later than the sweep start: touch the entry and
    // keep it.
375  if (la > start)
376  {
377  it->second->touch();
378  ++it;
379  }
    // Idle for more than one minute: remove it from the map.
380  else if ((la + std::chrono::minutes(1)) < start)
381  {
382  stuffToSweep.push_back(it->second);
383  // shouldn't cause the actual final delete
384  // since we are holding a reference in the vector.
385  it = mLedgers.erase(it);
386  }
    // Otherwise leave the entry in place.
387  else
388  {
389  ++it;
390  }
391  }
392 
    // NOTE(review): listing number 393 is skipped here, so a statement may
    // be missing before the lock is released (the cross-reference index
    // lists beast::expire) - confirm against the complete file.
394  }
395 
396  JLOG(j_.debug())
397  << "Swept " << stuffToSweep.size() << " out of " << total
398  << " inbound ledgers. Duration: "
399  << std::chrono::duration_cast<std::chrono::milliseconds>(
400  m_clock.now() - start)
401  .count()
402  << "ms";
403  }
404 
// Marks the object as stopping and discards all tracked state.
// While holding mLock it sets stopping_ (which makes acquire() return an
// empty result) and clears both mLedgers and mRecentFailures.
405  void
406  stop() override
407  {
408  ScopedLockType lock(mLock);
409  stopping_ = true;
410  mLedgers.clear();
411  mRecentFailures.clear();
412  }
413 
414 private:
416 
419 
// Set to true by stop() while holding mLock; acquire() returns an empty
// result once it is set.
420  bool stopping_ = false;
423 
425 
427 
429 };
430 
431 //------------------------------------------------------------------------------
432 
// Factory: constructs an InboundLedgersImp from `app`, `clock` and
// `collector`, using make_PeerSetBuilder(app) for the peer-set builder, and
// returns it through std::make_unique.
// NOTE(review): the return-type and name lines (numbers 433-434) and the
// `clock` parameter line (number 436) are missing from this listing; the
// cross-reference index gives the signature as
//   std::unique_ptr<InboundLedgers> make_InboundLedgers(
//       Application&, InboundLedgers::clock_type&,
//       beast::insight::Collector::ptr const&)
435  Application& app,
437  beast::insight::Collector::ptr const& collector)
438 {
439  return std::make_unique<InboundLedgersImp>(
440  app, clock, collector, make_PeerSetBuilder(app));
441 }
442 
443 } // namespace ripple
ripple::InboundLedgersImp::getInfo
Json::Value getInfo() override
Definition: InboundLedgers.cpp:297
ripple::Application
Definition: Application.h:115
ripple::InboundLedgersImp::fetchRate_
DecayWindow< 30, clock_type > fetchRate_
Definition: InboundLedgers.cpp:43
ripple::InboundLedger::Reason::HISTORY
@ HISTORY
ripple::Serializer::end
Blob ::iterator end()
Definition: Serializer.h:223
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:241
ripple::InboundLedger::Reason::CONSENSUS
@ CONSENSUS
std::shared_ptr< Collector >
std::exception
STL class.
ripple::base_uint::isNonZero
bool isNonZero() const
Definition: base_uint.h:537
beast::insight::Counter
A metric for measuring an integral value.
Definition: Counter.h:38
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::InboundLedgersImp::m_clock
clock_type & m_clock
Definition: InboundLedgers.cpp:415
ripple::InboundLedgersImp::app_
Application & app_
Definition: InboundLedgers.cpp:40
ripple::Serializer::erase
void erase()
Definition: Serializer.h:209
std::vector::reserve
T reserve(T... args)
ripple::InboundLedger::Reason::GENERIC
@ GENERIC
vector
std::unordered_map::find
T find(T... args)
std::unordered_map::size
T size(T... args)
ripple::InboundLedgersImp::stop
void stop() override
Definition: InboundLedgers.cpp:406
ripple::InboundLedgersImp::mPeerSetBuilder
std::unique_ptr< PeerSetBuilder > mPeerSetBuilder
Definition: InboundLedgers.cpp:428
ripple::make_InboundLedgers
std::unique_ptr< InboundLedgers > make_InboundLedgers(Application &app, InboundLedgers::clock_type &clock, beast::insight::Collector::ptr const &collector)
Definition: InboundLedgers.cpp:434
ripple::InboundLedgersImp
Definition: InboundLedgers.cpp:37
std::chrono::minutes
ripple::InboundLedgersImp::kReacquireInterval
static constexpr const std::chrono::minutes kReacquireInterval
Definition: InboundLedgers.cpp:48
ripple::DecayWindow< 30, clock_type >
std::unordered_map::emplace
T emplace(T... args)
std::recursive_mutex
STL class.
std::lock_guard
STL class.
ripple::Application::getShardStore
virtual NodeStore::DatabaseShard * getShardStore()=0
ripple::InboundLedgersImp::clearFailures
void clearFailures() override
Definition: InboundLedgers.cpp:272
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
ripple::SHAMapTreeNode::makeFromWire
static std::shared_ptr< SHAMapTreeNode > makeFromWire(Slice rawNode)
Definition: SHAMapTreeNode.cpp:116
ripple::InboundLedgersImp::stopping_
bool stopping_
Definition: InboundLedgers.cpp:420
ripple::Serializer::begin
Blob ::iterator begin()
Definition: Serializer.h:218
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::jtLEDGER_DATA
@ jtLEDGER_DATA
Definition: Job.h:67
std::unordered_map::clear
T clear(T... args)
beast::abstract_clock::now
virtual time_point now() const =0
Returns the current time.
ripple::InboundLedgersImp::fetchRate
std::size_t fetchRate() override
Returns the rate of historical ledger fetches per minute.
Definition: InboundLedgers.cpp:281
ripple::InboundLedgersImp::sweep
void sweep() override
Definition: InboundLedgers.cpp:356
ripple::InboundLedgersImp::j_
const beast::Journal j_
Definition: InboundLedgers.cpp:44
std::vector::push_back
T push_back(T... args)
ripple::base_uint< 256 >
ripple::NetworkOPs::isNeedNetworkLedger
virtual bool isNeedNetworkLedger()=0
ripple::InboundLedgersImp::InboundLedgersImp
InboundLedgersImp(Application &app, clock_type &clock, beast::insight::Collector::ptr const &collector, std::unique_ptr< PeerSetBuilder > peerSetBuilder)
Definition: InboundLedgers.cpp:50
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::InboundLedgersImp::logFailure
void logFailure(uint256 const &h, std::uint32_t seq) override
Definition: InboundLedgers.cpp:217
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
std::unique_lock< std::recursive_mutex >
ripple::InboundLedgersImp::acquire
std::shared_ptr< Ledger const > acquire(uint256 const &hash, std::uint32_t seq, InboundLedger::Reason reason) override
Definition: InboundLedgers.cpp:67
std::to_string
T to_string(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
beast::Journal::error
Stream error() const
Definition: Journal.h:333
std::unordered_map::erase
T erase(T... args)
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
beast::expire
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
Definition: aged_container_utility.h:33
std::uint32_t
ripple::InboundLedgersImp::gotFetchPack
void gotFetchPack() override
Definition: InboundLedgers.cpp:335
beast::abstract_clock< std::chrono::steady_clock >
memory
ripple::InboundLedgersImp::mRecentFailures
beast::aged_map< uint256, std::uint32_t > mRecentFailures
Definition: InboundLedgers.cpp:424
ripple::InboundLedgers
Manages the lifetime of inbound ledgers.
Definition: InboundLedgers.h:33
ripple::InboundLedgersImp::onLedgerFetched
void onLedgerFetched() override
Called when a complete ledger is obtained.
Definition: InboundLedgers.cpp:290
ripple::InboundLedgersImp::fetchRateMutex_
std::mutex fetchRateMutex_
Definition: InboundLedgers.cpp:41
std::weak_ptr< Peer >
ripple::Serializer
Definition: Serializer.h:39
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::DecayWindow::value
double value(time_point now)
Definition: DecayingSample.h:129
ripple::NodeStore::Database::storeLedger
virtual bool storeLedger(std::shared_ptr< Ledger const > const &srcLedger)=0
Store a ledger from a different database.
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
beast::detail::aged_ordered_container
Associative container where each element is also indexed by time.
Definition: aged_ordered_container.h:82
ripple::InboundLedgersImp::isFailure
bool isFailure(uint256 const &h) override
Definition: InboundLedgers.cpp:225
ripple::LedgerMaster::addFetchPack
void addFetchPack(uint256 const &hash, std::shared_ptr< Blob > data)
Definition: LedgerMaster.cpp:2140
ripple::make_PeerSetBuilder
std::unique_ptr< PeerSetBuilder > make_PeerSetBuilder(Application &app)
Definition: PeerSet.cpp:144
std::unordered_map::begin
T begin(T... args)
std
STL namespace.
ripple::DecayWindow::add
void add(double value, time_point now)
Definition: DecayingSample.h:122
ripple::InboundLedgersImp::mLock
std::recursive_mutex mLock
Definition: InboundLedgers.cpp:418
ripple::InboundLedgersImp::gotStaleData
void gotStaleData(std::shared_ptr< protocol::TMLedgerData > packet_ptr) override
We got some data for a ledger we are no longer acquiring Since we paid the price to receive it,...
Definition: InboundLedgers.cpp:240
mutex
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::InboundLedgersImp::mCounter
beast::insight::Counter mCounter
Definition: InboundLedgers.cpp:426
std::size_t
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
std::unordered_map::end
T end(T... args)
ripple::InboundLedger::Reason
Reason
Definition: InboundLedger.h:43
ripple::InboundLedgersImp::gotLedgerData
bool gotLedgerData(LedgerHash const &hash, std::shared_ptr< Peer > peer, std::shared_ptr< protocol::TMLedgerData > packet) override
We received a TMLedgerData from a peer.
Definition: InboundLedgers.cpp:179
std::unique_ptr
STL class.
ripple::InboundLedger::Reason::SHARD
@ SHARD
std::unordered_map
STL class.
ripple::InboundLedgers::clock_type
beast::abstract_clock< std::chrono::steady_clock > clock_type
Definition: InboundLedgers.h:36
ripple::InboundLedgersImp::mLedgers
MapType mLedgers
Definition: InboundLedgers.cpp:422
ripple::InboundLedgersImp::find
std::shared_ptr< InboundLedger > find(uint256 const &hash) override
Definition: InboundLedgers.cpp:145
std::ref
T ref(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145