rippled
ETLHelpers.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_APP_REPORTING_ETLHELPERS_H_INCLUDED
21 #define RIPPLE_APP_REPORTING_ETLHELPERS_H_INCLUDED
22 #include <ripple/app/main/Application.h>
23 #include <ripple/ledger/ReadView.h>
24 #include <condition_variable>
25 #include <mutex>
26 #include <optional>
27 #include <queue>
28 #include <sstream>
29 
30 namespace ripple {
31 
40 {
41  // max sequence validated by network
43 
44  mutable std::mutex m_;
45 
47 
48  bool stopping_ = false;
49 
50 public:
53  void
54  push(uint32_t idx)
55  {
56  std::lock_guard lck(m_);
57  if (!max_ || idx > *max_)
58  max_ = idx;
59  cv_.notify_all();
60  }
61 
67  getMostRecent() const
68  {
69  std::unique_lock lck(m_);
70  cv_.wait(lck, [this]() { return max_ || stopping_; });
71  return max_;
72  }
73 
79  {
80  std::unique_lock lk(m_);
81  return max_;
82  }
83 
88  bool
89  waitUntilValidatedByNetwork(uint32_t sequence)
90  {
91  std::unique_lock lck(m_);
92  cv_.wait(lck, [sequence, this]() {
93  return (max_ && sequence <= *max_) || stopping_;
94  });
95  return !stopping_;
96  }
97 
101  void
103  {
104  std::lock_guard lck(m_);
105  stopping_ = true;
106  cv_.notify_all();
107  }
108 };
109 
114 template <class T>
116 {
118 
119  mutable std::mutex m_;
122 
123 public:
126  explicit ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize)
127  {
128  }
129 
131  ThreadSafeQueue() = default;
132 
135  void
136  push(T const& elt)
137  {
138  std::unique_lock lck(m_);
139  // if queue has a max size, wait until not full
140  if (maxSize_)
141  cv_.wait(lck, [this]() { return queue_.size() <= *maxSize_; });
142  queue_.push(elt);
143  cv_.notify_all();
144  }
145 
148  void
149  push(T&& elt)
150  {
151  std::unique_lock lck(m_);
152  // if queue has a max size, wait until not full
153  if (maxSize_)
154  cv_.wait(lck, [this]() { return queue_.size() <= *maxSize_; });
155  queue_.push(std::move(elt));
156  cv_.notify_all();
157  }
158 
160  T
161  pop()
162  {
163  std::unique_lock lck(m_);
164  cv_.wait(lck, [this]() { return !queue_.empty(); });
165  T ret = std::move(queue_.front());
166  queue_.pop();
167  // if queue has a max size, unblock any possible pushers
168  if (maxSize_)
169  cv_.notify_all();
170  return ret;
171  }
172 };
173 
177 getMarkers(size_t numMarkers)
178 {
179  assert(numMarkers <= 256);
180 
181  unsigned char incr = 256 / numMarkers;
182 
183  std::vector<uint256> markers;
184  markers.reserve(numMarkers);
185  uint256 base{0};
186  for (size_t i = 0; i < numMarkers; ++i)
187  {
188  markers.push_back(base);
189  base.data()[0] += incr;
190  }
191  return markers;
192 }
193 
194 } // namespace ripple
195 #endif
sstream
ripple::ThreadSafeQueue::queue_
std::queue< T > queue_
Definition: ETLHelpers.h:117
ripple::ThreadSafeQueue
Generic thread-safe queue with an optional maximum size Note, we can't use a lockfree queue here,...
Definition: ETLHelpers.h:115
std::vector::reserve
T reserve(T... args)
ripple::ThreadSafeQueue::push
void push(T const &elt)
Definition: ETLHelpers.h:136
std::vector
STL class.
ripple::ThreadSafeQueue::cv_
std::condition_variable cv_
Definition: ETLHelpers.h:120
ripple::NetworkValidatedLedgers::push
void push(uint32_t idx)
Notify the datastructure that idx has been validated by the network.
Definition: ETLHelpers.h:54
ripple::NetworkValidatedLedgers::waitUntilValidatedByNetwork
bool waitUntilValidatedByNetwork(uint32_t sequence)
Waits for the sequence to be validated by the network.
Definition: ETLHelpers.h:89
ripple::NetworkValidatedLedgers::stop
void stop()
Puts the datastructure in the stopped state Future calls to this datastructure will not block This op...
Definition: ETLHelpers.h:102
std::lock_guard
STL class.
ripple::NetworkValidatedLedgers
This datastructure is used to keep track of the sequence of the most recent ledger validated by the n...
Definition: ETLHelpers.h:39
ripple::ThreadSafeQueue::m_
std::mutex m_
Definition: ETLHelpers.h:119
ripple::ThreadSafeQueue::push
void push(T &&elt)
Definition: ETLHelpers.h:149
ripple::ThreadSafeQueue::pop
T pop()
Definition: ETLHelpers.h:161
ripple::ThreadSafeQueue::ThreadSafeQueue
ThreadSafeQueue()=default
Create a queue with no maximum size.
queue
ripple::NetworkValidatedLedgers::getMostRecent
std::optional< uint32_t > getMostRecent() const
Get most recently validated sequence.
Definition: ETLHelpers.h:67
ripple::base_uint::data
pointer data()
Definition: base_uint.h:122
std::vector::push_back
T push_back(T... args)
ripple::base_uint< 256 >
ripple::NetworkValidatedLedgers::max_
std::optional< uint32_t > max_
Definition: ETLHelpers.h:42
std::unique_lock
STL class.
ripple::ThreadSafeQueue::maxSize_
std::optional< uint32_t > maxSize_
Definition: ETLHelpers.h:121
std::condition_variable::wait
T wait(T... args)
ripple::NetworkValidatedLedgers::tryGetMostRecent
std::optional< uint32_t > tryGetMostRecent() const
Get most recently validated sequence.
Definition: ETLHelpers.h:78
ripple::NetworkValidatedLedgers::m_
std::mutex m_
Definition: ETLHelpers.h:44
ripple::NetworkValidatedLedgers::stopping_
bool stopping_
Definition: ETLHelpers.h:48
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
condition_variable
optional
mutex
ripple::ThreadSafeQueue::ThreadSafeQueue
ThreadSafeQueue(uint32_t maxSize)
Definition: ETLHelpers.h:126
std::condition_variable::notify_all
T notify_all(T... args)
ripple::getMarkers
std::vector< uint256 > getMarkers(size_t numMarkers)
Parititions the uint256 keyspace into numMarkers partitions, each of equal size.
Definition: ETLHelpers.h:177
ripple::NetworkValidatedLedgers::cv_
std::condition_variable cv_
Definition: ETLHelpers.h:46