rippled
RPCSub.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/basics/Log.h>
21 #include <ripple/basics/StringUtilities.h>
22 #include <ripple/basics/contract.h>
23 #include <ripple/json/to_string.h>
24 #include <ripple/net/RPCCall.h>
25 #include <ripple/net/RPCSub.h>
26 #include <deque>
27 
28 namespace ripple {
29 
30 // Subscription object for JSON-RPC
31 class RPCSubImp : public RPCSub
32 {
33 public:
35  InfoSub::Source& source,
36  boost::asio::io_service& io_service,
37  JobQueue& jobQueue,
38  std::string const& strUrl,
39  std::string const& strUsername,
40  std::string const& strPassword,
41  Logs& logs)
42  : RPCSub(source)
43  , m_io_service(io_service)
44  , m_jobQueue(jobQueue)
45  , mUrl(strUrl)
46  , mSSL(false)
47  , mUsername(strUsername)
48  , mPassword(strPassword)
49  , mSending(false)
50  , j_(logs.journal("RPCSub"))
51  , logs_(logs)
52  {
53  parsedURL pUrl;
54 
55  if (!parseUrl(pUrl, strUrl))
56  Throw<std::runtime_error>("Failed to parse url.");
57  else if (pUrl.scheme == "https")
58  mSSL = true;
59  else if (pUrl.scheme != "http")
60  Throw<std::runtime_error>("Only http and https is supported.");
61 
62  mSeq = 1;
63 
64  mIp = pUrl.domain;
65  mPort = (!pUrl.port) ? (mSSL ? 443 : 80) : *pUrl.port;
66  mPath = pUrl.path;
67 
68  JLOG(j_.info()) << "RPCCall::fromNetwork sub: ip=" << mIp
69  << " port=" << mPort
70  << " ssl= " << (mSSL ? "yes" : "no") << " path='"
71  << mPath << "'";
72  }
73 
74  ~RPCSubImp() = default;
75 
76  void
77  send(Json::Value const& jvObj, bool broadcast) override
78  {
80 
81  if (mDeque.size() >= eventQueueMax)
82  {
83  // Drop the previous event.
84  JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
85  mDeque.pop_back();
86  }
87 
88  auto jm = broadcast ? j_.debug() : j_.info();
89  JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
90 
92 
93  if (!mSending)
94  {
95  // Start a sending thread.
96  JLOG(j_.info()) << "RPCCall::fromNetwork start";
97 
99  jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
100  sendThread();
101  });
102  }
103  }
104 
105  void
106  setUsername(std::string const& strUsername) override
107  {
109 
110  mUsername = strUsername;
111  }
112 
113  void
114  setPassword(std::string const& strPassword) override
115  {
117 
118  mPassword = strPassword;
119  }
120 
121 private:
122  // XXX Could probably create a bunch of send jobs in a single get of the
123  // lock.
124  void
126  {
127  Json::Value jvEvent;
128  bool bSend;
129 
130  do
131  {
132  {
133  // Obtain the lock to manipulate the queue and change sending.
135 
136  if (mDeque.empty())
137  {
138  mSending = false;
139  bSend = false;
140  }
141  else
142  {
143  auto const [seq, env] = mDeque.front();
144 
145  mDeque.pop_front();
146 
147  jvEvent = env;
148  jvEvent["seq"] = seq;
149 
150  bSend = true;
151  }
152  }
153 
154  // Send outside of the lock.
155  if (bSend)
156  {
157  // XXX Might not need this in a try.
158  try
159  {
160  JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
161 
163  m_io_service,
164  mIp,
165  mPort,
166  mUsername,
167  mPassword,
168  mPath,
169  "event",
170  jvEvent,
171  mSSL,
172  true,
173  logs_);
174  }
175  catch (const std::exception& e)
176  {
177  JLOG(j_.info())
178  << "RPCCall::fromNetwork exception: " << e.what();
179  }
180  }
181  } while (bSend);
182  }
183 
184 private:
185  enum { eventQueueMax = 32 };
186 
187  boost::asio::io_service& m_io_service;
189 
193  bool mSSL;
197 
198  int mSeq; // Next id to allocate.
199 
200  bool mSending; // Sending threead is active.
201 
203 
206 };
207 
208 //------------------------------------------------------------------------------
209 
211 {
212 }
213 
216  InfoSub::Source& source,
217  boost::asio::io_service& io_service,
218  JobQueue& jobQueue,
219  std::string const& strUrl,
220  std::string const& strUsername,
221  std::string const& strPassword,
222  Logs& logs)
223 {
224  return std::make_shared<RPCSubImp>(
225  std::ref(source),
226  std::ref(io_service),
227  std::ref(jobQueue),
228  strUrl,
229  strUsername,
230  strPassword,
231  logs);
232 }
233 
234 } // namespace ripple
ripple::RPCSubImp::mDeque
std::deque< std::pair< int, Json::Value > > mDeque
Definition: RPCSub.cpp:202
ripple::RPCSubImp::mUrl
std::string mUrl
Definition: RPCSub.cpp:190
ripple::RPCSubImp::mPassword
std::string mPassword
Definition: RPCSub.cpp:195
ripple::jtCLIENT_SUBSCRIBE
@ jtCLIENT_SUBSCRIBE
Definition: Job.h:46
ripple::RPCSub::RPCSub
RPCSub(InfoSub::Source &source)
Definition: RPCSub.cpp:210
std::string
STL class.
std::shared_ptr
STL class.
ripple::RPCSub
Subscription object for JSON RPC.
Definition: RPCSub.h:30
std::exception
STL class.
ripple::Logs
Manages partitions for logging.
Definition: Log.h:48
ripple::parsedURL
Definition: StringUtilities.h:116
ripple::RPCSubImp::m_io_service
boost::asio::io_service & m_io_service
Definition: RPCSub.cpp:187
std::deque::pop_front
T pop_front(T... args)
ripple::RPCSubImp::mPath
std::string mPath
Definition: RPCSub.cpp:196
ripple::RPCSubImp::~RPCSubImp
~RPCSubImp()=default
ripple::InfoSub
Manages a client's subscription to data feeds.
Definition: InfoSub.h:51
ripple::RPCSubImp::mPort
std::uint16_t mPort
Definition: RPCSub.cpp:192
std::deque::size
T size(T... args)
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::lock_guard
STL class.
ripple::RPCSubImp::eventQueueMax
@ eventQueueMax
Definition: RPCSub.cpp:185
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
ripple::parsedURL::path
std::string path
Definition: StringUtilities.h:125
ripple::RPCSubImp::mSSL
bool mSSL
Definition: RPCSub.cpp:193
ripple::RPCSubImp::logs_
Logs & logs_
Definition: RPCSub.cpp:205
std::deque::front
T front(T... args)
ripple::RPCSubImp::setPassword
void setPassword(std::string const &strPassword) override
Definition: RPCSub.cpp:114
std::deque::push_back
T push_back(T... args)
ripple::RPCSubImp::setUsername
void setUsername(std::string const &strUsername) override
Definition: RPCSub.cpp:106
ripple::InfoSub::mLock
std::mutex mLock
Definition: InfoSub.h:233
ripple::RPCSubImp
Definition: RPCSub.cpp:31
ripple::RPCSubImp::send
void send(Json::Value const &jvObj, bool broadcast) override
Definition: RPCSub.cpp:77
ripple::RPCSubImp::mSeq
int mSeq
Definition: RPCSub.cpp:198
ripple::parsedURL::port
std::optional< std::uint16_t > port
Definition: StringUtilities.h:124
ripple::InfoSub::Source
Abstracts the source of subscription data.
Definition: InfoSub.h:67
ripple::parseUrl
bool parseUrl(parsedURL &pUrl, std::string const &strUrl)
Definition: StringUtilities.cpp:47
beast::Journal::info
Stream info() const
Definition: Journal.h:321
deque
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint16_t
ripple::RPCSubImp::m_jobQueue
JobQueue & m_jobQueue
Definition: RPCSub.cpp:188
ripple::RPCSubImp::mSending
bool mSending
Definition: RPCSub.cpp:200
ripple::RPCSubImp::mIp
std::string mIp
Definition: RPCSub.cpp:191
std::deque::pop_back
T pop_back(T... args)
ripple::make_RPCSub
std::shared_ptr< RPCSub > make_RPCSub(InfoSub::Source &source, boost::asio::io_service &io_service, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, Logs &logs)
Definition: RPCSub.cpp:215
ripple::JobQueue
A pool of threads to perform work.
Definition: JobQueue.h:55
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:34
ripple::RPCSubImp::mUsername
std::string mUsername
Definition: RPCSub.cpp:194
std::deque::empty
T empty(T... args)
ripple::parsedURL::scheme
std::string scheme
Definition: StringUtilities.h:120
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::make_pair
T make_pair(T... args)
ripple::parsedURL::domain
std::string domain
Definition: StringUtilities.h:123
ripple::RPCCall::fromNetwork
void fromNetwork(boost::asio::io_service &io_service, std::string const &strIp, const std::uint16_t iPort, std::string const &strUsername, std::string const &strPassword, std::string const &strPath, std::string const &strMethod, Json::Value const &jvParams, const bool bSSL, const bool quiet, Logs &logs, std::function< void(Json::Value const &jvInput)> callbackFuncP, std::unordered_map< std::string, std::string > headers)
Definition: RPCCall.cpp:1685
ripple::RPCSubImp::j_
const beast::Journal j_
Definition: RPCSub.cpp:204
ripple::RPCSubImp::RPCSubImp
RPCSubImp(InfoSub::Source &source, boost::asio::io_service &io_service, JobQueue &jobQueue, std::string const &strUrl, std::string const &strUsername, std::string const &strPassword, Logs &logs)
Definition: RPCSub.cpp:34
std::ref
T ref(T... args)
std::exception::what
T what(T... args)
ripple::RPCSubImp::sendThread
void sendThread()
Definition: RPCSub.cpp:125
Json::Value
Represents a JSON value.
Definition: json_value.h:145