rippled
PerfLogImp.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2018 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/perflog/impl/PerfLogImp.h>
21 
22 #include <ripple/basics/BasicConfig.h>
23 #include <ripple/beast/core/CurrentThreadName.h>
24 #include <ripple/beast/utility/Journal.h>
25 #include <ripple/core/JobTypes.h>
26 #include <ripple/json/json_writer.h>
27 #include <ripple/json/to_string.h>
28 #include <ripple/nodestore/DatabaseShard.h>
29 #include <atomic>
30 #include <cstdint>
31 #include <cstdlib>
32 #include <iostream>
33 #include <iterator>
34 #include <mutex>
35 #include <optional>
36 #include <sstream>
37 #include <stdexcept>
38 #include <string>
39 #include <unordered_map>
40 #include <utility>
41 
42 namespace ripple {
43 namespace perf {
44 
46  std::vector<char const*> const& labels,
47  JobTypes const& jobTypes)
48 {
49  {
50  // populateRpc
51  rpc_.reserve(labels.size());
52  for (std::string const label : labels)
53  {
54  auto const inserted = rpc_.emplace(label, Rpc()).second;
55  if (!inserted)
56  {
57  // Ensure that no other function populates this entry.
58  assert(false);
59  }
60  }
61  }
62  {
63  // populateJq
64  jq_.reserve(jobTypes.size());
65  for (auto const& [jobType, _] : jobTypes)
66  {
67  auto const inserted = jq_.emplace(jobType, Jq()).second;
68  if (!inserted)
69  {
70  // Ensure that no other function populates this entry.
71  assert(false);
72  }
73  }
74  }
75 }
76 
79 {
81  // totalRpc represents all rpc methods. All that started, finished, etc.
82  Rpc totalRpc;
83  for (auto const& proc : rpc_)
84  {
85  Rpc value;
86  {
87  std::lock_guard lock(proc.second.mutex);
88  if (!proc.second.value.started && !proc.second.value.finished &&
89  !proc.second.value.errored)
90  {
91  continue;
92  }
93  value = proc.second.value;
94  }
95 
97  p[jss::started] = std::to_string(value.started);
98  totalRpc.started += value.started;
99  p[jss::finished] = std::to_string(value.finished);
100  totalRpc.finished += value.finished;
101  p[jss::errored] = std::to_string(value.errored);
102  totalRpc.errored += value.errored;
103  p[jss::duration_us] = std::to_string(value.duration.count());
104  totalRpc.duration += value.duration;
105  rpcobj[proc.first] = p;
106  }
107 
108  if (totalRpc.started)
109  {
110  Json::Value totalRpcJson(Json::objectValue);
111  totalRpcJson[jss::started] = std::to_string(totalRpc.started);
112  totalRpcJson[jss::finished] = std::to_string(totalRpc.finished);
113  totalRpcJson[jss::errored] = std::to_string(totalRpc.errored);
114  totalRpcJson[jss::duration_us] =
115  std::to_string(totalRpc.duration.count());
116  rpcobj[jss::total] = totalRpcJson;
117  }
118 
120  // totalJq represents all jobs. All enqueued, started, finished, etc.
121  Jq totalJq;
122  for (auto const& proc : jq_)
123  {
124  Jq value;
125  {
126  std::lock_guard lock(proc.second.mutex);
127  if (!proc.second.value.queued && !proc.second.value.started &&
128  !proc.second.value.finished)
129  {
130  continue;
131  }
132  value = proc.second.value;
133  }
134 
136  j[jss::queued] = std::to_string(value.queued);
137  totalJq.queued += value.queued;
138  j[jss::started] = std::to_string(value.started);
139  totalJq.started += value.started;
140  j[jss::finished] = std::to_string(value.finished);
141  totalJq.finished += value.finished;
142  j[jss::queued_duration_us] =
144  totalJq.queuedDuration += value.queuedDuration;
145  j[jss::running_duration_us] =
147  totalJq.runningDuration += value.runningDuration;
148  jqobj[JobTypes::name(proc.first)] = j;
149  }
150 
151  if (totalJq.queued)
152  {
153  Json::Value totalJqJson(Json::objectValue);
154  totalJqJson[jss::queued] = std::to_string(totalJq.queued);
155  totalJqJson[jss::started] = std::to_string(totalJq.started);
156  totalJqJson[jss::finished] = std::to_string(totalJq.finished);
157  totalJqJson[jss::queued_duration_us] =
159  totalJqJson[jss::running_duration_us] =
161  jqobj[jss::total] = totalJqJson;
162  }
163 
164  Json::Value counters(Json::objectValue);
165  // Be kind to reporting tools and let them expect rpc and jq objects
166  // even if empty.
167  counters[jss::rpc] = rpcobj;
168  counters[jss::job_queue] = jqobj;
169  return counters;
170 }
171 
174 {
175  auto const present = steady_clock::now();
176 
177  Json::Value jobsArray(Json::arrayValue);
178  auto const jobs = [this] {
179  std::lock_guard lock(jobsMutex_);
180  return jobs_;
181  }();
182 
183  for (auto const& j : jobs)
184  {
185  if (j.first == jtINVALID)
186  continue;
188  jobj[jss::job] = JobTypes::name(j.first);
189  jobj[jss::duration_us] = std::to_string(
190  std::chrono::duration_cast<microseconds>(present - j.second)
191  .count());
192  jobsArray.append(jobj);
193  }
194 
195  Json::Value methodsArray(Json::arrayValue);
196  std::vector<MethodStart> methods;
197  {
198  std::lock_guard lock(methodsMutex_);
199  methods.reserve(methods_.size());
200  for (auto const& m : methods_)
201  methods.push_back(m.second);
202  }
203  for (auto m : methods)
204  {
205  Json::Value methodobj(Json::objectValue);
206  methodobj[jss::method] = m.first;
207  methodobj[jss::duration_us] = std::to_string(
208  std::chrono::duration_cast<microseconds>(present - m.second)
209  .count());
210  methodsArray.append(methodobj);
211  }
212 
214  current[jss::jobs] = jobsArray;
215  current[jss::methods] = methodsArray;
216  return current;
217 }
218 
219 //-----------------------------------------------------------------------------
220 
221 void
223 {
224  if (setup_.perfLog.empty())
225  return;
226 
227  if (logFile_.is_open())
228  logFile_.close();
229 
230  auto logDir = setup_.perfLog.parent_path();
231  if (!boost::filesystem::is_directory(logDir))
232  {
233  boost::system::error_code ec;
234  boost::filesystem::create_directories(logDir, ec);
235  if (ec)
236  {
237  JLOG(j_.fatal()) << "Unable to create performance log "
238  "directory "
239  << logDir << ": " << ec.message();
240  signalStop_();
241  return;
242  }
243  }
244 
245  logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app);
246 
247  if (!logFile_)
248  {
249  JLOG(j_.fatal()) << "Unable to open performance log " << setup_.perfLog
250  << ".";
251  signalStop_();
252  }
253 }
254 
255 void
257 {
258  beast::setCurrentThreadName("perflog");
260 
261  while (true)
262  {
263  {
265  if (cond_.wait_until(
266  lock, lastLog_ + setup_.logInterval, [&] { return stop_; }))
267  {
268  return;
269  }
270  if (rotate_)
271  {
272  openLog();
273  rotate_ = false;
274  }
275  }
276  report();
277  }
278 }
279 
280 void
282 {
283  if (!logFile_)
284  // If logFile_ is not writable do no further work.
285  return;
286 
287  auto const present = system_clock::now();
288  if (present < lastLog_ + setup_.logInterval)
289  return;
290  lastLog_ = present;
291 
293  report[jss::time] = to_string(std::chrono::floor<microseconds>(present));
294  {
296  report[jss::workers] =
297  static_cast<unsigned int>(counters_.jobs_.size());
298  }
299  report[jss::hostid] = hostname_;
300  report[jss::counters] = counters_.countersJson();
301  report[jss::nodestore] = Json::objectValue;
302  if (app_.getShardStore())
303  app_.getShardStore()->getCountsJson(report[jss::nodestore]);
304  else
305  app_.getNodeStore().getCountsJson(report[jss::nodestore]);
306  report[jss::current_activities] = counters_.currentJson();
308 
309  logFile_ << Json::Compact{std::move(report)} << std::endl;
310 }
311 
313  Setup const& setup,
314  Application& app,
315  beast::Journal journal,
316  std::function<void()>&& signalStop)
317  : setup_(setup), app_(app), j_(journal), signalStop_(std::move(signalStop))
318 {
319  openLog();
320 }
321 
323 {
324  stop();
325 }
326 
327 void
328 PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId)
329 {
330  auto counter = counters_.rpc_.find(method);
331  if (counter == counters_.rpc_.end())
332  {
333  assert(false);
334  return;
335  }
336 
337  {
338  std::lock_guard lock(counter->second.mutex);
339  ++counter->second.value.started;
340  }
342  counters_.methods_[requestId] = {
343  counter->first.c_str(), steady_clock::now()};
344 }
345 
346 void
348  std::string const& method,
349  std::uint64_t const requestId,
350  bool finish)
351 {
352  auto counter = counters_.rpc_.find(method);
353  if (counter == counters_.rpc_.end())
354  {
355  assert(false);
356  return;
357  }
358  steady_time_point startTime;
359  {
361  auto const e = counters_.methods_.find(requestId);
362  if (e != counters_.methods_.end())
363  {
364  startTime = e->second.second;
365  counters_.methods_.erase(e);
366  }
367  else
368  {
369  assert(false);
370  }
371  }
372  std::lock_guard lock(counter->second.mutex);
373  if (finish)
374  ++counter->second.value.finished;
375  else
376  ++counter->second.value.errored;
377  counter->second.value.duration += std::chrono::duration_cast<microseconds>(
378  steady_clock::now() - startTime);
379 }
380 
381 void
383 {
384  auto counter = counters_.jq_.find(type);
385  if (counter == counters_.jq_.end())
386  {
387  assert(false);
388  return;
389  }
390  std::lock_guard lock(counter->second.mutex);
391  ++counter->second.value.queued;
392 }
393 
394 void
396  JobType const type,
397  microseconds dur,
398  steady_time_point startTime,
399  int instance)
400 {
401  auto counter = counters_.jq_.find(type);
402  if (counter == counters_.jq_.end())
403  {
404  assert(false);
405  return;
406  }
407  {
408  std::lock_guard lock(counter->second.mutex);
409  ++counter->second.value.started;
410  counter->second.value.queuedDuration += dur;
411  }
413  if (instance >= 0 && instance < counters_.jobs_.size())
414  counters_.jobs_[instance] = {type, startTime};
415 }
416 
417 void
418 PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance)
419 {
420  auto counter = counters_.jq_.find(type);
421  if (counter == counters_.jq_.end())
422  {
423  assert(false);
424  return;
425  }
426  {
427  std::lock_guard lock(counter->second.mutex);
428  ++counter->second.value.finished;
429  counter->second.value.runningDuration += dur;
430  }
432  if (instance >= 0 && instance < counters_.jobs_.size())
433  counters_.jobs_[instance] = {jtINVALID, steady_time_point()};
434 }
435 
436 void
437 PerfLogImp::resizeJobs(int const resize)
438 {
440  if (resize > counters_.jobs_.size())
442 }
443 
444 void
446 {
447  if (setup_.perfLog.empty())
448  return;
449 
450  std::lock_guard lock(mutex_);
451  rotate_ = true;
452  cond_.notify_one();
453 }
454 
455 void
457 {
458  if (setup_.perfLog.size())
460 }
461 
462 void
464 {
465  if (thread_.joinable())
466  {
467  {
468  std::lock_guard lock(mutex_);
469  stop_ = true;
470  cond_.notify_one();
471  }
472  thread_.join();
473  }
474 }
475 
476 //-----------------------------------------------------------------------------
477 
479 setup_PerfLog(Section const& section, boost::filesystem::path const& configDir)
480 {
481  PerfLog::Setup setup;
482  std::string perfLog;
483  set(perfLog, "perf_log", section);
484  if (perfLog.size())
485  {
486  setup.perfLog = boost::filesystem::path(perfLog);
487  if (setup.perfLog.is_relative())
488  {
489  setup.perfLog =
490  boost::filesystem::absolute(setup.perfLog, configDir);
491  }
492  }
493 
494  std::uint64_t logInterval;
495  if (get_if_exists(section, "log_interval", logInterval))
496  setup.logInterval = std::chrono::seconds(logInterval);
497  return setup;
498 }
499 
502  PerfLog::Setup const& setup,
503  Application& app,
504  beast::Journal journal,
505  std::function<void()>&& signalStop)
506 {
507  return std::make_unique<PerfLogImp>(
508  setup, app, journal, std::move(signalStop));
509 }
510 
511 } // namespace perf
512 } // namespace ripple
beast::Journal::fatal
Stream fatal() const
Definition: Journal.h:339
ripple::Section
Holds a collection of configuration values.
Definition: BasicConfig.h:42
ripple::Application
Definition: Application.h:115
sstream
std::vector::resize
T resize(T... args)
ripple::perf::PerfLogImp::Counters::Jq::runningDuration
microseconds runningDuration
Definition: PerfLogImp.h:104
ripple::perf::PerfLogImp::Counters::Rpc::started
std::uint64_t started
Definition: PerfLogImp.h:85
std::string
STL class.
ripple::perf::PerfLogImp::Counters::Jq::finished
std::uint64_t finished
Definition: PerfLogImp.h:101
ripple::perf::PerfLogImp::lastLog_
system_time_point lastLog_
Definition: PerfLogImp.h:134
utility
ripple::JobTypes
Definition: JobTypes.h:32
ripple::perf::PerfLogImp::Counters::Jq::queued
std::uint64_t queued
Definition: PerfLogImp.h:99
ripple::perf::PerfLogImp::openLog
void openLog()
Definition: PerfLogImp.cpp:222
ripple::perf::PerfLog::Setup::perfLog
boost::filesystem::path perfLog
Definition: PerfLog.h:64
Json::arrayValue
@ arrayValue
array value (ordered list)
Definition: json_value.h:42
ripple::perf::PerfLogImp::Counters::jobsMutex_
std::mutex jobsMutex_
Definition: PerfLogImp.h:112
std::vector::reserve
T reserve(T... args)
ripple::perf::PerfLogImp::PerfLogImp
PerfLogImp(Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
Definition: PerfLogImp.cpp:312
Json::Compact
Decorator for streaming out compact json.
Definition: json_writer.h:316
ripple::perf::PerfLogImp::Counters::methodsMutex_
std::mutex methodsMutex_
Definition: PerfLogImp.h:114
std::vector
STL class.
std::vector::size
T size(T... args)
ripple::perf::PerfLogImp::jobFinish
void jobFinish(JobType const type, microseconds dur, int instance) override
Log job finishing.
Definition: PerfLogImp.cpp:418
ripple::perf::PerfLogImp::j_
const beast::Journal j_
Definition: PerfLogImp.h:127
ripple::perf::PerfLogImp::rotate
void rotate() override
Rotate perf log file.
Definition: PerfLogImp.cpp:445
std::chrono::microseconds
ripple::perf::PerfLogImp::cond_
std::condition_variable cond_
Definition: PerfLogImp.h:133
iterator
ripple::perf::PerfLogImp::Counters::Rpc::finished
std::uint64_t finished
Definition: PerfLogImp.h:86
ripple::perf::make_PerfLog
std::unique_ptr< PerfLog > make_PerfLog(PerfLog::Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
Definition: PerfLogImp.cpp:501
std::lock_guard
STL class.
ripple::perf::PerfLogImp::Counters::Jq::started
std::uint64_t started
Definition: PerfLogImp.h:100
ripple::Application::getShardStore
virtual NodeStore::DatabaseShard * getShardStore()=0
ripple::perf::PerfLogImp::Counters::methods_
std::unordered_map< std::uint64_t, MethodStart > methods_
Definition: PerfLogImp.h:113
std::function
ripple::perf::PerfLogImp::Counters::Rpc::errored
std::uint64_t errored
Definition: PerfLogImp.h:87
iostream
ripple::NetworkOPs::stateAccounting
virtual void stateAccounting(Json::Value &obj)=0
ripple::perf::PerfLogImp::hostname_
const std::string hostname_
Definition: PerfLogImp.h:135
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::perf::PerfLog::Setup
Configuration from [perf] section of rippled.cfg.
Definition: PerfLog.h:62
ripple::perf::PerfLogImp::jobStart
void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance) override
Log job executing.
Definition: PerfLogImp.cpp:395
ripple::get_if_exists
bool get_if_exists(Section const &section, std::string const &name, T &v)
Definition: BasicConfig.h:384
std::vector::push_back
T push_back(T... args)
stdexcept
ripple::perf::PerfLogImp::rotate_
bool rotate_
Definition: PerfLogImp.h:137
std::thread::joinable
T joinable(T... args)
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
Definition: json_value.cpp:882
std::thread
STL class.
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::perf::PerfLogImp::mutex_
std::mutex mutex_
Definition: PerfLogImp.h:132
ripple::perf::PerfLogImp::Counters::jobs_
std::vector< std::pair< JobType, steady_time_point > > jobs_
Definition: PerfLogImp.h:111
ripple::perf::PerfLogImp::Counters::jq_
std::unordered_map< JobType, Locked< Jq > > jq_
Definition: PerfLogImp.h:110
std::unique_lock
STL class.
std::to_string
T to_string(T... args)
ripple::jtINVALID
@ jtINVALID
Definition: Job.h:37
ripple::set
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
Definition: BasicConfig.h:313
ripple::JobTypes::name
static std::string const & name(JobType jt)
Definition: JobTypes.h:132
std::ofstream::close
T close(T... args)
std::chrono::time_point
cstdint
ripple::perf::PerfLogImp::thread_
std::thread thread_
Definition: PerfLogImp.h:131
ripple::ValStatus::current
@ current
This was a new validation and was added.
std::ofstream::open
T open(T... args)
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint64_t
atomic
ripple::perf::PerfLogImp::signalStop_
const std::function< void()> signalStop_
Definition: PerfLogImp.h:128
ripple::perf::PerfLogImp::Counters::rpc_
std::unordered_map< std::string, Locked< Rpc > > rpc_
Definition: PerfLogImp.h:109
ripple::perf::PerfLogImp::Counters::Jq
Job Queue task performance counters.
Definition: PerfLogImp.h:95
ripple::perf::PerfLogImp::~PerfLogImp
~PerfLogImp() override
Definition: PerfLogImp.cpp:322
std::condition_variable::notify_one
T notify_one(T... args)
ripple::perf::setup_PerfLog
PerfLog::Setup setup_PerfLog(Section const &section, boost::filesystem::path const &configDir)
Definition: PerfLogImp.cpp:479
ripple::perf::PerfLogImp::Counters::Jq::queuedDuration
microseconds queuedDuration
Definition: PerfLogImp.h:103
beast::setCurrentThreadName
void setCurrentThreadName(std::string_view name)
Changes the name of the caller thread.
Definition: CurrentThreadName.cpp:119
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::perf::PerfLog::steady_time_point
std::chrono::time_point< steady_clock > steady_time_point
Definition: PerfLog.h:53
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
cstdlib
std::endl
T endl(T... args)
ripple::perf::PerfLogImp::stop_
bool stop_
Definition: PerfLogImp.h:136
ripple::perf::PerfLogImp::Counters::Rpc
RPC performance counters.
Definition: PerfLogImp.h:81
std::condition_variable::wait_until
T wait_until(T... args)
std
STL namespace.
ripple::perf::PerfLogImp::rpcEnd
void rpcEnd(std::string const &method, std::uint64_t const requestId, bool finish)
Definition: PerfLogImp.cpp:347
ripple::perf::PerfLogImp::rpcStart
void rpcStart(std::string const &method, std::uint64_t const requestId) override
Log start of RPC call.
Definition: PerfLogImp.cpp:328
ripple::NodeStore::Database::getCountsJson
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:378
ripple::perf::PerfLogImp::Counters::currentJson
Json::Value currentJson() const
Definition: PerfLogImp.cpp:173
std::chrono::microseconds::count
T count(T... args)
ripple::perf::PerfLogImp::run
void run()
Definition: PerfLogImp.cpp:256
optional
mutex
ripple::JobType
JobType
Definition: Job.h:35
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
ripple::perf::PerfLogImp::setup_
const Setup setup_
Definition: PerfLogImp.h:125
ripple::perf::PerfLogImp::stop
void stop() override
Definition: PerfLogImp.cpp:463
ripple::perf::PerfLogImp::app_
Application & app_
Definition: PerfLogImp.h:126
ripple::perf::PerfLogImp::Counters::Counters
Counters(std::vector< char const * > const &labels, JobTypes const &jobTypes)
Definition: PerfLogImp.cpp:45
ripple::perf::PerfLogImp::logFile_
std::ofstream logFile_
Definition: PerfLogImp.h:130
ripple::perf::PerfLogImp::counters_
Counters counters_
Definition: PerfLogImp.h:129
std::unique_ptr
STL class.
ripple::perf::PerfLogImp::Counters::Rpc::duration
microseconds duration
Definition: PerfLogImp.h:89
ripple::perf::PerfLogImp::jobQueue
void jobQueue(JobType const type) override
Log queued job.
Definition: PerfLogImp.cpp:382
unordered_map
ripple::perf::PerfLog::Setup::logInterval
milliseconds logInterval
Definition: PerfLog.h:66
ripple::perf::PerfLogImp::start
void start() override
Definition: PerfLogImp.cpp:456
ripple::JobTypes::size
Map::size_type size() const
Definition: JobTypes.h:156
std::ofstream::is_open
T is_open(T... args)
ripple::perf::PerfLogImp::resizeJobs
void resizeJobs(int const resize) override
Ensure enough room to store each currently executing job.
Definition: PerfLogImp.cpp:437
ripple::perf::PerfLogImp::Counters::countersJson
Json::Value countersJson() const
Definition: PerfLogImp.cpp:78
ripple::perf::PerfLogImp::report
void report()
Definition: PerfLogImp.cpp:281
std::thread::join
T join(T... args)
Json::Value
Represents a JSON value.
Definition: json_value.h:145
string
std::chrono::steady_clock::now
T now(T... args)