rippled
TransactionAcquire.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/ConsensusTransSetSF.h>
21 #include <ripple/app/ledger/InboundLedgers.h>
22 #include <ripple/app/ledger/InboundTransactions.h>
23 #include <ripple/app/ledger/impl/TransactionAcquire.h>
24 #include <ripple/app/main/Application.h>
25 #include <ripple/app/misc/NetworkOPs.h>
26 #include <ripple/overlay/Overlay.h>
27 #include <ripple/overlay/impl/ProtocolMessage.h>
28 
29 #include <memory>
30 
31 namespace ripple {
32 
33 using namespace std::chrono_literals;
34 
35 // Timeout interval in milliseconds
36 auto constexpr TX_ACQUIRE_TIMEOUT = 250ms;
37 
38 enum {
41 };
42 
44  Application& app,
45  uint256 const& hash,
48  app,
49  hash,
51  {jtTXN_DATA, "TransactionAcquire", {}},
52  app.journal("TransactionAcquire"))
53  , mHaveRoot(false)
54  , mPeerSet(std::move(peerSet))
55 {
56  mMap = std::make_shared<SHAMap>(
57  SHAMapType::TRANSACTION, hash, app_.getNodeFamily());
58  mMap->setUnbacked();
59 }
60 
61 void
63 {
64  // We hold a PeerSet lock and so cannot do real work here
65 
66  if (failed_)
67  {
68  JLOG(journal_.debug()) << "Failed to acquire TX set " << hash_;
69  }
70  else
71  {
72  JLOG(journal_.debug()) << "Acquired TX set " << hash_;
73  mMap->setImmutable();
74 
75  uint256 const& hash(hash_);
76  std::shared_ptr<SHAMap> const& map(mMap);
77  auto const pap = &app_;
78  // Note that, when we're in the process of shutting down, addJob()
79  // may reject the request. If that happens then giveSet() will
80  // not be called. That's fine. According to David the giveSet() call
81  // just updates the consensus and related structures when we acquire
82  // a transaction set. No need to update them if we're shutting down.
84  jtTXN_DATA, "completeAcquire", [pap, hash, map]() {
85  pap->getInboundTransactions().giveSet(hash, map, true);
86  });
87  }
88 }
89 
90 void
92 {
93  if (timeouts_ > MAX_TIMEOUTS)
94  {
95  failed_ = true;
96  done();
97  return;
98  }
99 
100  if (timeouts_ >= NORM_TIMEOUTS)
101  trigger(nullptr);
102 
103  addPeers(1);
104 }
105 
108 {
109  return shared_from_this();
110 }
111 
112 void
114 {
115  if (complete_)
116  {
117  JLOG(journal_.info()) << "trigger after complete";
118  return;
119  }
120  if (failed_)
121  {
122  JLOG(journal_.info()) << "trigger after fail";
123  return;
124  }
125 
126  if (!mHaveRoot)
127  {
128  JLOG(journal_.trace()) << "TransactionAcquire::trigger "
129  << (peer ? "havePeer" : "noPeer") << " no root";
130  protocol::TMGetLedger tmGL;
131  tmGL.set_ledgerhash(hash_.begin(), hash_.size());
132  tmGL.set_itype(protocol::liTS_CANDIDATE);
133  tmGL.set_querydepth(3); // We probably need the whole thing
134 
135  if (timeouts_ != 0)
136  tmGL.set_querytype(protocol::qtINDIRECT);
137 
138  *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString();
139  mPeerSet->sendRequest(tmGL, peer);
140  }
141  else if (!mMap->isValid())
142  {
143  failed_ = true;
144  done();
145  }
146  else
147  {
149  auto nodes = mMap->getMissingNodes(256, &sf);
150 
151  if (nodes.empty())
152  {
153  if (mMap->isValid())
154  complete_ = true;
155  else
156  failed_ = true;
157 
158  done();
159  return;
160  }
161 
162  protocol::TMGetLedger tmGL;
163  tmGL.set_ledgerhash(hash_.begin(), hash_.size());
164  tmGL.set_itype(protocol::liTS_CANDIDATE);
165 
166  if (timeouts_ != 0)
167  tmGL.set_querytype(protocol::qtINDIRECT);
168 
169  for (auto const& node : nodes)
170  {
171  *tmGL.add_nodeids() = node.first.getRawString();
172  }
173  mPeerSet->sendRequest(tmGL, peer);
174  }
175 }
176 
180  std::shared_ptr<Peer> const& peer)
181 {
182  ScopedLockType sl(mtx_);
183 
184  if (complete_)
185  {
186  JLOG(journal_.trace()) << "TX set complete";
187  return SHAMapAddNode();
188  }
189 
190  if (failed_)
191  {
192  JLOG(journal_.trace()) << "TX set failed";
193  return SHAMapAddNode();
194  }
195 
196  try
197  {
198  if (data.empty())
199  return SHAMapAddNode::invalid();
200 
202 
203  for (auto const& d : data)
204  {
205  if (d.first.isRoot())
206  {
207  if (mHaveRoot)
208  JLOG(journal_.debug())
209  << "Got root TXS node, already have it";
210  else if (!mMap->addRootNode(
211  SHAMapHash{hash_}, d.second, nullptr)
212  .isGood())
213  {
214  JLOG(journal_.warn()) << "TX acquire got bad root node";
215  }
216  else
217  mHaveRoot = true;
218  }
219  else if (!mMap->addKnownNode(d.first, d.second, &sf).isGood())
220  {
221  JLOG(journal_.warn()) << "TX acquire got bad non-root node";
222  return SHAMapAddNode::invalid();
223  }
224  }
225 
226  trigger(peer);
227  progress_ = true;
228  return SHAMapAddNode::useful();
229  }
230  catch (std::exception const& ex)
231  {
232  JLOG(journal_.error())
233  << "Peer " << peer->id()
234  << " sent us junky transaction node data: " << ex.what();
235  return SHAMapAddNode::invalid();
236  }
237 }
238 
239 void
241 {
242  mPeerSet->addPeers(
243  limit,
244  [this](auto peer) { return peer->hasTxSet(hash_); },
245  [this](auto peer) { trigger(peer); });
246 }
247 
248 void
250 {
251  ScopedLockType sl(mtx_);
252 
253  addPeers(numPeers);
254 
255  setTimer(sl);
256 }
257 
258 void
260 {
261  ScopedLockType sl(mtx_);
262 
263  if (timeouts_ > NORM_TIMEOUTS)
265 }
266 
267 } // namespace ripple
ripple::TransactionAcquire::mHaveRoot
bool mHaveRoot
Definition: TransactionAcquire.h:58
ripple::Application
Definition: Application.h:115
ripple::SHAMapAddNode
Definition: SHAMapAddNode.h:28
ripple::TX_ACQUIRE_TIMEOUT
constexpr auto TX_ACQUIRE_TIMEOUT
Definition: TransactionAcquire.cpp:36
ripple::Application::getTempNodeCache
virtual NodeCache & getTempNodeCache()=0
std::shared_ptr
STL class.
std::exception
STL class.
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::TimeoutCounter::setTimer
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
Definition: TimeoutCounter.cpp:50
std::pair
ripple::SHAMapType::TRANSACTION
@ TRANSACTION
ripple::TransactionAcquire::trigger
void trigger(std::shared_ptr< Peer > const &)
Definition: TransactionAcquire.cpp:113
std::vector
STL class.
ripple::TransactionAcquire::pmDowncast
std::weak_ptr< TimeoutCounter > pmDowncast() override
Return a weak pointer to this.
Definition: TransactionAcquire.cpp:107
ripple::TransactionAcquire::addPeers
void addPeers(std::size_t limit)
Definition: TransactionAcquire.cpp:240
ripple::TimeoutCounter::progress_
bool progress_
Whether forward progress has been made.
Definition: TimeoutCounter.h:134
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
ripple::SHAMapNodeID
Identifies a node inside a SHAMap.
Definition: SHAMapNodeID.h:33
ripple::TimeoutCounter
This class is an "active" object.
Definition: TimeoutCounter.h:66
ripple::SHAMapHash
Definition: SHAMapHash.h:32
ripple::TimeoutCounter::mtx_
std::recursive_mutex mtx_
Definition: TimeoutCounter.h:125
ripple::base_uint::size
constexpr static std::size_t size()
Definition: base_uint.h:519
ripple::ConsensusTransSetSF
Definition: ConsensusTransSetSF.h:34
ripple::SHAMapAddNode::useful
static SHAMapAddNode useful()
Definition: SHAMapAddNode.h:144
ripple::jtTXN_DATA
@ jtTXN_DATA
Definition: Job.h:70
ripple::TransactionAcquire::mMap
std::shared_ptr< SHAMap > mMap
Definition: TransactionAcquire.h:57
ripple::base_uint< 256 >
std::enable_shared_from_this< TransactionAcquire >::shared_from_this
T shared_from_this(T... args)
ripple::TimeoutCounter::app_
Application & app_
Definition: TimeoutCounter.h:123
ripple::TimeoutCounter::failed_
bool failed_
Definition: TimeoutCounter.h:132
ripple::SHAMapAddNode::invalid
static SHAMapAddNode invalid()
Definition: SHAMapAddNode.h:150
ripple::TransactionAcquire::onTimer
void onTimer(bool progress, ScopedLockType &peerSetLock) override
Hook called from invokeOnTimer().
Definition: TransactionAcquire.cpp:91
std::unique_lock< std::recursive_mutex >
ripple::MAX_TIMEOUTS
@ MAX_TIMEOUTS
Definition: TransactionAcquire.cpp:40
ripple::SHAMapNodeID::getRawString
std::string getRawString() const
Definition: SHAMapNodeID.cpp:65
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::TransactionAcquire::TransactionAcquire
TransactionAcquire(Application &app, uint256 const &hash, std::unique_ptr< PeerSet > peerSet)
Definition: TransactionAcquire.cpp:43
ripple::TransactionAcquire::init
void init(int startPeers)
Definition: TransactionAcquire.cpp:249
memory
std::weak_ptr
STL class.
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::TimeoutCounter::hash_
const uint256 hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
Definition: TimeoutCounter.h:129
ripple::base_uint::begin
iterator begin()
Definition: base_uint.h:133
ripple::TransactionAcquire::stillNeed
void stillNeed()
Definition: TransactionAcquire.cpp:259
ripple::TransactionAcquire::takeNodes
SHAMapAddNode takeNodes(std::vector< std::pair< SHAMapNodeID, Slice >> const &data, std::shared_ptr< Peer > const &)
Definition: TransactionAcquire.cpp:178
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::TimeoutCounter::complete_
bool complete_
Definition: TimeoutCounter.h:131
ripple::TimeoutCounter::timeouts_
int timeouts_
Definition: TimeoutCounter.h:130
std::unique_ptr
STL class.
ripple::TransactionAcquire::mPeerSet
std::unique_ptr< PeerSet > mPeerSet
Definition: TransactionAcquire.h:59
ripple::TimeoutCounter::journal_
beast::Journal journal_
Definition: TimeoutCounter.h:124
std::exception::what
T what(T... args)
ripple::NORM_TIMEOUTS
@ NORM_TIMEOUTS
Definition: TransactionAcquire.cpp:39
ripple::TransactionAcquire::done
void done()
Definition: TransactionAcquire.cpp:62