import_test.cpp
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include <ripple/basics/contract.h>
#include <ripple/beast/clock/basic_seconds_clock.h>
#include <ripple/beast/core/LexicalCast.h>
#include <ripple/beast/rfc2616.h>
#include <ripple/beast/unit_test.h>
#include <ripple/nodestore/impl/codec.h>
#include <boost/beast/core/string.hpp>
#include <boost/regex.hpp>
#include <algorithm>
#include <chrono>
#include <iomanip>
#include <map>
#include <nudb/create.hpp>
#include <nudb/detail/format.hpp>
#include <nudb/xxhasher.hpp>
#include <sstream>

#include <ripple/unity/rocksdb.h>
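// The unity header above is expected to define RIPPLE_ROCKSDB_AVAILABLE
// when rippled is built with RocksDB support; the test below is compiled
// only in that case.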

/*

Math (example import):

1,000 GB dat file
170 GB key file
capacity: 113 keys/bucket

normal (keys inserted one at a time, each touching its bucket):
1,000 GB data file read
19,210 GB key file read (113 * 170)
19,210 GB key file write

multi-pass (32 GB bucket buffer):
6 passes (ceil(170 / 32))
6,000 GB data file read (one full scan per pass)
170 GB key file write

*/

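// In general, a bucket buffer of B bytes over a key file of K bytes needs
// ceil(K / B) passes, and the data file is scanned in full once per pass;
// the figures above apply this arithmetic with K = 170 GB and B = 32 GB.
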
namespace ripple {
namespace NodeStore {

namespace detail {

// Saves a stream's formatting state and restores it on destruction.
class save_stream_state
{
    std::ostream& os_;
    std::streamsize precision_;
    std::ios::fmtflags flags_;
    std::ios::char_type fill_;

public:
    ~save_stream_state()
    {
        os_.precision(precision_);
        os_.flags(flags_);
        os_.fill(fill_);
    }
    save_stream_state(save_stream_state const&) = delete;
    save_stream_state&
    operator=(save_stream_state const&) = delete;
    explicit save_stream_state(std::ostream& os)
        : os_(os)
        , precision_(os.precision())
        , flags_(os.flags())
        , fill_(os.fill())
    {
    }
};

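// Stream a duration using a unit chosen to keep the value compact
// (ns, us, ms, s, or min), with one decimal place below 100 units.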
template <class Rep, class Period>
std::ostream&
pretty_time(std::ostream& os, std::chrono::duration<Rep, Period> d)
{
    save_stream_state _(os);
    using namespace std::chrono;
    if (d < microseconds{1})
    {
        // use nanoseconds
        if (d < nanoseconds{100})
        {
            // use floating
            using ns = duration<float, std::nano>;
            os << std::fixed << std::setprecision(1) << ns(d).count();
        }
        else
        {
            // use integral
            os << round<nanoseconds>(d).count();
        }
        os << "ns";
    }
    else if (d < milliseconds{1})
    {
        // use microseconds
        if (d < microseconds{100})
        {
            // use floating
            using ms = duration<float, std::micro>;
            os << std::fixed << std::setprecision(1) << ms(d).count();
        }
        else
        {
            // use integral
            os << round<microseconds>(d).count();
        }
        os << "us";
    }
    else if (d < seconds{1})
    {
        // use milliseconds
        if (d < milliseconds{100})
        {
            // use floating
            using ms = duration<float, std::milli>;
            os << std::fixed << std::setprecision(1) << ms(d).count();
        }
        else
        {
            // use integral
            os << round<milliseconds>(d).count();
        }
        os << "ms";
    }
    else if (d < minutes{1})
    {
        // use seconds
        if (d < seconds{100})
        {
            // use floating
            using s = duration<float>;
            os << std::fixed << std::setprecision(1) << s(d).count();
        }
        else
        {
            // use integral
            os << round<seconds>(d).count();
        }
        os << "s";
    }
    else
    {
        // use minutes
        if (d < minutes{100})
        {
            // use floating
            using m = duration<float, std::ratio<60>>;
            os << std::fixed << std::setprecision(1) << m(d).count();
        }
        else
        {
            // use integral
            os << round<minutes>(d).count();
        }
        os << "min";
    }
    return os;
}

template <class Period, class Rep>
inline std::string
fmtdur(std::chrono::duration<Period, Rep> const& d)
{
    std::stringstream ss;
    pretty_time(ss, d);
    return ss.str();
}

}  // namespace detail

//------------------------------------------------------------------------------

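// Emits throttled progress reports with an estimated time remaining
// for a job whose total amount of work is known up front.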
class progress
{
private:
    using clock_type = beast::basic_seconds_clock;

    std::size_t const work_;
    clock_type::time_point start_ = clock_type::now();
    clock_type::time_point now_ = clock_type::now();
    clock_type::time_point report_ = clock_type::now();
    std::size_t prev_ = 0;
    bool estimate_ = false;

public:
    explicit progress(std::size_t work) : work_(work)
    {
    }

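    // Report progress. Logs at most once per minute, after an initial
    // 15-second measurement window used to form the first estimate.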
    template <class Log>
    void
    operator()(Log& log, std::size_t work)
    {
        using namespace std::chrono;
        auto const now = clock_type::now();
        if (now == now_)
            return;
        now_ = now;
        auto const elapsed = now - start_;
        if (!estimate_)
        {
            if (elapsed < seconds(15))
                return;
            estimate_ = true;
        }
        else if (now - report_ < std::chrono::seconds(60))
        {
            return;
        }
        auto const rate = elapsed.count() / double(work);
        clock_type::duration const remain(
            static_cast<clock_type::duration::rep>((work_ - work) * rate));
        log << "Remaining: " << detail::fmtdur(remain) << " (" << work << " of "
            << work_ << " in " << detail::fmtdur(elapsed) << ", "
            << (work - prev_) << " in " << detail::fmtdur(now - report_) << ")";
        report_ = now;
        prev_ = work;
    }

    template <class Log>
    void
    finish(Log& log)
    {
        log << "Total time: " << detail::fmtdur(clock_type::now() - start_);
    }
};

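// Parse a comma-separated list of <key>=<value> pairs into a map with
// case-insensitive keys. A hypothetical input such as
// "from=/in,to=/out,buffer=1000000" yields
// { "from": "/in", "to": "/out", "buffer": "1000000" }.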
std::map<std::string, std::string, boost::beast::iless>
parse_args(std::string const& s)
{
    // <key> '=' <value>
    static boost::regex const re1(
        "^"                        // start of line
        "(?:\\s*)"                 // whitespace (optional)
        "([a-zA-Z][_a-zA-Z0-9]*)"  // <key>
        "(?:\\s*)"                 // whitespace (optional)
        "(?:=)"                    // '='
        "(?:\\s*)"                 // whitespace (optional)
        "(.*\\S+)"                 // <value>
        "(?:\\s*)"                 // whitespace (optional)
        ,
        boost::regex_constants::optimize);
    std::map<std::string, std::string, boost::beast::iless> map;
    auto const v = beast::rfc2616::split(s.begin(), s.end(), ',');
    for (auto const& kv : v)
    {
        boost::smatch m;
        if (!boost::regex_match(kv, m, re1))
            Throw<std::runtime_error>("invalid parameter " + kv);
        auto const result = map.emplace(m[1], m[2]);
        if (!result.second)
            Throw<std::runtime_error>("duplicate parameter " + m[1]);
    }
    return map;
}

//------------------------------------------------------------------------------

#if RIPPLE_ROCKSDB_AVAILABLE

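// Manual test: imports a RocksDB node store into a NuDB database.
// It does not run with the automatic suites; select it explicitly and
// pass its parameters through --unittest-arg (see the usage text below).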
class import_test : public beast::unit_test::suite
{
public:
    void
    run() override
    {
        testcase(beast::unit_test::abort_on_fail) << arg();

        using namespace nudb;
        using namespace nudb::detail;

        pass();
        auto const args = parse_args(arg());
        bool usage = args.empty();

        if (!usage && args.find("from") == args.end())
        {
            log << "Missing parameter: from";
            usage = true;
        }
        if (!usage && args.find("to") == args.end())
        {
            log << "Missing parameter: to";
            usage = true;
        }
        if (!usage && args.find("buffer") == args.end())
        {
            log << "Missing parameter: buffer";
            usage = true;
        }

        if (usage)
        {
            log << "Usage:\n"
                << "--unittest-arg=from=<from>,to=<to>,buffer=<buffer>\n"
                << "from: RocksDB database to import from\n"
                << "to: NuDB database to import to\n"
                << "buffer: Buffer size (bigger is faster)\n"
                << "NuDB database must not already exist.";
            return;
        }

        // This controls the size of the bucket buffer.
        // For a 1TB data file, a 32GB bucket buffer is suggested.
        // The larger the buffer, the faster the import.
        //
        std::size_t const buffer_size = std::stoull(args.at("buffer"));
        auto const from_path = args.at("from");
        auto const to_path = args.at("to");

        using hash_type = nudb::xxhasher;
        auto const bulk_size = 64 * 1024 * 1024;
        float const load_factor = 0.5;
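        // bulk_size is the bulk_writer's write buffer size. load_factor is
        // used below to size the bucket array so buckets average about half
        // full, trading a larger key file for fewer spill records.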

        auto const dp = to_path + ".dat";
        auto const kp = to_path + ".key";

        auto const start = std::chrono::steady_clock::now();

        log << "from: " << from_path
            << "\n"
               "to: "
            << to_path
            << "\n"
               "buffer: "
            << buffer_size;

        std::unique_ptr<rocksdb::DB> db;
        {
            rocksdb::Options options;
            options.create_if_missing = false;
            options.max_open_files = 2000;  // 5000?
            rocksdb::DB* pdb = nullptr;
            rocksdb::Status status =
                rocksdb::DB::OpenForReadOnly(options, from_path, &pdb);
            if (!status.ok() || !pdb)
                Throw<std::runtime_error>(
                    "Can't open '" + from_path + "': " + status.ToString());
            db.reset(pdb);
        }
        // Create data file with values
        std::size_t nitems = 0;
        dat_file_header dh;
        dh.version = currentVersion;
        dh.uid = make_uid();
        dh.appnum = 1;
        dh.key_size = 32;

        native_file df;
        error_code ec;
        df.create(file_mode::append, dp, ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        bulk_writer<native_file> dw(df, 0, bulk_size);
        {
            {
                auto os = dw.prepare(dat_file_header::size, ec);
                if (ec)
                    Throw<nudb::system_error>(ec);
                write(os, dh);
            }
            rocksdb::ReadOptions options;
            options.verify_checksums = false;
            options.fill_cache = false;
            std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));

            buffer buf;
            for (it->SeekToFirst(); it->Valid(); it->Next())
            {
                if (it->key().size() != 32)
                    Throw<std::runtime_error>(
                        "Unexpected key size " +
                        std::to_string(it->key().size()));
                void const* const key = it->key().data();
                void const* const data = it->value().data();
                auto const size = it->value().size();
                std::unique_ptr<char[]> clean(new char[size]);
                std::memcpy(clean.get(), data, size);
                filter_inner(clean.get(), size);
                auto const out = nodeobject_compress(clean.get(), size, buf);
                // Verify codec correctness
                {
                    buffer buf2;
                    auto const check =
                        nodeobject_decompress(out.first, out.second, buf2);
                    BEAST_EXPECT(check.second == size);
                    BEAST_EXPECT(
                        std::memcmp(check.first, clean.get(), size) == 0);
                }
                // Data Record
                auto os = dw.prepare(
                    field<uint48_t>::size +  // Size
                        32 +                 // Key
                        out.second,
                    ec);
                if (ec)
                    Throw<nudb::system_error>(ec);
                write<uint48_t>(os, out.second);
                std::memcpy(os.data(32), key, 32);
                std::memcpy(os.data(out.second), out.first, out.second);
                ++nitems;
            }
            dw.flush(ec);
            if (ec)
                Throw<nudb::system_error>(ec);
        }
        db.reset();
        log << "Import data: "
            << detail::fmtdur(std::chrono::steady_clock::now() - start);
        auto const df_size = df.size(ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        // Create key file
        key_file_header kh;
        kh.version = currentVersion;
        kh.uid = dh.uid;
        kh.appnum = dh.appnum;
        kh.key_size = 32;
        kh.salt = make_salt();
        kh.pepper = pepper<hash_type>(kh.salt);
        kh.block_size = block_size(kp);
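        // Size the bucket array so that buckets average about load_factor
        // full: buckets = ceil(items / (capacity * load_factor)). The
        // modulus used by bucket_index() is the next power of two >= buckets.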
        kh.load_factor = std::min<std::size_t>(65536.0 * load_factor, 65535);
        kh.buckets =
            std::ceil(nitems / (bucket_capacity(kh.block_size) * load_factor));
        kh.modulus = ceil_pow2(kh.buckets);
        native_file kf;
        kf.create(file_mode::append, kp, ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        buffer buf(kh.block_size);
        {
            std::memset(buf.get(), 0, kh.block_size);
            ostream os(buf.get(), kh.block_size);
            write(os, kh);
            kf.write(0, buf.get(), kh.block_size, ec);
            if (ec)
                Throw<nudb::system_error>(ec);
        }
        // Build contiguous sequential sections of the
        // key file using multiple passes over the data.
        //
        auto const buckets =
            std::max<std::size_t>(1, buffer_size / kh.block_size);
        buf.reserve(buckets * kh.block_size);
        auto const passes = (kh.buckets + buckets - 1) / buckets;
        log << "items: " << nitems
            << "\n"
               "buckets: "
            << kh.buckets
            << "\n"
               "data: "
            << df_size
            << "\n"
               "passes: "
            << passes;
        progress p(df_size * passes);
        std::size_t npass = 0;
        for (std::size_t b0 = 0; b0 < kh.buckets; b0 += buckets)
        {
            auto const b1 = std::min(b0 + buckets, kh.buckets);
            // Buffered range is [b0, b1)
            auto const bn = b1 - b0;
            // Create empty buckets
            for (std::size_t i = 0; i < bn; ++i)
            {
                bucket b(kh.block_size, buf.get() + i * kh.block_size, empty);
            }
            // Insert all keys into buckets
            // Iterate Data File
            bulk_reader<native_file> r(
                df, dat_file_header::size, df_size, bulk_size);
            while (!r.eof())
            {
                auto const offset = r.offset();
                // Data Record or Spill Record
                std::size_t size;
                auto is = r.prepare(field<uint48_t>::size, ec);  // Size
                if (ec)
                    Throw<nudb::system_error>(ec);
                read<uint48_t>(is, size);
                if (size > 0)
                {
                    // Data Record
                    is = r.prepare(
                        dh.key_size +  // Key
                            size,
                        ec);           // Data
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    std::uint8_t const* const key = is.data(dh.key_size);
                    auto const h = hash<hash_type>(key, kh.key_size, kh.salt);
                    auto const n = bucket_index(h, kh.buckets, kh.modulus);
                    p(log, npass * df_size + r.offset());
                    if (n < b0 || n >= b1)
                        continue;
                    bucket b(
                        kh.block_size, buf.get() + (n - b0) * kh.block_size);
                    maybe_spill(b, dw, ec);
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    b.insert(offset, size, h);
                }
                else
                {
                    // VFALCO Should never get here
                    // Spill Record
                    is = r.prepare(field<std::uint16_t>::size, ec);
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    read<std::uint16_t>(is, size);  // Size
                    r.prepare(size, ec);            // skip
                    if (ec)
                        Throw<nudb::system_error>(ec);
                }
            }
            kf.write(
                (b0 + 1) * kh.block_size, buf.get(), bn * kh.block_size, ec);
            if (ec)
                Throw<nudb::system_error>(ec);
            ++npass;
        }
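        // maybe_spill() appends spill records to the data file through dw;
        // flush them before reporting completion.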
        dw.flush(ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        p.finish(log);
    }
};

BEAST_DEFINE_TESTSUITE_MANUAL(import, NodeStore, ripple);

#endif

//------------------------------------------------------------------------------

}  // namespace NodeStore
}  // namespace ripple