20 #ifdef RIPPLED_REPORTING
22 #include <cassandra.h>
25 #include <ripple/basics/Slice.h>
26 #include <ripple/basics/StringUtilities.h>
27 #include <ripple/basics/contract.h>
28 #include <ripple/basics/strHex.h>
29 #include <ripple/nodestore/Backend.h>
30 #include <ripple/nodestore/Factory.h>
31 #include <ripple/nodestore/Manager.h>
32 #include <ripple/nodestore/impl/DecodedBlob.h>
33 #include <ripple/nodestore/impl/EncodedBlob.h>
34 #include <ripple/nodestore/impl/codec.h>
35 #include <ripple/protocol/digest.h>
36 #include <boost/asio/steady_timer.hpp>
37 #include <boost/filesystem.hpp>
49 #include <nudb/nudb.hpp>
60 writeCallback(CassFuture* fut,
void* cbData);
62 readCallback(CassFuture* fut,
void* cbData);
64 class CassandraBackend :
public Backend
70 makeStatement(
char const* query,
std::size_t params)
72 CassStatement* ret = cass_statement_new(query, params);
74 cass_statement_set_consistency(ret, CASS_CONSISTENCY_QUORUM);
78 ss <<
"nodestore: Error setting query consistency: " << query
79 <<
", result: " << rc <<
", " << cass_error_desc(rc);
80 Throw<std::runtime_error>(ss.
str());
87 size_t const keyBytes_;
89 Section
const config_;
98 [](CassSession* session) {
100 CassFuture* fut = cass_session_close(session);
101 cass_future_wait(fut);
102 cass_future_free(fut);
103 cass_session_free(session);
108 const CassPrepared* insert_ =
nullptr;
109 const CassPrepared* select_ =
nullptr;
112 boost::asio::io_context ioContext_;
118 uint32_t maxRequestsOutstanding = 10000000;
131 Counters<std::atomic<std::uint64_t>> counters_;
136 Section
const& keyValues,
138 : j_(journal), keyBytes_(keyBytes), config_(keyValues)
142 ~CassandraBackend()
override
163 open(
bool createIfMissing)
override
168 JLOG(j_.
error()) <<
"database is already open";
173 CassCluster* cluster = cass_cluster_new();
175 Throw<std::runtime_error>(
176 "nodestore:: Failed to create CassCluster");
178 std::string secureConnectBundle =
get(config_,
"secure_connect_bundle");
180 if (!secureConnectBundle.
empty())
184 if (cass_cluster_set_cloud_secure_connection_bundle(
185 cluster, secureConnectBundle.
c_str()) != CASS_OK)
187 JLOG(j_.
error()) <<
"Unable to configure cloud using the "
188 "secure connection bundle: "
189 << secureConnectBundle;
190 Throw<std::runtime_error>(
191 "nodestore: Failed to connect using secure connection "
199 if (contact_points.
empty())
201 Throw<std::runtime_error>(
202 "nodestore: Missing contact_points in Cassandra config");
204 CassError rc = cass_cluster_set_contact_points(
205 cluster, contact_points.
c_str());
209 ss <<
"nodestore: Error setting Cassandra contact_points: "
210 << contact_points <<
", result: " << rc <<
", "
211 << cass_error_desc(rc);
213 Throw<std::runtime_error>(ss.
str());
216 int port = get<int>(config_,
"port");
219 rc = cass_cluster_set_port(cluster, port);
223 ss <<
"nodestore: Error setting Cassandra port: " << port
224 <<
", result: " << rc <<
", " << cass_error_desc(rc);
226 Throw<std::runtime_error>(ss.
str());
230 cass_cluster_set_token_aware_routing(cluster, cass_true);
231 CassError rc = cass_cluster_set_protocol_version(
232 cluster, CASS_PROTOCOL_VERSION_V4);
236 ss <<
"nodestore: Error setting cassandra protocol version: "
237 <<
", result: " << rc <<
", " << cass_error_desc(rc);
239 Throw<std::runtime_error>(ss.
str());
246 <<
" password = " <<
get(config_,
"password")
248 cass_cluster_set_credentials(
249 cluster, username.
c_str(),
get(config_,
"password").c_str());
252 unsigned int const ioThreads = get<int>(config_,
"io_threads", 4);
253 maxRequestsOutstanding =
254 get<int>(config_,
"max_requests_outstanding", 10000000);
255 JLOG(j_.
info()) <<
"Configuring Cassandra driver to use " << ioThreads
256 <<
" IO threads. Capping maximum pending requests at "
257 << maxRequestsOutstanding;
258 rc = cass_cluster_set_num_threads_io(cluster, ioThreads);
262 ss <<
"nodestore: Error setting Cassandra io threads to "
263 << ioThreads <<
", result: " << rc <<
", "
264 << cass_error_desc(rc);
265 Throw<std::runtime_error>(ss.
str());
268 rc = cass_cluster_set_queue_size_io(
270 maxRequestsOutstanding);
275 ss <<
"nodestore: Error setting Cassandra max core connections per "
277 <<
", result: " << rc <<
", " << cass_error_desc(rc);
282 cass_cluster_set_request_timeout(cluster, 2000);
288 boost::filesystem::path(certfile).
string(), std::ios::in);
292 ss <<
"opening config file " << certfile;
293 Throw<std::system_error>(
299 if (fileStream.bad())
302 ss <<
"reading config file " << certfile;
303 Throw<std::system_error>(
307 CassSsl* context = cass_ssl_new();
308 cass_ssl_set_verify_flags(context, CASS_SSL_VERIFY_NONE);
309 rc = cass_ssl_add_trusted_cert(context, cert.c_str());
313 ss <<
"nodestore: Error setting Cassandra ssl context: " << rc
314 <<
", " << cass_error_desc(rc);
315 Throw<std::runtime_error>(ss.
str());
318 cass_cluster_set_ssl(cluster, context);
319 cass_ssl_free(context);
323 if (keyspace.
empty())
325 Throw<std::runtime_error>(
326 "nodestore: Missing keyspace in Cassandra config");
330 if (tableName.
empty())
332 Throw<std::runtime_error>(
333 "nodestore: Missing table name in Cassandra config");
336 cass_cluster_set_connect_timeout(cluster, 10000);
338 CassStatement* statement;
340 bool setupSessionAndTable =
false;
341 while (!setupSessionAndTable)
344 session_.reset(cass_session_new());
347 fut = cass_session_connect_keyspace(
348 session_.get(), cluster, keyspace.
c_str());
349 rc = cass_future_error_code(fut);
350 cass_future_free(fut);
354 ss <<
"nodestore: Error connecting Cassandra session keyspace: "
355 << rc <<
", " << cass_error_desc(rc);
361 query <<
"CREATE TABLE IF NOT EXISTS " << tableName
362 <<
" ( hash blob PRIMARY KEY, object blob)";
364 statement = makeStatement(query.
str().c_str(), 0);
365 fut = cass_session_execute(session_.get(), statement);
366 rc = cass_future_error_code(fut);
367 cass_future_free(fut);
368 cass_statement_free(statement);
369 if (rc != CASS_OK && rc != CASS_ERROR_SERVER_INVALID_QUERY)
372 ss <<
"nodestore: Error creating Cassandra table: " << rc
373 <<
", " << cass_error_desc(rc);
379 query <<
"SELECT * FROM " << tableName <<
" LIMIT 1";
380 statement = makeStatement(query.
str().c_str(), 0);
381 fut = cass_session_execute(session_.get(), statement);
382 rc = cass_future_error_code(fut);
383 cass_future_free(fut);
384 cass_statement_free(statement);
387 if (rc == CASS_ERROR_SERVER_INVALID_QUERY)
389 JLOG(j_.
warn()) <<
"table not here yet, sleeping 1s to "
390 "see if table creation propagates";
396 ss <<
"nodestore: Error checking for table: " << rc <<
", "
397 << cass_error_desc(rc);
403 setupSessionAndTable =
true;
406 cass_cluster_free(cluster);
408 bool setupPreparedStatements =
false;
409 while (!setupPreparedStatements)
413 query <<
"INSERT INTO " << tableName
414 <<
" (hash, object) VALUES (?, ?)";
415 CassFuture* prepare_future =
416 cass_session_prepare(session_.get(), query.
str().c_str());
419 rc = cass_future_error_code(prepare_future);
424 cass_future_free(prepare_future);
427 ss <<
"nodestore: Error preparing insert : " << rc <<
", "
428 << cass_error_desc(rc);
434 insert_ = cass_future_get_prepared(prepare_future);
439 cass_future_free(prepare_future);
442 query <<
"SELECT object FROM " << tableName <<
" WHERE hash = ?";
444 cass_session_prepare(session_.get(), query.
str().c_str());
447 rc = cass_future_error_code(prepare_future);
452 cass_future_free(prepare_future);
455 ss <<
"nodestore: Error preparing select : " << rc <<
", "
456 << cass_error_desc(rc);
462 select_ = cass_future_get_prepared(prepare_future);
467 cass_future_free(prepare_future);
468 setupPreparedStatements =
true;
472 ioThread_ =
std::thread{[
this]() { ioContext_.run(); }};
484 cass_prepared_free(insert_);
489 cass_prepared_free(select_);
505 JLOG(j_.
trace()) <<
"Fetching from cassandra";
506 CassStatement* statement = cass_prepared_bind(select_);
507 cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
508 CassError rc = cass_statement_bind_bytes(
509 statement, 0,
static_cast<cass_byte_t const*
>(key), keyBytes_);
512 cass_statement_free(statement);
513 JLOG(j_.
error()) <<
"Binding Cassandra fetch query: " << rc <<
", "
514 << cass_error_desc(rc);
521 fut = cass_session_execute(session_.get(), statement);
522 rc = cass_future_error_code(fut);
526 ss <<
"Cassandra fetch error";
528 ++counters_.readRetries;
529 ss <<
": " << cass_error_desc(rc);
532 }
while (rc != CASS_OK);
534 CassResult
const* res = cass_future_get_result(fut);
535 cass_statement_free(statement);
536 cass_future_free(fut);
538 CassRow
const* row = cass_result_first_row(res);
541 cass_result_free(res);
545 cass_byte_t
const* buf;
547 rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
550 cass_result_free(res);
552 JLOG(j_.
error()) <<
"Cassandra fetch result error: " << rc <<
", "
553 << cass_error_desc(rc);
554 ++counters_.readErrors;
558 nudb::detail::buffer bf;
561 DecodedBlob decoded(key, uncompressed.
first, uncompressed.
second);
562 cass_result_free(res);
564 if (!decoded.wasOk())
567 JLOG(j_.
error()) <<
"Cassandra error decoding result: " << rc
568 <<
", " << cass_error_desc(rc);
569 ++counters_.readErrors;
572 *pno = decoded.createObject();
576 struct ReadCallbackData
578 CassandraBackend& backend;
579 const void*
const key;
587 CassandraBackend& backend,
588 const void*
const key,
597 , numFinished(numFinished)
598 , batchSize(batchSize)
602 ReadCallbackData(ReadCallbackData
const& other) =
default;
609 JLOG(j_.
trace()) <<
"Fetching " << numHashes
610 <<
" records from Cassandra";
619 cbs.
push_back(std::make_shared<ReadCallbackData>(
621 static_cast<void const*
>(hashes[i]),
628 assert(results.size() == cbs.
size());
631 cv.
wait(lck, [&numFinished, &numHashes]() {
632 return numFinished == numHashes;
635 JLOG(j_.
trace()) <<
"Fetched " << numHashes
636 <<
" records from Cassandra";
637 return {results,
ok};
641 read(ReadCallbackData& data)
643 CassStatement* statement = cass_prepared_bind(select_);
644 cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
645 CassError rc = cass_statement_bind_bytes(
646 statement, 0,
static_cast<cass_byte_t const*
>(
data.key), keyBytes_);
649 size_t batchSize =
data.batchSize;
650 if (++(
data.numFinished) == batchSize)
651 data.cv.notify_all();
652 cass_statement_free(statement);
653 JLOG(j_.
error()) <<
"Binding Cassandra fetch query: " << rc <<
", "
654 << cass_error_desc(rc);
658 CassFuture* fut = cass_session_execute(session_.get(), statement);
660 cass_statement_free(statement);
662 cass_future_set_callback(fut, readCallback,
static_cast<void*
>(&data));
663 cass_future_free(fut);
666 struct WriteCallbackData
668 CassandraBackend* backend;
675 std::chrono::steady_clock::time_point
begin;
678 nudb::detail::buffer bf;
681 uint32_t currentRetries = 0;
687 : backend(f), no(nobj), totalWriteRetries(retries)
697 write(WriteCallbackData& data,
bool isRetry)
708 if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
710 JLOG(j_.
trace()) << __func__ <<
" : "
711 <<
"Max outstanding requests reached. "
712 <<
"Waiting for other requests to finish";
713 ++counters_.writesDelayed;
714 throttleCv_.
wait(lck, [
this]() {
715 return numRequestsOutstanding_ < maxRequestsOutstanding;
720 CassStatement* statement = cass_prepared_bind(insert_);
721 cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
722 CassError rc = cass_statement_bind_bytes(
725 static_cast<cass_byte_t const*
>(
data.e->getKey()),
729 cass_statement_free(statement);
731 ss <<
"Binding cassandra insert hash: " << rc <<
", "
732 << cass_error_desc(rc);
733 JLOG(j_.
error()) << __func__ <<
" : " << ss.
str();
734 Throw<std::runtime_error>(ss.
str());
736 rc = cass_statement_bind_bytes(
739 static_cast<cass_byte_t const*
>(
data.compressed.first),
740 data.compressed.second);
743 cass_statement_free(statement);
745 ss <<
"Binding cassandra insert object: " << rc <<
", "
746 << cass_error_desc(rc);
747 JLOG(j_.
error()) << __func__ <<
" : " << ss.
str();
748 Throw<std::runtime_error>(ss.
str());
751 CassFuture* fut = cass_session_execute(session_.get(), statement);
752 cass_statement_free(statement);
754 cass_future_set_callback(fut, writeCallback,
static_cast<void*
>(&data));
755 cass_future_free(fut);
761 JLOG(j_.
trace()) <<
"Writing to cassandra";
762 WriteCallbackData*
data =
763 new WriteCallbackData(
this, no, counters_.writeRetries);
765 ++numRequestsOutstanding_;
770 storeBatch(
Batch const& batch)
override
772 for (
auto const& no : batch)
783 syncCv_.
wait(lck, [
this]() {
return numRequestsOutstanding_ == 0; });
793 Throw<std::runtime_error>(
"not implemented");
797 getWriteLoad()
override
803 setDeletePath()
override
808 fdRequired()
const override
814 counters()
const override
820 writeCallback(CassFuture* fut,
void* cbData);
823 readCallback(CassFuture* fut,
void* cbData);
830 readCallback(CassFuture* fut,
void* cbData)
832 CassandraBackend::ReadCallbackData& requestParams =
833 *
static_cast<CassandraBackend::ReadCallbackData*
>(cbData);
835 CassError rc = cass_future_error_code(fut);
839 ++(requestParams.backend.counters_.readRetries);
840 JLOG(requestParams.backend.j_.warn())
841 <<
"Cassandra fetch error : " << rc <<
" : " << cass_error_desc(rc)
848 requestParams.backend.read(requestParams);
852 auto finish = [&requestParams]() {
853 size_t batchSize = requestParams.batchSize;
854 if (++(requestParams.numFinished) == batchSize)
855 requestParams.cv.notify_all();
857 CassResult
const* res = cass_future_get_result(fut);
859 CassRow
const* row = cass_result_first_row(res);
862 cass_result_free(res);
863 JLOG(requestParams.backend.j_.error())
864 <<
"Cassandra fetch get row error : " << rc <<
", "
865 << cass_error_desc(rc);
869 cass_byte_t
const* buf;
871 rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
874 cass_result_free(res);
875 JLOG(requestParams.backend.j_.error())
876 <<
"Cassandra fetch get bytes error : " << rc <<
", "
877 << cass_error_desc(rc);
878 ++requestParams.backend.counters_.readErrors;
882 nudb::detail::buffer bf;
886 requestParams.key, uncompressed.
first, uncompressed.
second);
887 cass_result_free(res);
889 if (!decoded.wasOk())
891 JLOG(requestParams.backend.j_.fatal())
892 <<
"Cassandra fetch error - data corruption : " << rc <<
", "
893 << cass_error_desc(rc);
894 ++requestParams.backend.counters_.readErrors;
898 requestParams.result = decoded.createObject();
907 writeCallback(CassFuture* fut,
void* cbData)
909 CassandraBackend::WriteCallbackData& requestParams =
910 *
static_cast<CassandraBackend::WriteCallbackData*
>(cbData);
911 CassandraBackend& backend = *requestParams.backend;
912 auto rc = cass_future_error_code(fut);
915 JLOG(backend.j_.error())
916 <<
"ERROR!!! Cassandra insert error: " << rc <<
", "
917 << cass_error_desc(rc) <<
", retrying ";
918 ++requestParams.totalWriteRetries;
922 ++requestParams.currentRetries;
924 std::make_shared<boost::asio::steady_timer>(
926 timer->async_wait([timer, &requestParams, &backend](
927 const boost::system::error_code& error) {
928 backend.write(requestParams,
true);
933 backend.counters_.writeDurationUs +=
934 std::chrono::duration_cast<std::chrono::microseconds>(
937 --(backend.numRequestsOutstanding_);
939 backend.throttleCv_.notify_all();
940 if (backend.numRequestsOutstanding_ == 0)
941 backend.syncCv_.notify_all();
942 delete &requestParams;
948 class CassandraFactory :
public Factory
956 ~CassandraFactory()
override
962 getName()
const override
970 Section
const& keyValues,
972 Scheduler& scheduler,
975 return std::make_unique<CassandraBackend>(keyBytes, keyValues, journal);
979 static CassandraFactory cassandraFactory;