rippled
OrderBookDB.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/ledger/LedgerMaster.h>
21 #include <ripple/app/ledger/OrderBookDB.h>
22 #include <ripple/app/main/Application.h>
23 #include <ripple/app/misc/NetworkOPs.h>
24 #include <ripple/basics/Log.h>
25 #include <ripple/core/Config.h>
26 #include <ripple/core/JobQueue.h>
27 #include <ripple/protocol/Indexes.h>
28 
29 namespace ripple {
30 
32  : app_(app), seq_(0), j_(app.journal("OrderBookDB"))
33 {
34 }
35 
36 void
38 {
40  {
41  JLOG(j_.warn()) << "Eliding full order book update: no ledger";
42  return;
43  }
44 
45  auto seq = seq_.load();
46 
47  if (seq != 0)
48  {
49  if ((seq > ledger->seq()) && ((ledger->seq() - seq) < 25600))
50  return;
51 
52  if ((ledger->seq() <= seq) && ((seq - ledger->seq()) < 16))
53  return;
54  }
55 
56  if (seq_.exchange(ledger->seq()) != seq)
57  return;
58 
59  JLOG(j_.debug()) << "Full order book update: " << seq << " to "
60  << ledger->seq();
61 
62  if (app_.config().PATH_SEARCH_MAX != 0)
63  {
64  if (app_.config().standalone())
65  update(ledger);
66  else
69  "OrderBookDB::update: " + std::to_string(ledger->seq()),
70  [this, ledger]() { update(ledger); });
71  }
72 }
73 
74 void
76 {
77  if (app_.config().PATH_SEARCH_MAX == 0)
78  return; // pathfinding has been disabled
79 
80  // A newer full update job is pending
81  if (auto const seq = seq_.load(); seq > ledger->seq())
82  {
83  JLOG(j_.debug()) << "Eliding update for " << ledger->seq()
84  << " because of pending update to later " << seq;
85  return;
86  }
87 
88  decltype(allBooks_) allBooks;
89  decltype(xrpBooks_) xrpBooks;
90 
91  allBooks.reserve(allBooks_.size());
92  xrpBooks.reserve(xrpBooks_.size());
93 
94  JLOG(j_.debug()) << "Beginning update (" << ledger->seq() << ")";
95 
96  // walk through the entire ledger looking for orderbook entries
97  int cnt = 0;
98 
99  try
100  {
101  for (auto& sle : ledger->sles)
102  {
103  if (app_.isStopping())
104  {
105  JLOG(j_.info())
106  << "Update halted because the process is stopping";
107  seq_.store(0);
108  return;
109  }
110 
111  if (sle->getType() == ltDIR_NODE &&
112  sle->isFieldPresent(sfExchangeRate) &&
113  sle->getFieldH256(sfRootIndex) == sle->key())
114  {
115  Book book;
116 
117  book.in.currency = sle->getFieldH160(sfTakerPaysCurrency);
118  book.in.account = sle->getFieldH160(sfTakerPaysIssuer);
119  book.out.currency = sle->getFieldH160(sfTakerGetsCurrency);
120  book.out.account = sle->getFieldH160(sfTakerGetsIssuer);
121 
122  allBooks[book.in].insert(book.out);
123 
124  if (isXRP(book.out))
125  xrpBooks.insert(book.in);
126 
127  ++cnt;
128  }
129  }
130  }
131  catch (SHAMapMissingNode const& mn)
132  {
133  JLOG(j_.info()) << "Missing node in " << ledger->seq()
134  << " during update: " << mn.what();
135  seq_.store(0);
136  return;
137  }
138 
139  JLOG(j_.debug()) << "Update completed (" << ledger->seq() << "): " << cnt
140  << " books found";
141 
142  {
144  allBooks_.swap(allBooks);
145  xrpBooks_.swap(xrpBooks);
146  }
147 
149 }
150 
151 void
153 {
154  bool toXRP = isXRP(book.out);
155 
157 
158  allBooks_[book.in].insert(book.out);
159 
160  if (toXRP)
161  xrpBooks_.insert(book.in);
162 }
163 
164 // return list of all orderbooks that want this issuerID and currencyID
167 {
168  std::vector<Book> ret;
169 
170  {
172 
173  if (auto it = allBooks_.find(issue); it != allBooks_.end())
174  {
175  ret.reserve(it->second.size());
176 
177  for (auto const& gets : it->second)
178  ret.push_back(Book(issue, gets));
179  }
180  }
181 
182  return ret;
183 }
184 
185 int
187 {
189  if (auto it = allBooks_.find(issue); it != allBooks_.end())
190  return static_cast<int>(it->second.size());
191  return 0;
192 }
193 
194 bool
196 {
198  return xrpBooks_.count(issue) > 0;
199 }
200 
203 {
205  auto ret = getBookListeners(book);
206 
207  if (!ret)
208  {
209  ret = std::make_shared<BookListeners>();
210 
211  mListeners[book] = ret;
212  assert(getBookListeners(book) == ret);
213  }
214 
215  return ret;
216 }
217 
220 {
223 
224  auto it0 = mListeners.find(book);
225  if (it0 != mListeners.end())
226  ret = it0->second;
227 
228  return ret;
229 }
230 
231 // Based on the meta, send the meta to the streams that are listening.
232 // We need to determine which streams a given meta effects.
233 void
235  std::shared_ptr<ReadView const> const& ledger,
236  const AcceptedLedgerTx& alTx,
237  Json::Value const& jvObj)
238 {
240 
241  // For this particular transaction, maintain the set of unique
242  // subscriptions that have already published it. This prevents sending
243  // the transaction multiple times if it touches multiple ltOFFER
244  // entries for the same book, or if it touches multiple books and a
245  // single client has subscribed to those books.
246  hash_set<std::uint64_t> havePublished;
247 
248  for (auto const& node : alTx.getMeta().getNodes())
249  {
250  try
251  {
252  if (node.getFieldU16(sfLedgerEntryType) == ltOFFER)
253  {
254  auto process = [&, this](SField const& field) {
255  if (auto data = dynamic_cast<STObject const*>(
256  node.peekAtPField(field));
257  data && data->isFieldPresent(sfTakerPays) &&
258  data->isFieldPresent(sfTakerGets))
259  {
260  auto listeners = getBookListeners(
261  {data->getFieldAmount(sfTakerGets).issue(),
262  data->getFieldAmount(sfTakerPays).issue()});
263  if (listeners)
264  listeners->publish(jvObj, havePublished);
265  }
266  };
267 
268  // We need a field that contains the TakerGets and TakerPays
269  // parameters.
270  if (node.getFName() == sfModifiedNode)
271  process(sfPreviousFields);
272  else if (node.getFName() == sfCreatedNode)
273  process(sfNewFields);
274  else if (node.getFName() == sfDeletedNode)
275  process(sfFinalFields);
276  }
277  }
278  catch (std::exception const& ex)
279  {
280  JLOG(j_.info())
281  << "processTxn: field not found (" << ex.what() << ")";
282  }
283  }
284 }
285 
286 } // namespace ripple
ripple::Application
Definition: Application.h:115
ripple::sfRootIndex
const SF_UINT256 sfRootIndex
ripple::Issue
A currency issued by an account.
Definition: Issue.h:34
std::shared_ptr
STL class.
std::exception
STL class.
ripple::Book::out
Issue out
Definition: Book.h:37
std::unordered_set
STL class.
ripple::OrderBookDB::j_
const beast::Journal j_
Definition: OrderBookDB.h:85
ripple::OrderBookDB::mLock
std::recursive_mutex mLock
Definition: OrderBookDB.h:77
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
std::unordered_map::find
T find(T... args)
ripple::sfTakerPaysCurrency
const SF_UINT160 sfTakerPaysCurrency
ripple::Issue::currency
Currency currency
Definition: Issue.h:37
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::lock_guard
STL class.
ripple::sfFinalFields
const SField sfFinalFields
ripple::OrderBookDB::seq_
std::atomic< std::uint32_t > seq_
Definition: OrderBookDB.h:83
ripple::Application::isStopping
virtual bool isStopping() const =0
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
ripple::jtUPDATE_PF
@ jtUPDATE_PF
Definition: Job.h:57
ripple::OrderBookDB::update
void update(std::shared_ptr< ReadView const > const &ledger)
Definition: OrderBookDB.cpp:75
ripple::sfTakerGetsCurrency
const SF_UINT160 sfTakerGetsCurrency
ripple::sfDeletedNode
const SField sfDeletedNode
ripple::OrderBookDB::xrpBooks_
hash_set< Issue > xrpBooks_
Definition: OrderBookDB.h:75
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::Config::PATH_SEARCH_MAX
int PATH_SEARCH_MAX
Definition: Config.h:203
ripple::ltDIR_NODE
@ ltDIR_NODE
A ledger object which contains a list of object identifiers.
Definition: LedgerFormats.h:66
ripple::SHAMapMissingNode
Definition: SHAMapMissingNode.h:55
std::vector::push_back
T push_back(T... args)
ripple::OrderBookDB::isBookToXRP
bool isBookToXRP(Issue const &)
Definition: OrderBookDB.cpp:195
ripple::sfTakerGetsIssuer
const SF_UINT160 sfTakerGetsIssuer
ripple::sfTakerPays
const SF_AMOUNT sfTakerPays
ripple::ltOFFER
@ ltOFFER
A ledger object which describes an offer on the DEX.
Definition: LedgerFormats.h:92
ripple::OrderBookDB::getBookSize
int getBookSize(Issue const &)
Definition: OrderBookDB.cpp:186
ripple::OrderBookDB::processTxn
void processTxn(std::shared_ptr< ReadView const > const &ledger, const AcceptedLedgerTx &alTx, Json::Value const &jvObj)
Definition: OrderBookDB.cpp:234
ripple::NetworkOPs::isNeedNetworkLedger
virtual bool isNeedNetworkLedger()=0
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
std::atomic::load
T load(T... args)
ripple::sfNewFields
const SField sfNewFields
ripple::Application::config
virtual Config & config()=0
ripple::Config::standalone
bool standalone() const
Definition: Config.h:332
std::to_string
T to_string(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::AcceptedLedgerTx
A transaction that is in a closed ledger.
Definition: AcceptedLedgerTx.h:43
ripple::sfModifiedNode
const SField sfModifiedNode
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::sfTakerGets
const SF_AMOUNT sfTakerGets
ripple::OrderBookDB::setup
void setup(std::shared_ptr< ReadView const > const &ledger)
Definition: OrderBookDB.cpp:37
ripple::isXRP
bool isXRP(AccountID const &c)
Definition: AccountID.h:89
ripple::sfExchangeRate
const SF_UINT64 sfExchangeRate
ripple::OrderBookDB::makeBookListeners
BookListeners::pointer makeBookListeners(Book const &)
Definition: OrderBookDB.cpp:202
ripple::LedgerMaster::newOrderBookDB
bool newOrderBookDB()
Definition: LedgerMaster.cpp:1614
ripple::OrderBookDB::allBooks_
hardened_hash_map< Issue, hardened_hash_set< Issue > > allBooks_
Definition: OrderBookDB.h:72
ripple::OrderBookDB::mListeners
BookToListenersMap mListeners
Definition: OrderBookDB.h:81
ripple::sfPreviousFields
const SField sfPreviousFields
ripple::OrderBookDB::getBookListeners
BookListeners::pointer getBookListeners(Book const &)
Definition: OrderBookDB.cpp:219
ripple::STObject
Definition: STObject.h:51
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std::atomic::exchange
T exchange(T... args)
ripple::TxMeta::getNodes
STArray & getNodes()
Definition: TxMeta.h:100
ripple::sfLedgerEntryType
const SF_UINT16 sfLedgerEntryType
ripple::SField
Identifies fields.
Definition: SField.h:112
ripple::AcceptedLedgerTx::getMeta
TxMeta const & getMeta() const
Definition: AcceptedLedgerTx.h:57
ripple::sfTakerPaysIssuer
const SF_UINT160 sfTakerPaysIssuer
ripple::sfCreatedNode
const SField sfCreatedNode
ripple::OrderBookDB::OrderBookDB
OrderBookDB(Application &app)
Definition: OrderBookDB.cpp:31
std::atomic::store
T store(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::OrderBookDB::app_
Application & app_
Definition: OrderBookDB.h:69
ripple::Book
Specifies an order book.
Definition: Book.h:33
std::unordered_map::end
T end(T... args)
ripple::OrderBookDB::getBooksByTakerPays
std::vector< Book > getBooksByTakerPays(Issue const &)
Definition: OrderBookDB.cpp:166
ripple::Book::in
Issue in
Definition: Book.h:36
ripple::Issue::account
AccountID account
Definition: Issue.h:38
std::runtime_error::what
T what(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::OrderBookDB::addOrderBook
void addOrderBook(Book const &)
Definition: OrderBookDB.cpp:152