CassandraFactory.cpp
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2020 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifdef RIPPLED_REPORTING
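// Note: the entire translation unit is guarded by RIPPLED_REPORTING, so this
// Cassandra nodestore backend exists only in reporting-mode builds of rippled.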

#include <cassandra.h>
#include <libpq-fe.h>

#include <ripple/basics/Slice.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/basics/contract.h>
#include <ripple/basics/strHex.h>
#include <ripple/nodestore/Backend.h>
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <ripple/nodestore/impl/codec.h>
#include <ripple/protocol/digest.h>
#include <boost/asio/steady_timer.hpp>
#include <boost/filesystem.hpp>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <exception>
#include <fstream>
#include <memory>
#include <mutex>
#include <nudb/nudb.hpp>
#include <queue>
#include <sstream>
#include <thread>
#include <utility>
#include <vector>

namespace ripple {
namespace NodeStore {

void
writeCallback(CassFuture* fut, void* cbData);
void
readCallback(CassFuture* fut, void* cbData);

class CassandraBackend : public Backend
{
private:
    // convenience function for one-off queries. For normal reads and writes,
    // use the prepared statements insert_ and select_
    CassStatement*
    makeStatement(char const* query, std::size_t params)
    {
        CassStatement* ret = cass_statement_new(query, params);
        CassError rc =
            cass_statement_set_consistency(ret, CASS_CONSISTENCY_QUORUM);
        if (rc != CASS_OK)
        {
            std::stringstream ss;
            ss << "nodestore: Error setting query consistency: " << query
               << ", result: " << rc << ", " << cass_error_desc(rc);
            Throw<std::runtime_error>(ss.str());
        }
        return ret;
    }

    beast::Journal const j_;
    // size of a key
    size_t const keyBytes_;

    Section const config_;

    std::atomic<bool> open_{false};

    // mutex used for open() and close()
    std::mutex mutex_;

    std::unique_ptr<CassSession, void (*)(CassSession*)> session_{
        nullptr,
        [](CassSession* session) {
            // Try to disconnect gracefully.
            CassFuture* fut = cass_session_close(session);
            cass_future_wait(fut);
            cass_future_free(fut);
            cass_session_free(session);
        }};

    // Database statements cached server side. Using these is more efficient
    // than making a new statement
    const CassPrepared* insert_ = nullptr;
    const CassPrepared* select_ = nullptr;

    // io_context used for exponential backoff for write retries
    boost::asio::io_context ioContext_;
    std::optional<boost::asio::io_context::work> work_;
    std::thread ioThread_;

    // maximum number of concurrent in flight requests. New requests will wait
    // for earlier requests to finish if this limit is exceeded
    uint32_t maxRequestsOutstanding = 10000000;
    std::atomic_uint32_t numRequestsOutstanding_ = 0;

    // mutex and condition_variable to limit the number of concurrent in flight
    // requests
    std::mutex throttleMutex_;
    std::condition_variable throttleCv_;

    // writes are asynchronous. This mutex and condition_variable is used to
    // wait for all writes to finish
    std::mutex syncMutex_;
    std::condition_variable syncCv_;

    Counters<std::atomic<std::uint64_t>> counters_;

public:
    CassandraBackend(
        size_t keyBytes,
        Section const& keyValues,
        beast::Journal journal)
        : j_(journal), keyBytes_(keyBytes), config_(keyValues)
    {
    }

    ~CassandraBackend() override
    {
        close();
    }

    std::string
    getName() override
    {
        return "cassandra";
    }

    bool
    isOpen() override
    {
        return open_;
    }

    // Setup all of the necessary components for talking to the database.
    // Create the table if it doesn't exist already
    // @param createIfMissing ignored
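    //
    // For illustration only: the keys read from config_ below correspond to a
    // config stanza along these lines. The section name and sample values are
    // assumptions, not taken from this file:
    //
    //     [node_db]
    //     type=cassandra
    //     contact_points=127.0.0.1
    //     port=9042
    //     keyspace=rippled
    //     table_name=objects
    //     username=cassandra
    //     password=cassandra
    //     io_threads=4
    //     max_requests_outstanding=10000000
    //     # alternatively, instead of contact_points/port:
    //     # secure_connect_bundle=<path to bundle>
    //     # certfile=<path to trusted CA certificate>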
    void
    open(bool createIfMissing) override
    {
        if (open_)
        {
            assert(false);
            JLOG(j_.error()) << "database is already open";
            return;
        }

        std::lock_guard<std::mutex> lock(mutex_);
        CassCluster* cluster = cass_cluster_new();
        if (!cluster)
            Throw<std::runtime_error>(
                "nodestore: Failed to create CassCluster");

        std::string secureConnectBundle = get(config_, "secure_connect_bundle");

        if (!secureConnectBundle.empty())
        {
            /* Setup driver to connect to the cloud using the secure connection
             * bundle */
            if (cass_cluster_set_cloud_secure_connection_bundle(
                    cluster, secureConnectBundle.c_str()) != CASS_OK)
            {
                JLOG(j_.error()) << "Unable to configure cloud using the "
                                    "secure connection bundle: "
                                 << secureConnectBundle;
                Throw<std::runtime_error>(
                    "nodestore: Failed to connect using secure connection "
                    "bundle");
                return;
            }
        }
        else
        {
            std::string contact_points = get(config_, "contact_points");
            if (contact_points.empty())
            {
                Throw<std::runtime_error>(
                    "nodestore: Missing contact_points in Cassandra config");
            }
            CassError rc = cass_cluster_set_contact_points(
                cluster, contact_points.c_str());
            if (rc != CASS_OK)
            {
                std::stringstream ss;
                ss << "nodestore: Error setting Cassandra contact_points: "
                   << contact_points << ", result: " << rc << ", "
                   << cass_error_desc(rc);

                Throw<std::runtime_error>(ss.str());
            }

            int port = get<int>(config_, "port");
            if (port)
            {
                rc = cass_cluster_set_port(cluster, port);
                if (rc != CASS_OK)
                {
                    std::stringstream ss;
                    ss << "nodestore: Error setting Cassandra port: " << port
                       << ", result: " << rc << ", " << cass_error_desc(rc);

                    Throw<std::runtime_error>(ss.str());
                }
            }
        }
        cass_cluster_set_token_aware_routing(cluster, cass_true);
        CassError rc = cass_cluster_set_protocol_version(
            cluster, CASS_PROTOCOL_VERSION_V4);
        if (rc != CASS_OK)
        {
            std::stringstream ss;
            ss << "nodestore: Error setting cassandra protocol version: "
               << ", result: " << rc << ", " << cass_error_desc(rc);

            Throw<std::runtime_error>(ss.str());
        }

        std::string username = get(config_, "username");
        if (username.size())
        {
            std::cout << "user = " << username
                      << " password = " << get(config_, "password")
                      << std::endl;
            cass_cluster_set_credentials(
                cluster, username.c_str(), get(config_, "password").c_str());
        }

        unsigned int const ioThreads = get<int>(config_, "io_threads", 4);
        maxRequestsOutstanding =
            get<int>(config_, "max_requests_outstanding", 10000000);
        JLOG(j_.info()) << "Configuring Cassandra driver to use " << ioThreads
                        << " IO threads. Capping maximum pending requests at "
                        << maxRequestsOutstanding;
        rc = cass_cluster_set_num_threads_io(cluster, ioThreads);
        if (rc != CASS_OK)
        {
            std::stringstream ss;
            ss << "nodestore: Error setting Cassandra io threads to "
               << ioThreads << ", result: " << rc << ", "
               << cass_error_desc(rc);
            Throw<std::runtime_error>(ss.str());
        }

        rc = cass_cluster_set_queue_size_io(
            cluster,
            maxRequestsOutstanding);  // This number needs to scale w/ the
                                      // number of requests per sec
        if (rc != CASS_OK)
        {
            std::stringstream ss;
            ss << "nodestore: Error setting Cassandra max core connections per "
                  "host"
               << ", result: " << rc << ", " << cass_error_desc(rc);
            std::cout << ss.str() << std::endl;
            return;
        }
        cass_cluster_set_request_timeout(cluster, 2000);

        std::string certfile = get(config_, "certfile");
        if (certfile.size())
        {
            std::ifstream fileStream(
                boost::filesystem::path(certfile).string(), std::ios::in);
            if (!fileStream)
            {
                std::stringstream ss;
                ss << "opening config file " << certfile;
                Throw<std::system_error>(
                    errno, std::generic_category(), ss.str());
            }
            std::string cert(
                std::istreambuf_iterator<char>{fileStream},
                std::istreambuf_iterator<char>{});
            if (fileStream.bad())
            {
                std::stringstream ss;
                ss << "reading config file " << certfile;
                Throw<std::system_error>(
                    errno, std::generic_category(), ss.str());
            }

            CassSsl* context = cass_ssl_new();
            cass_ssl_set_verify_flags(context, CASS_SSL_VERIFY_NONE);
            rc = cass_ssl_add_trusted_cert(context, cert.c_str());
            if (rc != CASS_OK)
            {
                std::stringstream ss;
                ss << "nodestore: Error setting Cassandra ssl context: " << rc
                   << ", " << cass_error_desc(rc);
                Throw<std::runtime_error>(ss.str());
            }

            cass_cluster_set_ssl(cluster, context);
            cass_ssl_free(context);
        }

        std::string keyspace = get(config_, "keyspace");
        if (keyspace.empty())
        {
            Throw<std::runtime_error>(
                "nodestore: Missing keyspace in Cassandra config");
        }

        std::string tableName = get(config_, "table_name");
        if (tableName.empty())
        {
            Throw<std::runtime_error>(
                "nodestore: Missing table name in Cassandra config");
        }

        cass_cluster_set_connect_timeout(cluster, 10000);

        CassStatement* statement;
        CassFuture* fut;
        bool setupSessionAndTable = false;
        while (!setupSessionAndTable)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            session_.reset(cass_session_new());
            assert(session_);

            fut = cass_session_connect_keyspace(
                session_.get(), cluster, keyspace.c_str());
            rc = cass_future_error_code(fut);
            cass_future_free(fut);
            if (rc != CASS_OK)
            {
                std::stringstream ss;
                ss << "nodestore: Error connecting Cassandra session keyspace: "
                   << rc << ", " << cass_error_desc(rc);
                JLOG(j_.error()) << ss.str();
                continue;
            }

            std::stringstream query;
            query << "CREATE TABLE IF NOT EXISTS " << tableName
                  << " ( hash blob PRIMARY KEY, object blob)";

            statement = makeStatement(query.str().c_str(), 0);
            fut = cass_session_execute(session_.get(), statement);
            rc = cass_future_error_code(fut);
            cass_future_free(fut);
            cass_statement_free(statement);
            if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
            {
                std::stringstream ss;
                ss << "nodestore: Error creating Cassandra table: " << rc
                   << ", " << cass_error_desc(rc);
                JLOG(j_.error()) << ss.str();
                continue;
            }

            query.str("");
            query << "SELECT * FROM " << tableName << " LIMIT 1";
            statement = makeStatement(query.str().c_str(), 0);
            fut = cass_session_execute(session_.get(), statement);
            rc = cass_future_error_code(fut);
            cass_future_free(fut);
            cass_statement_free(statement);
            if (rc != CASS_OK)
            {
                if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
                {
                    JLOG(j_.warn()) << "table not here yet, sleeping 1s to "
                                       "see if table creation propagates";
                    continue;
                }
                else
                {
                    std::stringstream ss;
                    ss << "nodestore: Error checking for table: " << rc << ", "
                       << cass_error_desc(rc);
                    JLOG(j_.error()) << ss.str();
                    continue;
                }
            }

            setupSessionAndTable = true;
        }

        cass_cluster_free(cluster);

        bool setupPreparedStatements = false;
        while (!setupPreparedStatements)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::stringstream query;
            query << "INSERT INTO " << tableName
                  << " (hash, object) VALUES (?, ?)";
            CassFuture* prepare_future =
                cass_session_prepare(session_.get(), query.str().c_str());

            /* Wait for the statement to prepare and get the result */
            rc = cass_future_error_code(prepare_future);

            if (rc != CASS_OK)
            {
                /* Handle error */
                cass_future_free(prepare_future);

                std::stringstream ss;
                ss << "nodestore: Error preparing insert : " << rc << ", "
                   << cass_error_desc(rc);
                JLOG(j_.error()) << ss.str();
                continue;
            }

            /* Get the prepared object from the future */
            insert_ = cass_future_get_prepared(prepare_future);

            /* The future can be freed immediately after getting the prepared
             * object
             */
            cass_future_free(prepare_future);

            query.str("");
            query << "SELECT object FROM " << tableName << " WHERE hash = ?";
            prepare_future =
                cass_session_prepare(session_.get(), query.str().c_str());

            /* Wait for the statement to prepare and get the result */
            rc = cass_future_error_code(prepare_future);

            if (rc != CASS_OK)
            {
                /* Handle error */
                cass_future_free(prepare_future);

                std::stringstream ss;
                ss << "nodestore: Error preparing select : " << rc << ", "
                   << cass_error_desc(rc);
                JLOG(j_.error()) << ss.str();
                continue;
            }

            /* Get the prepared object from the future */
            select_ = cass_future_get_prepared(prepare_future);

            /* The future can be freed immediately after getting the prepared
             * object
             */
            cass_future_free(prepare_future);
            setupPreparedStatements = true;
        }

        work_.emplace(ioContext_);
        ioThread_ = std::thread{[this]() { ioContext_.run(); }};
        open_ = true;
    }

    // Close the connection to the database
    void
    close() override
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (insert_)
            {
                cass_prepared_free(insert_);
                insert_ = nullptr;
            }
            if (select_)
            {
                cass_prepared_free(select_);
                select_ = nullptr;
            }
            work_.reset();
            ioThread_.join();
        }
        open_ = false;
    }

    // Synchronously fetch the object with key key and store the result in pno
    // @param key the key of the object
    // @param pno object in which to store the result
    // @return result status of query
    Status
    fetch(void const* key, std::shared_ptr<NodeObject>* pno) override
    {
        JLOG(j_.trace()) << "Fetching from cassandra";
        CassStatement* statement = cass_prepared_bind(select_);
        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
        CassError rc = cass_statement_bind_bytes(
            statement, 0, static_cast<cass_byte_t const*>(key), keyBytes_);
        if (rc != CASS_OK)
        {
            cass_statement_free(statement);
            JLOG(j_.error()) << "Binding Cassandra fetch query: " << rc << ", "
                             << cass_error_desc(rc);
            pno->reset();
            return backendError;
        }
        CassFuture* fut;
        do
        {
            fut = cass_session_execute(session_.get(), statement);
            rc = cass_future_error_code(fut);
            if (rc != CASS_OK)
            {
                std::stringstream ss;
                ss << "Cassandra fetch error";
                ss << ", retrying";
                ++counters_.readRetries;
                ss << ": " << cass_error_desc(rc);
                JLOG(j_.warn()) << ss.str();
            }
        } while (rc != CASS_OK);

        CassResult const* res = cass_future_get_result(fut);
        cass_statement_free(statement);
        cass_future_free(fut);

        CassRow const* row = cass_result_first_row(res);
        if (!row)
        {
            cass_result_free(res);
            pno->reset();
            return notFound;
        }
        cass_byte_t const* buf;
        std::size_t bufSize;
        rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
        if (rc != CASS_OK)
        {
            cass_result_free(res);
            pno->reset();
            JLOG(j_.error()) << "Cassandra fetch result error: " << rc << ", "
                             << cass_error_desc(rc);
            ++counters_.readErrors;
            return backendError;
        }

        nudb::detail::buffer bf;
        std::pair<void const*, std::size_t> uncompressed =
            nodeobject_decompress(buf, bufSize, bf);
        DecodedBlob decoded(key, uncompressed.first, uncompressed.second);
        cass_result_free(res);

        if (!decoded.wasOk())
        {
            pno->reset();
            JLOG(j_.error()) << "Cassandra error decoding result: " << rc
                             << ", " << cass_error_desc(rc);
            ++counters_.readErrors;
            return dataCorrupt;
        }
        *pno = decoded.createObject();
        return ok;
    }

    struct ReadCallbackData
    {
        CassandraBackend& backend;
        const void* const key;
        std::shared_ptr<NodeObject>& result;
        std::condition_variable& cv;

        std::atomic_uint32_t& numFinished;
        size_t batchSize;

        ReadCallbackData(
            CassandraBackend& backend,
            const void* const key,
            std::shared_ptr<NodeObject>& result,
            std::condition_variable& cv,
            std::atomic_uint32_t& numFinished,
            size_t batchSize)
            : backend(backend)
            , key(key)
            , result(result)
            , cv(cv)
            , numFinished(numFinished)
            , batchSize(batchSize)
        {
        }

        ReadCallbackData(ReadCallbackData const& other) = default;
    };

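    // Asynchronous batch fetch: one read per hash is dispatched via read(),
    // and the calling thread blocks on cv until readCallback() has marked all
    // batchSize requests as finished.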
    std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
    fetchBatch(std::vector<uint256 const*> const& hashes) override
    {
        std::size_t const numHashes = hashes.size();
        JLOG(j_.trace()) << "Fetching " << numHashes
                         << " records from Cassandra";
        std::atomic_uint32_t numFinished = 0;
        std::condition_variable cv;
        std::mutex mtx;
        std::vector<std::shared_ptr<NodeObject>> results{numHashes};
        std::vector<std::shared_ptr<ReadCallbackData>> cbs;
        cbs.reserve(numHashes);
        for (std::size_t i = 0; i < hashes.size(); ++i)
        {
            cbs.push_back(std::make_shared<ReadCallbackData>(
                *this,
                static_cast<void const*>(hashes[i]),
                results[i],
                cv,
                numFinished,
                numHashes));
            read(*cbs[i]);
        }
        assert(results.size() == cbs.size());

        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [&numFinished, &numHashes]() {
            return numFinished == numHashes;
        });

        JLOG(j_.trace()) << "Fetched " << numHashes
                         << " records from Cassandra";
        return {results, ok};
    }

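    // Issue a single asynchronous read for data.key. A bind failure counts the
    // request as finished immediately; transient Cassandra errors are retried
    // from readCallback().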
    void
    read(ReadCallbackData& data)
    {
        CassStatement* statement = cass_prepared_bind(select_);
        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
        CassError rc = cass_statement_bind_bytes(
            statement, 0, static_cast<cass_byte_t const*>(data.key), keyBytes_);
        if (rc != CASS_OK)
        {
            size_t batchSize = data.batchSize;
            if (++(data.numFinished) == batchSize)
                data.cv.notify_all();
            cass_statement_free(statement);
            JLOG(j_.error()) << "Binding Cassandra fetch query: " << rc << ", "
                             << cass_error_desc(rc);
            return;
        }

        CassFuture* fut = cass_session_execute(session_.get(), statement);

        cass_statement_free(statement);

        cass_future_set_callback(fut, readCallback, static_cast<void*>(&data));
        cass_future_free(fut);
    }

    struct WriteCallbackData
    {
        CassandraBackend* backend;
        // The shared pointer to the node object must exist until it's
        // confirmed persisted. Otherwise, it can become deleted
        // prematurely if other copies are removed from caches.
        std::shared_ptr<NodeObject> no;
        std::optional<EncodedBlob> e;
        std::pair<void const*, std::size_t> compressed;
        std::chrono::steady_clock::time_point begin;
        // The data is stored in this buffer. The void* in the above member
        // is a pointer into the below buffer
        nudb::detail::buffer bf;
        std::atomic<std::uint64_t>& totalWriteRetries;

        uint32_t currentRetries = 0;

        WriteCallbackData(
            CassandraBackend* f,
            std::shared_ptr<NodeObject> const& nobj,
            std::atomic<std::uint64_t>& retries)
            : backend(f), no(nobj), totalWriteRetries(retries)
        {
            e.emplace(no);

            compressed =
                NodeStore::nodeobject_compress(e->getData(), e->getSize(), bf);
        }
    };

    void
    write(WriteCallbackData& data, bool isRetry)
    {
        {
            // We limit the total number of concurrent inflight writes. This is
            // a client side throttling to prevent overloading the database.
            // This is mostly useful when the very first ledger is being written
            // in full, which is several million records. On sufficiently large
            // Cassandra clusters, this throttling is not needed; the default
            // value of maxRequestsOutstanding is 10 million, which is more
            // records than are present in any single ledger
            std::unique_lock<std::mutex> lck(throttleMutex_);
            if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
            {
                JLOG(j_.trace()) << __func__ << " : "
                                 << "Max outstanding requests reached. "
                                 << "Waiting for other requests to finish";
                ++counters_.writesDelayed;
                throttleCv_.wait(lck, [this]() {
                    return numRequestsOutstanding_ < maxRequestsOutstanding;
                });
            }
        }

        CassStatement* statement = cass_prepared_bind(insert_);
        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
        CassError rc = cass_statement_bind_bytes(
            statement,
            0,
            static_cast<cass_byte_t const*>(data.e->getKey()),
            keyBytes_);
        if (rc != CASS_OK)
        {
            cass_statement_free(statement);
            std::stringstream ss;
            ss << "Binding cassandra insert hash: " << rc << ", "
               << cass_error_desc(rc);
            JLOG(j_.error()) << __func__ << " : " << ss.str();
            Throw<std::runtime_error>(ss.str());
        }
        rc = cass_statement_bind_bytes(
            statement,
            1,
            static_cast<cass_byte_t const*>(data.compressed.first),
            data.compressed.second);
        if (rc != CASS_OK)
        {
            cass_statement_free(statement);
            std::stringstream ss;
            ss << "Binding cassandra insert object: " << rc << ", "
               << cass_error_desc(rc);
            JLOG(j_.error()) << __func__ << " : " << ss.str();
            Throw<std::runtime_error>(ss.str());
        }
        data.begin = std::chrono::steady_clock::now();
        CassFuture* fut = cass_session_execute(session_.get(), statement);
        cass_statement_free(statement);

        cass_future_set_callback(fut, writeCallback, static_cast<void*>(&data));
        cass_future_free(fut);
    }

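    // Each store() heap-allocates a WriteCallbackData that stays alive across
    // retries; writeCallback() deletes it once the insert has succeeded.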
    void
    store(std::shared_ptr<NodeObject> const& no) override
    {
        JLOG(j_.trace()) << "Writing to cassandra";
        WriteCallbackData* data =
            new WriteCallbackData(this, no, counters_.writeRetries);

        ++numRequestsOutstanding_;
        write(*data, false);
    }

    void
    storeBatch(Batch const& batch) override
    {
        for (auto const& no : batch)
        {
            store(no);
        }
    }

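    // sync() blocks the caller until every asynchronous write issued so far
    // has been acknowledged, i.e. numRequestsOutstanding_ drops to zero.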
    void
    sync() override
    {
        std::unique_lock<std::mutex> lck(syncMutex_);

        syncCv_.wait(lck, [this]() { return numRequestsOutstanding_ == 0; });
    }

    // Iterate through entire table and execute f(). Used for import only,
    // with database not being written to, so safe to paginate through
    // objects table with LIMIT x OFFSET y.
    void
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
    {
        assert(false);
        Throw<std::runtime_error>("not implemented");
    }

    int
    getWriteLoad() override
    {
        return 0;
    }

    void
    setDeletePath() override
    {
    }

    int
    fdRequired() const override
    {
        return 0;
    }

    std::optional<Counters<std::uint64_t>>
    counters() const override
    {
        return counters_;
    }

    friend void
    writeCallback(CassFuture* fut, void* cbData);

    friend void
    readCallback(CassFuture* fut, void* cbData);
};

// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
readCallback(CassFuture* fut, void* cbData)
{
    CassandraBackend::ReadCallbackData& requestParams =
        *static_cast<CassandraBackend::ReadCallbackData*>(cbData);

    CassError rc = cass_future_error_code(fut);

    if (rc != CASS_OK)
    {
        ++(requestParams.backend.counters_.readRetries);
        JLOG(requestParams.backend.j_.warn())
            << "Cassandra fetch error : " << rc << " : " << cass_error_desc(rc)
            << " - retrying";
        // Retry right away. The only time the cluster should ever be overloaded
        // is when the very first ledger is being written in full (millions of
        // writes at once), during which no reads should be occurring. If reads
        // are timing out, the code/architecture should be modified to handle
        // greater read load, as opposed to just exponential backoff
        requestParams.backend.read(requestParams);
    }
    else
    {
        auto finish = [&requestParams]() {
            size_t batchSize = requestParams.batchSize;
            if (++(requestParams.numFinished) == batchSize)
                requestParams.cv.notify_all();
        };
        CassResult const* res = cass_future_get_result(fut);

        CassRow const* row = cass_result_first_row(res);
        if (!row)
        {
            cass_result_free(res);
            JLOG(requestParams.backend.j_.error())
                << "Cassandra fetch get row error : " << rc << ", "
                << cass_error_desc(rc);
            finish();
            return;
        }
        cass_byte_t const* buf;
        std::size_t bufSize;
        rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
        if (rc != CASS_OK)
        {
            cass_result_free(res);
            JLOG(requestParams.backend.j_.error())
                << "Cassandra fetch get bytes error : " << rc << ", "
                << cass_error_desc(rc);
            ++requestParams.backend.counters_.readErrors;
            finish();
            return;
        }
        nudb::detail::buffer bf;
        std::pair<void const*, std::size_t> uncompressed =
            nodeobject_decompress(buf, bufSize, bf);
        DecodedBlob decoded(
            requestParams.key, uncompressed.first, uncompressed.second);
        cass_result_free(res);

        if (!decoded.wasOk())
        {
            JLOG(requestParams.backend.j_.fatal())
                << "Cassandra fetch error - data corruption : " << rc << ", "
                << cass_error_desc(rc);
            ++requestParams.backend.counters_.readErrors;
            finish();
            return;
        }
        requestParams.result = decoded.createObject();
        finish();
    }
}

// Process the result of an asynchronous write. Retry on error
// @param fut cassandra future associated with the write
// @param cbData struct that holds the request parameters
void
writeCallback(CassFuture* fut, void* cbData)
{
    CassandraBackend::WriteCallbackData& requestParams =
        *static_cast<CassandraBackend::WriteCallbackData*>(cbData);
    CassandraBackend& backend = *requestParams.backend;
    auto rc = cass_future_error_code(fut);
    if (rc != CASS_OK)
    {
        JLOG(backend.j_.error())
            << "ERROR!!! Cassandra insert error: " << rc << ", "
            << cass_error_desc(rc) << ", retrying ";
        ++requestParams.totalWriteRetries;
        // exponential backoff with a max wait of 2^10 ms (about 1 second)
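        // e.g. retry 0 waits 1 ms, retry 5 waits 32 ms, and retries 10 and
        // beyond are capped at 1024 ms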
        auto wait = std::chrono::milliseconds(
            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
        ++requestParams.currentRetries;
        std::shared_ptr<boost::asio::steady_timer> timer =
            std::make_shared<boost::asio::steady_timer>(
                backend.ioContext_, std::chrono::steady_clock::now() + wait);
        timer->async_wait([timer, &requestParams, &backend](
                              const boost::system::error_code& error) {
            backend.write(requestParams, true);
        });
    }
    else
    {
        backend.counters_.writeDurationUs +=
            std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now() - requestParams.begin)
                .count();
        --(backend.numRequestsOutstanding_);

        backend.throttleCv_.notify_all();
        if (backend.numRequestsOutstanding_ == 0)
            backend.syncCv_.notify_all();
        delete &requestParams;
    }
}

//------------------------------------------------------------------------------

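// The factory registers itself with the NodeStore Manager on construction and
// removes itself on destruction; the static instance at the bottom of this
// file makes the "cassandra" backend available by name.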
class CassandraFactory : public Factory
{
public:
    CassandraFactory()
    {
        Manager::instance().insert(*this);
    }

    ~CassandraFactory() override
    {
        Manager::instance().erase(*this);
    }

    std::string
    getName() const override
    {
        return "cassandra";
    }

    std::unique_ptr<Backend>
    createInstance(
        size_t keyBytes,
        Section const& keyValues,
        std::size_t burstSize,
        Scheduler& scheduler,
        beast::Journal journal) override
    {
        return std::make_unique<CassandraBackend>(keyBytes, keyValues, journal);
    }
};

static CassandraFactory cassandraFactory;

}  // namespace NodeStore
}  // namespace ripple
#endif