rippled
Pg.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2020 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifdef RIPPLED_REPORTING
21 #ifndef RIPPLE_CORE_PG_H_INCLUDED
22 #define RIPPLE_CORE_PG_H_INCLUDED
23 
24 #include <ripple/basics/BasicConfig.h>
25 #include <ripple/basics/Log.h>
26 #include <ripple/protocol/Protocol.h>
27 #include <boost/lexical_cast.hpp>
28 #include <atomic>
29 #include <chrono>
30 #include <condition_variable>
31 #include <functional>
32 #include <libpq-fe.h>
33 #include <memory>
34 #include <mutex>
35 #include <optional>
36 #include <string>
37 #include <utility>
38 #include <vector>
39 
40 namespace ripple {
41 
42 // These postgres structs must be freed only by the postgres API.
// Both aliases are std::unique_ptr instantiations whose deleter type is a
// plain function pointer, so each instance has to be constructed together
// with the libpq release function it should call (this file pairs PGresult
// with PQclear() and PGconn with PQfinish()).
43 using pg_result_type = std::unique_ptr<PGresult, void (*)(PGresult*)>;
44 using pg_connection_type = std::unique_ptr<PGconn, void (*)(PGconn*)>;
45 
// NOTE(review): the right-hand side of this alias is missing from the
// listing (the statement stops after '='). PgQuery::operator()(char const*)
// builds a value as `pg_params{command, {}}`, so it is presumably a
// two-member type holding (command, parameter values) -- restore the exact
// type from the original header before compiling.
57 using pg_params =
59 
// A vector of raw C-string pointers.
// NOTE(review): presumably the parameter values in the form handed to libpq;
// the pointers would then be non-owning -- confirm against Pg::query().
61 using pg_formatted_params = std::vector<char const*>;
62 
/** Settings bundle for postgres connections.
 *
 * PgPool stores one by value and every Pg keeps a const reference to one.
 *
 * NOTE(review): the embedded line numbers jump (65 -> 69, 76 -> 79), so some
 * members appear to be missing from this listing.
 */
64 struct PgConfig
65 {
    // Duration that defaults to 600 seconds.
    // NOTE(review): presumably the idle time after which a pooled
    // connection is closed -- confirm against PgPool::idleSweeper().
69  std::chrono::seconds timeout{600};
70 
    // Raw C-string pointers.
    // NOTE(review): presumably these point into `keywords` so the names can
    // be passed to libpq as a char const* array; if so, `keywords` must not
    // be modified afterwards -- confirm.
72  std::vector<char const*> keywordsIdx;
    // Owned strings. NOTE(review): presumably connection parameter names.
74  std::vector<std::string> keywords;
    // Raw C-string pointers.
    // NOTE(review): presumably the counterpart of keywordsIdx for parameter
    // values; the container that owns those strings is not visible here.
76  std::vector<char const*> valuesIdx;
79 };
80 
81 //-----------------------------------------------------------------------------
82 
/** Holder for the outcome of a postgres operation.
 *
 * Either owns a libpq PGresult (released with PQclear()) and exposes its
 * fields through small accessors, or carries only an error description.
 * All accessors forward result_.get() straight to libpq without checking it
 * for null.
 */
94 class PgResult
95 {
96  // The result object must be freed using the libpq API PQclear() call.
97  pg_result_type result_{nullptr, [](PGresult* result) { PQclear(result); }};
    // NOTE(review): the (PGresult*, PGconn*) constructor below initializes a
    // member named error_, but its declaration is missing from this listing
    // (line 98 is skipped). It is built from a {status, message} pair.
99 
100 public:
    /** Create an empty object: result_ stays null, so operator bool() is
     * false.
     */
104  PgResult()
105  {
106  }
107 
    /** Take ownership of an existing result handle.
     *
     * @param result Handle to move from; its deleter is moved along with it.
     */
112  explicit PgResult(pg_result_type&& result) : result_(std::move(result))
113  {
114  }
115 
    /** Record an error instead of a result.
     *
     * Stores PQresultStatus(result) and PQerrorMessage(conn) in error_.
     * Ownership of `result` is NOT taken: result_ stays null, so the caller
     * remains responsible for the handle and operator bool() yields false.
     *
     * @param result Result whose status is recorded.
     * @param conn Connection whose current error message is recorded.
     */
121  PgResult(PGresult* result, PGconn* conn)
122  : error_({PQresultStatus(result), PQerrorMessage(conn)})
123  {
124  }
125 
    /** Return one field as the C string produced by PQgetvalue().
     *
     * @param ntuple Row index (defaults to 0).
     * @param nfield Column index (defaults to 0).
     * @return Whatever PQgetvalue() returns for that position. Per the libpq
     *         documentation the storage belongs to the PGresult, so the
     *         pointer should not be used after this object is destroyed.
     */
136  char const*
137  c_str(int ntuple = 0, int nfield = 0) const
138  {
139  return PQgetvalue(result_.get(), ntuple, nfield);
140  }
141 
    /** Return one field converted to std::int32_t.
     *
     * The field text from PQgetvalue() is parsed with boost::lexical_cast,
     * which throws boost::bad_lexical_cast when the text is not a valid
     * integer of that type.
     * NOTE(review): the return-type line is missing from this listing;
     * presumably std::int32_t, matching the cast.
     *
     * @param ntuple Row index (defaults to 0).
     * @param nfield Column index (defaults to 0).
     */
153  asInt(int ntuple = 0, int nfield = 0) const
154  {
155  return boost::lexical_cast<std::int32_t>(
156  PQgetvalue(result_.get(), ntuple, nfield));
157  }
158 
    /** Return one field converted to std::int64_t.
     *
     * Same as asInt() but parses into a 64-bit integer.
     * NOTE(review): the return-type line is missing from this listing;
     * presumably std::int64_t, matching the cast.
     *
     * @param ntuple Row index (defaults to 0).
     * @param nfield Column index (defaults to 0).
     */
170  asBigInt(int ntuple = 0, int nfield = 0) const
171  {
172  return boost::lexical_cast<std::int64_t>(
173  PQgetvalue(result_.get(), ntuple, nfield));
174  }
175 
    /** Report whether a field is SQL NULL.
     *
     * @param ntuple Row index (defaults to 0).
     * @param nfield Column index (defaults to 0).
     * @return The int returned by PQgetisnull(), converted to bool.
     */
185  bool
186  isNull(int ntuple = 0, int nfield = 0) const
187  {
188  return PQgetisnull(result_.get(), ntuple, nfield);
189  }
190 
    /** True when a PGresult handle is held.
     *
     * Objects built by the default constructor or by the
     * (PGresult*, PGconn*) constructor hold none and therefore test false.
     * NOTE(review): the conversion is not `explicit`, so it also takes part
     * in implicit integral conversions.
     */
195  operator bool() const
196  {
197  return result_ != nullptr;
198  }
199 
    // Declared here, defined out of line.
    // NOTE(review): the return-type line is missing from this listing.
208  msg() const;
209 
    /** @return PQntuples() of the held result (the number of rows). */
217  int
218  ntuples() const
219  {
220  return PQntuples(result_.get());
221  }
222 
    /** @return PQnfields() of the held result (the number of columns). */
230  int
231  nfields() const
232  {
233  return PQnfields(result_.get());
234  }
235 
    /** @return PQresultStatus() of the held result.
     *
     * This reads result_, not error_, so it does not return the status
     * recorded by the (PGresult*, PGconn*) constructor.
     */
243  ExecStatusType
244  status() const
245  {
246  return PQresultStatus(result_.get());
247  }
248 };
249 
250 /* Class that contains and operates upon a postgres connection. */
// Everything except the constructor is private; PgPool and PgQuery reach the
// private interface through the friend declarations below.
251 class Pg
252 {
253  friend class PgPool;
254  friend class PgQuery;
255 
    // References to state owned by whoever constructs this object; they must
    // outlive it. NOTE(review): PgPool declares config_, mutex_ and stop_
    // members of matching types, so presumably these refer to the pool's.
256  PgConfig const& config_;
257  beast::Journal const j_;
258  bool& stop_;
259  std::mutex& mutex_;
260 
261  // The connection object must be freed using the libpq API PQfinish() call.
262  pg_connection_type conn_{nullptr, [](PGconn* conn) { PQfinish(conn); }};
263 
    // Declared here, defined out of line; the meaning of the returned bool
    // is not visible in this header.
273  bool
274  clear();
275 
    // Declared here, defined out of line.
282  void
283  connect();
284 
    /** Drop the connection handle.
     *
     * Resetting conn_ runs its deleter (PQfinish()) if a connection is held
     * and leaves conn_ null.
     */
286  void
287  disconnect()
288  {
289  conn_.reset();
290  }
291 
    /** Run a command with an explicit parameter array (defined out of line).
     *
     * @param command Command text.
     * @param nParams Number of entries in `values`.
     * @param values Array of C-string parameter values; the single-argument
     *        overload below passes nullptr together with nParams == 0.
     */
304  PgResult
305  query(char const* command, std::size_t nParams, char const* const* values);
306 
    /** Run a command that takes no parameters.
     *
     * Forwards to the three-argument overload with 0 and nullptr.
     */
312  PgResult
313  query(char const* command)
314  {
315  return query(command, 0, nullptr);
316  }
317 
    // Overload taking a pg_params value; declared here, defined out of line.
323  PgResult
324  query(pg_params const& dbParams);
325 
    // Declared here, defined out of line. NOTE(review): presumably loads
    // `records` into `table`; the expected record format is not visible here.
333  void
334  bulkInsert(char const* table, std::string const& records);
335 
336 public:
    /** Bind the object to externally owned state.
     *
     * Only stores the arguments; no connection is opened here (conn_ keeps
     * its null default).
     *
     * @param config Connection settings, kept by reference.
     * @param j Journal, copied.
     * @param stop Flag kept by reference.
     * @param mutex Mutex kept by reference.
     */
344  Pg(PgConfig const& config,
345  beast::Journal const j,
346  bool& stop,
347  std::mutex& mutex)
348  : config_(config), j_(j), stop_(stop), mutex_(mutex)
349  {
350  }
351 };
352 
353 //-----------------------------------------------------------------------------
354 
/** Hands out Pg objects through checkout()/checkin().
 *
 * checkout() and checkin() are private; PgQuery (a friend) calls them from
 * its constructor and destructor.
 * NOTE(review): several lines of this class are missing from the listing
 * (see the notes on idle_ and checkout()).
 */
368 class PgPool
369 {
370  friend class PgQuery;
371 
373 
374  PgConfig config_;
375  beast::Journal const j_;
376  std::mutex mutex_;
    // Starts at zero. NOTE(review): presumably the number of connections
    // currently in existence -- confirm in the implementation.
378  std::size_t connections_{};
    // Starts false. NOTE(review): presumably set by stop(); PgQuery treats a
    // null connection from checkout() as "we're stopping".
379  bool stop_{false};
380 
    // NOTE(review): the type of idle_ is missing from this listing (lines
    // 381-382 are skipped).
383  idle_;
384 
    // NOTE(review): the return-type line is missing from this listing.
    // PgQuery stores the result in pg_ and tests it for null, and checkin()
    // takes std::unique_ptr<Pg>&, so it is presumably std::unique_ptr<Pg>.
394  checkout();
395 
    // Declared here, defined out of line. Takes the pointer by non-const
    // reference, so the implementation is able to move from or reset it.
    // PgQuery's destructor calls this even when the pointer is null.
403  void
404  checkin(std::unique_ptr<Pg>& pg);
405 
406 public:
    // Declared here, defined out of line.
412  PgPool(Section const& pgConfig, beast::Journal j);
413 
    // Declared here, defined out of line.
419  void
420  setup();
421 
    // Declared here, defined out of line.
423  void
424  stop();
425 
    // Declared here, defined out of line.
427  void
428  idleSweeper();
429 };
430 
431 //-----------------------------------------------------------------------------
432 
439 class PgQuery
440 {
441 private:
444 
445 public:
446  PgQuery() = delete;
447 
    /** Keep a reference to the pool and check a connection out of it.
     *
     * `pool` is dereferenced unconditionally, so it must not be null.
     * The checked-out pointer may be null: operator()(pg_params const&)
     * treats that as "we're stopping".
     * NOTE(review): this single-argument constructor is not `explicit`.
     *
     * @param pool Pool to take the connection from.
     */
448  PgQuery(std::shared_ptr<PgPool> const& pool)
449  : pool_(pool), pg_(pool->checkout())
450  {
451  }
452 
    /** Hand the connection back to the pool.
     *
     * checkin() is called unconditionally, i.e. also when pg_ is null.
     * NOTE(review): that relies on PgPool::checkin() accepting a null
     * pointer -- confirm in the implementation.
     */
453  ~PgQuery()
454  {
455  pool_->checkin(pg_);
456  }
457 
463  PgResult
464  operator()(pg_params const& dbParams)
465  {
466  if (!pg_) // It means we're stopping. Return empty result.
467  return PgResult();
468  return pg_->query(dbParams);
469  }
470 
476  PgResult
477  operator()(char const* command)
478  {
479  return operator()(pg_params{command, {}});
480  }
481 
489  void
490  bulkInsert(char const* table, std::string const& records)
491  {
492  pg_->bulkInsert(table, records);
493  }
494 };
495 
496 //-----------------------------------------------------------------------------
497 
// Factory declared here, defined out of line; takes the same arguments as
// the PgPool constructor.
// NOTE(review): the return-type line is missing from this listing. PgQuery
// and initSchema() consume a std::shared_ptr<PgPool>, so that is presumably
// what this returns -- confirm against the original header.
505 make_PgPool(Section const& pgConfig, beast::Journal j);
506 
// Declared here, defined out of line.
// NOTE(review): presumably creates or upgrades the database schema using a
// connection from `pool`; the details are not visible in this header.
514 void
515 initSchema(std::shared_ptr<PgPool> const& pool);
516 
517 } // namespace ripple
518 
519 #endif // RIPPLE_CORE_PG_H_INCLUDED
520 #endif // RIPPLED_REPORTING
std::chrono::steady_clock
std::string
STL class.
std::shared_ptr< PgPool >
utility
functional
std::pair
vector
std::chrono::seconds
chrono
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::int32_t
atomic
memory
std::experimental::filesystem::status
T status(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
std
STL namespace.
condition_variable
ripple::PeerFinder::clock_type
beast::abstract_clock< std::chrono::steady_clock > clock_type
Definition: PeerfinderManager.h:32
std::multimap
STL class.
optional
mutex
std::size_t
std::numeric_limits::max
T max(T... args)
std::unique_ptr
STL class.
string