rippled
RocksDBFactory.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/unity/rocksdb.h>
21 
22 #if RIPPLE_ROCKSDB_AVAILABLE
23 
24 #include <ripple/basics/ByteUtilities.h>
25 #include <ripple/basics/contract.h>
26 #include <ripple/basics/safe_cast.h>
27 #include <ripple/beast/core/CurrentThreadName.h>
28 #include <ripple/core/Config.h> // VFALCO Bad dependency
29 #include <ripple/nodestore/Factory.h>
30 #include <ripple/nodestore/Manager.h>
31 #include <ripple/nodestore/impl/BatchWriter.h>
32 #include <ripple/nodestore/impl/DecodedBlob.h>
33 #include <ripple/nodestore/impl/EncodedBlob.h>
34 
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
37 
38 namespace ripple {
39 namespace NodeStore {
40 
41 class RocksDBEnv : public rocksdb::EnvWrapper
42 {
43 public:
44  RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
45  {
46  }
47 
48  struct ThreadParams
49  {
50  ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
51  {
52  }
53 
54  void (*f)(void*);
55  void* a;
56  };
57 
58  static void
59  thread_entry(void* ptr)
60  {
61  ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
62  void (*f)(void*) = p->f;
63  void* a(p->a);
64  delete p;
65 
66  static std::atomic<std::size_t> n;
67  std::size_t const id(++n);
69  ss << "rocksdb #" << id;
71 
72  (*f)(a);
73  }
74 
75  void
76  StartThread(void (*f)(void*), void* a) override
77  {
78  ThreadParams* const p(new ThreadParams(f, a));
79  EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
80  }
81 };
82 
83 //------------------------------------------------------------------------------
84 
85 class RocksDBBackend : public Backend, public BatchWriter::Callback
86 {
87 private:
88  std::atomic<bool> m_deletePath;
89 
90 public:
91  beast::Journal m_journal;
92  size_t const m_keyBytes;
93  BatchWriter m_batch;
94  std::string m_name;
96  int fdRequired_ = 2048;
97  rocksdb::Options m_options;
98 
99  RocksDBBackend(
100  int keyBytes,
101  Section const& keyValues,
102  Scheduler& scheduler,
103  beast::Journal journal,
104  RocksDBEnv* env)
105  : m_deletePath(false)
106  , m_journal(journal)
107  , m_keyBytes(keyBytes)
108  , m_batch(*this, scheduler)
109  {
110  if (!get_if_exists(keyValues, "path", m_name))
111  Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
112 
113  rocksdb::BlockBasedTableOptions table_options;
114  m_options.env = env;
115 
116  bool hard_set =
117  keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");
118 
119  if (keyValues.exists("cache_mb"))
120  {
121  auto size = get<int>(keyValues, "cache_mb");
122 
123  if (!hard_set && size == 256)
124  size = 1024;
125 
126  table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
127  }
128 
129  if (auto const v = get<int>(keyValues, "filter_bits"))
130  {
131  bool const filter_blocks = !keyValues.exists("filter_full") ||
132  (get<int>(keyValues, "filter_full") == 0);
133  table_options.filter_policy.reset(
134  rocksdb::NewBloomFilterPolicy(v, filter_blocks));
135  }
136 
137  if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
138  {
139  if (!hard_set && m_options.max_open_files == 2000)
140  m_options.max_open_files = 8000;
141 
142  fdRequired_ = m_options.max_open_files + 128;
143  }
144 
145  if (keyValues.exists("file_size_mb"))
146  {
147  auto file_size_mb = get<int>(keyValues, "file_size_mb");
148 
149  if (!hard_set && file_size_mb == 8)
150  file_size_mb = 256;
151 
152  m_options.target_file_size_base = megabytes(file_size_mb);
153  m_options.max_bytes_for_level_base =
154  5 * m_options.target_file_size_base;
155  m_options.write_buffer_size = 2 * m_options.target_file_size_base;
156  }
157 
159  keyValues, "file_size_mult", m_options.target_file_size_multiplier);
160 
161  if (keyValues.exists("bg_threads"))
162  {
163  m_options.env->SetBackgroundThreads(
164  get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
165  }
166 
167  if (keyValues.exists("high_threads"))
168  {
169  auto const highThreads = get<int>(keyValues, "high_threads");
170  m_options.env->SetBackgroundThreads(
171  highThreads, rocksdb::Env::HIGH);
172 
173  // If we have high-priority threads, presumably we want to
174  // use them for background flushes
175  if (highThreads > 0)
176  m_options.max_background_flushes = highThreads;
177  }
178 
179  m_options.compression = rocksdb::kSnappyCompression;
180 
181  get_if_exists(keyValues, "block_size", table_options.block_size);
182 
183  if (keyValues.exists("universal_compaction") &&
184  (get<int>(keyValues, "universal_compaction") != 0))
185  {
186  m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
187  m_options.min_write_buffer_number_to_merge = 2;
188  m_options.max_write_buffer_number = 6;
189  m_options.write_buffer_size = 6 * m_options.target_file_size_base;
190  }
191 
192  if (keyValues.exists("bbt_options"))
193  {
194  auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
195  table_options, get(keyValues, "bbt_options"), &table_options);
196  if (!s.ok())
197  Throw<std::runtime_error>(
198  std::string("Unable to set RocksDB bbt_options: ") +
199  s.ToString());
200  }
201 
202  m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
203 
204  if (keyValues.exists("options"))
205  {
206  auto const s = rocksdb::GetOptionsFromString(
207  m_options, get(keyValues, "options"), &m_options);
208  if (!s.ok())
209  Throw<std::runtime_error>(
210  std::string("Unable to set RocksDB options: ") +
211  s.ToString());
212  }
213 
214  std::string s1, s2;
215  rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
216  rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
217  JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
218  JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
219  }
220 
221  ~RocksDBBackend() override
222  {
223  close();
224  }
225 
226  void
227  open(bool createIfMissing) override
228  {
229  if (m_db)
230  {
231  assert(false);
232  JLOG(m_journal.error()) << "database is already open";
233  return;
234  }
235  rocksdb::DB* db = nullptr;
236  m_options.create_if_missing = createIfMissing;
237  rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
238  if (!status.ok() || !db)
239  Throw<std::runtime_error>(
240  std::string("Unable to open/create RocksDB: ") +
241  status.ToString());
242  m_db.reset(db);
243  }
244 
245  bool
246  isOpen() override
247  {
248  return static_cast<bool>(m_db);
249  }
250 
251  void
252  close() override
253  {
254  if (m_db)
255  {
256  m_db.reset();
257  if (m_deletePath)
258  {
259  boost::filesystem::path dir = m_name;
260  boost::filesystem::remove_all(dir);
261  }
262  }
263  }
264 
266  getName() override
267  {
268  return m_name;
269  }
270 
271  //--------------------------------------------------------------------------
272 
273  Status
274  fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
275  {
276  assert(m_db);
277  pObject->reset();
278 
279  Status status(ok);
280 
281  rocksdb::ReadOptions const options;
282  rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
283 
284  std::string string;
285 
286  rocksdb::Status getStatus = m_db->Get(options, slice, &string);
287 
288  if (getStatus.ok())
289  {
290  DecodedBlob decoded(key, string.data(), string.size());
291 
292  if (decoded.wasOk())
293  {
294  *pObject = decoded.createObject();
295  }
296  else
297  {
298  // Decoding failed, probably corrupted!
299  //
301  }
302  }
303  else
304  {
305  if (getStatus.IsCorruption())
306  {
308  }
309  else if (getStatus.IsNotFound())
310  {
311  status = notFound;
312  }
313  else
314  {
315  status =
316  Status(customCode + unsafe_cast<int>(getStatus.code()));
317 
318  JLOG(m_journal.error()) << getStatus.ToString();
319  }
320  }
321 
322  return status;
323  }
324 
326  fetchBatch(std::vector<uint256 const*> const& hashes) override
327  {
329  results.reserve(hashes.size());
330  for (auto const& h : hashes)
331  {
333  Status status = fetch(h->begin(), &nObj);
334  if (status != ok)
335  results.push_back({});
336  else
337  results.push_back(nObj);
338  }
339 
340  return {results, ok};
341  }
342 
343  void
344  store(std::shared_ptr<NodeObject> const& object) override
345  {
346  m_batch.store(object);
347  }
348 
349  void
350  storeBatch(Batch const& batch) override
351  {
352  assert(m_db);
353  rocksdb::WriteBatch wb;
354 
355  for (auto const& e : batch)
356  {
357  EncodedBlob encoded(e);
358 
359  wb.Put(
360  rocksdb::Slice(
361  reinterpret_cast<char const*>(encoded.getKey()),
362  m_keyBytes),
363  rocksdb::Slice(
364  reinterpret_cast<char const*>(encoded.getData()),
365  encoded.getSize()));
366  }
367 
368  rocksdb::WriteOptions const options;
369 
370  auto ret = m_db->Write(options, &wb);
371 
372  if (!ret.ok())
373  Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
374  }
375 
376  void
377  sync() override
378  {
379  }
380 
381  void
383  {
384  assert(m_db);
385  rocksdb::ReadOptions const options;
386 
387  std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
388 
389  for (it->SeekToFirst(); it->Valid(); it->Next())
390  {
391  if (it->key().size() == m_keyBytes)
392  {
393  DecodedBlob decoded(
394  it->key().data(), it->value().data(), it->value().size());
395 
396  if (decoded.wasOk())
397  {
398  f(decoded.createObject());
399  }
400  else
401  {
402  // Uh oh, corrupted data!
403  JLOG(m_journal.fatal())
404  << "Corrupt NodeObject #" << it->key().ToString(true);
405  }
406  }
407  else
408  {
409  // VFALCO NOTE What does it mean to find an
410  // incorrectly sized key? Corruption?
411  JLOG(m_journal.fatal())
412  << "Bad key size = " << it->key().size();
413  }
414  }
415  }
416 
417  int
418  getWriteLoad() override
419  {
420  return m_batch.getWriteLoad();
421  }
422 
423  void
424  setDeletePath() override
425  {
426  m_deletePath = true;
427  }
428 
429  //--------------------------------------------------------------------------
430 
431  void
432  writeBatch(Batch const& batch) override
433  {
434  storeBatch(batch);
435  }
436 
438  int
439  fdRequired() const override
440  {
441  return fdRequired_;
442  }
443 };
444 
445 //------------------------------------------------------------------------------
446 
447 class RocksDBFactory : public Factory
448 {
449 public:
450  RocksDBEnv m_env;
451 
452  RocksDBFactory()
453  {
454  Manager::instance().insert(*this);
455  }
456 
457  ~RocksDBFactory() override
458  {
459  Manager::instance().erase(*this);
460  }
461 
463  getName() const override
464  {
465  return "RocksDB";
466  }
467 
469  createInstance(
470  size_t keyBytes,
471  Section const& keyValues,
472  std::size_t,
473  Scheduler& scheduler,
474  beast::Journal journal) override
475  {
476  return std::make_unique<RocksDBBackend>(
477  keyBytes, keyValues, scheduler, journal, &m_env);
478  }
479 };
480 
481 static RocksDBFactory rocksDBFactory;
482 
483 } // namespace NodeStore
484 } // namespace ripple
485 
486 #endif
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
std::for_each
T for_each(T... args)
std::string
STL class.
std::shared_ptr< NodeObject >
ripple::NodeStore::ok
@ ok
Definition: nodestore/Types.h:45
ripple::NodeStore::Manager::erase
virtual void erase(Factory &factory)=0
Remove a factory.
std::pair
std::vector::reserve
T reserve(T... args)
std::vector
STL class.
std::size
T size(T... args)
std::stringstream
STL class.
std::function
std::unique_ptr::reset
T reset(T... args)
ripple::get_if_exists
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:384
std::vector::push_back
T push_back(T... args)
ripple::NodeStore::notFound
@ notFound
Definition: nodestore/Types.h:46
ripple::NodeStore::Batch
std::vector< std::shared_ptr< NodeObject > > Batch
A batch of NodeObjects to write at once.
Definition: nodestore/Types.h:55
ripple::NodeStore::Manager::insert
virtual void insert(Factory &factory)=0
Add a factory.
ripple::NodeStore::customCode
@ customCode
Definition: nodestore/Types.h:51
ripple::megabytes
constexpr auto megabytes(T value) noexcept
Definition: ByteUtilities.h:34
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
ripple::NodeStore::dataCorrupt
@ dataCorrupt
Definition: nodestore/Types.h:47
atomic
memory
std::experimental::filesystem::status
T status(T... args)
ripple::NodeStore::Status
Status
Return codes from Backend operations.
Definition: nodestore/Types.h:44
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::NodeStore::Manager::instance
static Manager & instance()
Returns the instance of the manager singleton.
Definition: ManagerImp.cpp:120
std::unique_ptr
STL class.
std::data
T data(T... args)
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::open
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition: SociDB.cpp:98