rippled
ClosureCounter.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2017 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_CORE_CLOSURE_COUNTER_H_INCLUDED
21 #define RIPPLE_CORE_CLOSURE_COUNTER_H_INCLUDED
22 
23 #include <ripple/basics/Log.h>
24 #include <atomic>
25 #include <condition_variable>
26 #include <mutex>
27 #include <optional>
28 #include <type_traits>
29 
30 namespace ripple {
31 
53 template <typename Ret_t, typename... Args_t>
55 {
56 private:
57  std::mutex mutable mutex_{};
59  bool waitForClosures_{false}; // guard with mutex_
61 
62  // Increment the count.
65  {
66  ++closureCount_;
67  return *this;
68  }
69 
70  // Decrement the count. If we're stopping and the count drops to zero
71  // notify allClosuresDoneCond_.
74  {
75  // Even though closureCount_ is atomic, we decrement its value under
76  // a lock. This removes a small timing window that occurs if the
77  // waiting thread is handling a spurious wakeup when closureCount_
78  // drops to zero.
79  std::lock_guard lock{mutex_};
80 
81  // Update closureCount_. Notify if stopping and closureCount_ == 0.
82  if ((--closureCount_ == 0) && waitForClosures_)
84  return *this;
85  }
86 
87  // A private template class that helps count the number of closures
88  // in flight. This allows callers to block until all their postponed
89  // closures are dispatched.
90  template <typename Closure>
91  class Substitute
92  {
93  private:
96 
97  static_assert(
98  std::is_same<decltype(closure_(std::declval<Args_t>()...)), Ret_t>::
99  value,
100  "Closure arguments don't match ClosureCounter Ret_t or Args_t");
101 
102  public:
103  Substitute() = delete;
104 
105  Substitute(Substitute const& rhs)
106  : counter_(rhs.counter_), closure_(rhs.closure_)
107  {
108  ++counter_;
109  }
110 
111  Substitute(Substitute&& rhs) noexcept(
113  : counter_(rhs.counter_), closure_(std::move(rhs.closure_))
114  {
115  ++counter_;
116  }
117 
118  Substitute(ClosureCounter& counter, Closure&& closure)
119  : counter_(counter), closure_(std::forward<Closure>(closure))
120  {
121  ++counter_;
122  }
123 
124  Substitute&
125  operator=(Substitute const& rhs) = delete;
126  Substitute&
127  operator=(Substitute&& rhs) = delete;
128 
130  {
131  --counter_;
132  }
133 
134  // Note that Args_t is not deduced, it is explicit. So Args_t&&
135  // would be an rvalue reference, not a forwarding reference. We
136  // want to forward exactly what the user declared.
137  Ret_t
138  operator()(Args_t... args)
139  {
140  return closure_(std::forward<Args_t>(args)...);
141  }
142  };
143 
144 public:
145  ClosureCounter() = default;
146  // Not copyable or movable. Outstanding counts would be hard to sort out.
147  ClosureCounter(ClosureCounter const&) = delete;
148 
150  operator=(ClosureCounter const&) = delete;
151 
154  {
155  using namespace std::chrono_literals;
156  join("ClosureCounter", 1s, debugLog());
157  }
158 
165  void
166  join(char const* name, std::chrono::milliseconds wait, beast::Journal j)
167  {
169  waitForClosures_ = true;
170  if (closureCount_ > 0)
171  {
173  lock, wait, [this] { return closureCount_ == 0; }))
174  {
175  if (auto stream = j.error())
176  stream << name << " waiting for ClosureCounter::join().";
178  lock, [this] { return closureCount_ == 0; });
179  }
180  }
181  }
182 
190  template <class Closure>
192  wrap(Closure&& closure)
193  {
195 
196  std::lock_guard lock{mutex_};
197  if (!waitForClosures_)
198  ret.emplace(*this, std::forward<Closure>(closure));
199 
200  return ret;
201  }
202 
204  int
205  count() const
206  {
207  return closureCount_;
208  }
209 
216  bool
217  joined() const
218  {
219  std::lock_guard lock{mutex_};
220  return waitForClosures_;
221  }
222 };
223 
224 } // namespace ripple
225 
226 #endif // RIPPLE_CORE_CLOSURE_COUNTER_H_INCLUDED
ripple::ClosureCounter::Substitute::Substitute
Substitute(Substitute &&rhs) noexcept(std::is_nothrow_move_constructible< Closure >::value)
Definition: ClosureCounter.h:111
ripple::ClosureCounter::Substitute::Substitute
Substitute()=delete
std::is_same
ripple::ClosureCounter::ClosureCounter
ClosureCounter()=default
ripple::ClosureCounter::mutex_
std::mutex mutex_
Definition: ClosureCounter.h:57
std::chrono::milliseconds
std::optional::emplace
T emplace(T... args)
std::lock_guard
STL class.
std::is_nothrow_move_constructible
ripple::debugLog
beast::Journal debugLog()
Returns a debug journal.
Definition: Log.cpp:452
ripple::ClosureCounter::operator--
ClosureCounter & operator--()
Definition: ClosureCounter.h:73
ripple::ClosureCounter::~ClosureCounter
~ClosureCounter()
Destructor verifies all in-flight closures are complete.
Definition: ClosureCounter.h:153
ripple::ClosureCounter::Substitute::operator=
Substitute & operator=(Substitute const &rhs)=delete
ripple::ClosureCounter::allClosuresDoneCond_
std::condition_variable allClosuresDoneCond_
Definition: ClosureCounter.h:58
ripple::ClosureCounter::waitForClosures_
bool waitForClosures_
Definition: ClosureCounter.h:59
std::unique_lock
STL class.
ripple::ClosureCounter::operator++
ClosureCounter & operator++()
Definition: ClosureCounter.h:64
beast::Journal::error
Stream error() const
Definition: Journal.h:333
ripple::ClosureCounter::Substitute::operator()
Ret_t operator()(Args_t... args)
Definition: ClosureCounter.h:138
ripple::ClosureCounter
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
Definition: ClosureCounter.h:54
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::condition_variable::wait
T wait(T... args)
atomic
ripple::ClosureCounter::Substitute::~Substitute
~Substitute()
Definition: ClosureCounter.h:129
ripple::ClosureCounter::operator=
ClosureCounter & operator=(ClosureCounter const &)=delete
std::condition_variable::wait_for
T wait_for(T... args)
ripple::ClosureCounter::Substitute
Definition: ClosureCounter.h:91
ripple::ClosureCounter::Substitute::counter_
ClosureCounter & counter_
Definition: ClosureCounter.h:94
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ClosureCounter::Substitute::Substitute
Substitute(ClosureCounter &counter, Closure &&closure)
Definition: ClosureCounter.h:118
std::remove_reference_t< Closure >
std
STL namespace.
condition_variable
ripple::ClosureCounter::Substitute::closure_
std::remove_reference_t< Closure > closure_
Definition: ClosureCounter.h:95
ripple::ClosureCounter::join
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
Definition: ClosureCounter.h:166
optional
ripple::ClosureCounter::Substitute::Substitute
Substitute(Substitute const &rhs)
Definition: ClosureCounter.h:105
mutex
ripple::ClosureCounter::count
int count() const
Current number of Closures outstanding.
Definition: ClosureCounter.h:205
ripple::ClosureCounter::closureCount_
std::atomic< int > closureCount_
Definition: ClosureCounter.h:60
type_traits
std::condition_variable::notify_all
T notify_all(T... args)
ripple::ClosureCounter::joined
bool joined() const
Returns true if this has been joined.
Definition: ClosureCounter.h:217
ripple::ClosureCounter::wrap
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
Definition: ClosureCounter.h:192