rippled
PathRequests.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/LedgerMaster.h>
21 #include <ripple/app/main/Application.h>
22 #include <ripple/app/paths/PathRequests.h>
23 #include <ripple/basics/Log.h>
24 #include <ripple/core/JobQueue.h>
25 #include <ripple/net/RPCErr.h>
26 #include <ripple/protocol/ErrorCodes.h>
27 #include <ripple/protocol/jss.h>
28 #include <ripple/resource/Fees.h>
29 #include <algorithm>
30 
31 namespace ripple {
32 
38  std::shared_ptr<ReadView const> const& ledger,
39  bool authoritative)
40 {
42 
43  auto lineCache = lineCache_.lock();
44 
45  std::uint32_t const lineSeq = lineCache ? lineCache->getLedger()->seq() : 0;
46  std::uint32_t const lgrSeq = ledger->seq();
47  JLOG(mJournal.debug()) << "getLineCache has cache for " << lineSeq
48  << ", considering " << lgrSeq;
49 
50  if ((lineSeq == 0) || // no ledger
51  (authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger
52  (authoritative &&
53  ((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason
54  (lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason
55  {
56  JLOG(mJournal.debug())
57  << "getLineCache creating new cache for " << lgrSeq;
58  // Assign to the local before the member, because the member is a
59  // weak_ptr, and will immediately discard it if there are no other
60  // references.
61  lineCache_ = lineCache = std::make_shared<RippleLineCache>(
62  ledger, app_.journal("RippleLineCache"));
63  }
64  return lineCache;
65 }
66 
67 void
69 {
70  auto event =
71  app_.getJobQueue().makeLoadEvent(jtPATH_FIND, "PathRequest::updateAll");
72 
75 
76  // Get the ledger and cache we should be using
77  {
79  requests = requests_;
80  cache = getLineCache(inLedger, true);
81  }
82 
83  bool newRequests = app_.getLedgerMaster().isNewPathRequest();
84  bool mustBreak = false;
85 
86  JLOG(mJournal.trace()) << "updateAll seq=" << cache->getLedger()->seq()
87  << ", " << requests.size() << " requests";
88 
89  int processed = 0, removed = 0;
90 
91  auto getSubscriber =
92  [](PathRequest::pointer const& request) -> InfoSub::pointer {
93  if (auto ipSub = request->getSubscriber();
94  ipSub && ipSub->getRequest() == request)
95  {
96  return ipSub;
97  }
98  request->doAborting();
99  return nullptr;
100  };
101 
102  do
103  {
104  JLOG(mJournal.trace()) << "updateAll looping";
105  for (auto const& wr : requests)
106  {
107  if (app_.getJobQueue().isStopping())
108  break;
109 
110  auto request = wr.lock();
111  bool remove = true;
112  JLOG(mJournal.trace())
113  << "updateAll request " << (request ? "" : "not ") << "found";
114 
115  if (request)
116  {
117  auto continueCallback = [&getSubscriber, &request]() {
118  // This callback is used by doUpdate to determine whether to
119  // continue working. If getSubscriber returns null, that
120  // indicates that this request is no longer relevant.
121  return (bool)getSubscriber(request);
122  };
123  if (!request->needsUpdate(
124  newRequests, cache->getLedger()->seq()))
125  remove = false;
126  else
127  {
128  if (auto ipSub = getSubscriber(request))
129  {
130  if (!ipSub->getConsumer().warn())
131  {
132  // Release the shared ptr to the subscriber so that
133  // it can be freed if the client disconnects, and
134  // thus fail to lock later.
135  ipSub.reset();
136  Json::Value update = request->doUpdate(
137  cache, false, continueCallback);
138  request->updateComplete();
139  update[jss::type] = "path_find";
140  if ((ipSub = getSubscriber(request)))
141  {
142  ipSub->send(update, false);
143  remove = false;
144  ++processed;
145  }
146  }
147  }
148  else if (request->hasCompletion())
149  {
150  // One-shot request with completion function
151  request->doUpdate(cache, false);
152  request->updateComplete();
153  ++processed;
154  }
155  }
156  }
157 
158  if (remove)
159  {
161 
162  // Remove any dangling weak pointers or weak
163  // pointers that refer to this path request.
164  auto ret = std::remove_if(
165  requests_.begin(),
166  requests_.end(),
167  [&removed, &request](auto const& wl) {
168  auto r = wl.lock();
169 
170  if (r && r != request)
171  return false;
172  ++removed;
173  return true;
174  });
175 
176  requests_.erase(ret, requests_.end());
177  }
178 
179  mustBreak =
180  !newRequests && app_.getLedgerMaster().isNewPathRequest();
181 
182  // We weren't handling new requests and then
183  // there was a new request
184  if (mustBreak)
185  break;
186  }
187 
188  if (mustBreak)
189  { // a new request came in while we were working
190  newRequests = true;
191  }
192  else if (newRequests)
193  { // we only did new requests, so we always need a last pass
194  newRequests = app_.getLedgerMaster().isNewPathRequest();
195  }
196  else
197  { // if there are no new requests, we are done
198  newRequests = app_.getLedgerMaster().isNewPathRequest();
199  if (!newRequests)
200  break;
201  }
202 
203  {
204  // Get the latest requests, cache, and ledger for next pass
206 
207  if (requests_.empty())
208  break;
209  requests = requests_;
210  cache = getLineCache(cache->getLedger(), false);
211  }
212  } while (!app_.getJobQueue().isStopping());
213 
214  JLOG(mJournal.debug()) << "updateAll complete: " << processed
215  << " processed and " << removed << " removed";
216 }
217 
218 bool
220 {
222  return !requests_.empty();
223 }
224 
225 void
227 {
229 
230  // Insert after any older unserviced requests but before
231  // any serviced requests
232  auto ret =
233  std::find_if(requests_.begin(), requests_.end(), [](auto const& wl) {
234  auto r = wl.lock();
235 
236  // We come before handled requests
237  return r && !r->isNew();
238  });
239 
240  requests_.emplace(ret, req);
241 }
242 
243 // Make a new-style path_find request
246  std::shared_ptr<InfoSub> const& subscriber,
247  std::shared_ptr<ReadView const> const& inLedger,
248  Json::Value const& requestJson)
249 {
250  auto req = std::make_shared<PathRequest>(
251  app_, subscriber, ++mLastIdentifier, *this, mJournal);
252 
253  auto [valid, jvRes] =
254  req->doCreate(getLineCache(inLedger, false), requestJson);
255 
256  if (valid)
257  {
258  subscriber->setRequest(req);
259  insertPathRequest(req);
261  }
262  return std::move(jvRes);
263 }
264 
265 // Make an old-style ripple_path_find request
269  std::function<void(void)> completion,
270  Resource::Consumer& consumer,
271  std::shared_ptr<ReadView const> const& inLedger,
272  Json::Value const& request)
273 {
274  // This assignment must take place before the
275  // completion function is called
276  req = std::make_shared<PathRequest>(
277  app_, completion, consumer, ++mLastIdentifier, *this, mJournal);
278 
279  auto [valid, jvRes] = req->doCreate(getLineCache(inLedger, false), request);
280 
281  if (!valid)
282  {
283  req.reset();
284  }
285  else
286  {
287  insertPathRequest(req);
289  {
290  // The newPathRequest failed. Tell the caller.
291  jvRes = rpcError(rpcTOO_BUSY);
292  req.reset();
293  }
294  }
295 
296  return std::move(jvRes);
297 }
298 
301  Resource::Consumer& consumer,
302  std::shared_ptr<ReadView const> const& inLedger,
303  Json::Value const& request)
304 {
305  auto cache = std::make_shared<RippleLineCache>(
306  inLedger, app_.journal("RippleLineCache"));
307 
308  auto req = std::make_shared<PathRequest>(
309  app_, [] {}, consumer, ++mLastIdentifier, *this, mJournal);
310 
311  auto [valid, jvRes] = req->doCreate(cache, request);
312  if (valid)
313  jvRes = req->doUpdate(cache, false);
314  return std::move(jvRes);
315 }
316 
317 } // namespace ripple
ripple::PathRequests::doLegacyPathRequest
Json::Value doLegacyPathRequest(Resource::Consumer &consumer, std::shared_ptr< ReadView const > const &inLedger, Json::Value const &request)
Definition: PathRequests.cpp:300
std::shared_ptr
STL class.
ripple::PathRequests::makePathRequest
Json::Value makePathRequest(std::shared_ptr< InfoSub > const &subscriber, std::shared_ptr< ReadView const > const &ledger, Json::Value const &request)
Definition: PathRequests.cpp:245
ripple::rpcError
Json::Value rpcError(int iError)
Definition: RPCErr.cpp:29
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::jtPATH_FIND
@ jtPATH_FIND
Definition: Job.h:85
std::vector
STL class.
std::find_if
T find_if(T... args)
std::vector::size
T size(T... args)
ripple::PathRequests::makeLegacyPathRequest
Json::Value makeLegacyPathRequest(PathRequest::pointer &req, std::function< void(void)> completion, Resource::Consumer &consumer, std::shared_ptr< ReadView const > const &inLedger, Json::Value const &request)
Definition: PathRequests.cpp:267
std::lock_guard
STL class.
ripple::rpcTOO_BUSY
@ rpcTOO_BUSY
Definition: ErrorCodes.h:56
ripple::LedgerMaster::isNewPathRequest
bool isNewPathRequest()
Definition: LedgerMaster.cpp:1603
std::function
std::shared_ptr::reset
T reset(T... args)
algorithm
ripple::PathRequests::requestsPending
bool requestsPending() const
Definition: PathRequests.cpp:219
ripple::PathRequests::mLastIdentifier
std::atomic< int > mLastIdentifier
Definition: PathRequests.h:117
ripple::PathRequests::app_
Application & app_
Definition: PathRequests.h:105
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::PathRequests::insertPathRequest
void insertPathRequest(PathRequest::pointer const &)
Definition: PathRequests.cpp:226
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::JobQueue::isStopping
bool isStopping() const
Definition: JobQueue.h:230
ripple::PathRequests::getLineCache
std::shared_ptr< RippleLineCache > getLineCache(std::shared_ptr< ReadView const > const &ledger, bool authoritative)
Get the current RippleLineCache, updating it if necessary.
Definition: PathRequests.cpp:37
ripple::LedgerMaster::newPathRequest
bool newPathRequest()
Definition: LedgerMaster.cpp:1595
ripple::PathRequests::mJournal
beast::Journal mJournal
Definition: PathRequests.h:106
std::uint32_t
std::remove_if
T remove_if(T... args)
ripple::PathRequests::mLock
std::recursive_mutex mLock
Definition: PathRequests.h:119
ripple::PathRequests::lineCache_
std::weak_ptr< RippleLineCache > lineCache_
Definition: PathRequests.h:115
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Application::journal
virtual beast::Journal journal(std::string const &name)=0
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:34
ripple::PathRequests::updateAll
void updateAll(std::shared_ptr< ReadView const > const &ledger)
Update all of the contained PathRequest instances.
Definition: PathRequests.cpp:68
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::JobQueue::makeLoadEvent
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:165
ripple::PathRequests::requests_
std::vector< PathRequest::wptr > requests_
Definition: PathRequests.h:112
Json::Value
Represents a JSON value.
Definition: json_value.h:145