rippled
WSClient.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2016 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/json/json_reader.h>
21 #include <ripple/json/to_string.h>
22 #include <ripple/protocol/jss.h>
23 #include <ripple/server/Port.h>
24 #include <boost/beast/core/multi_buffer.hpp>
25 #include <boost/beast/websocket.hpp>
26 #include <test/jtx.h>
27 #include <test/jtx/WSClient.h>
28 
29 #include <condition_variable>
30 #include <string>
31 #include <unordered_map>
32 
33 #include <iostream>
34 
35 #include <ripple/beast/unit_test.h>
36 
37 namespace ripple {
38 namespace test {
39 
40 class WSClientImpl : public WSClient
41 {
42  using error_code = boost::system::error_code;
43 
44  struct msg
45  {
47 
48  explicit msg(Json::Value&& jv_) : jv(jv_)
49  {
50  }
51  };
52 
53  static boost::asio::ip::tcp::endpoint
54  getEndpoint(BasicConfig const& cfg, bool v2)
55  {
56  auto& log = std::cerr;
57  ParsedPort common;
58  parse_Port(common, cfg["server"], log);
59  auto const ps = v2 ? "ws2" : "ws";
60  for (auto const& name : cfg.section("server").values())
61  {
62  if (!cfg.exists(name))
63  continue;
64  ParsedPort pp;
65  parse_Port(pp, cfg[name], log);
66  if (pp.protocol.count(ps) == 0)
67  continue;
68  using namespace boost::asio::ip;
69  if (pp.ip && pp.ip->is_unspecified())
70  *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
71  : address{address_v4::loopback()};
72  return {*pp.ip, *pp.port};
73  }
74  Throw<std::runtime_error>("Missing WebSocket port");
75  return {}; // Silence compiler control paths return value warning
76  }
77 
78  template <class ConstBuffers>
79  static std::string
80  buffer_string(ConstBuffers const& b)
81  {
82  using boost::asio::buffer;
83  using boost::asio::buffer_size;
84  std::string s;
85  s.resize(buffer_size(b));
86  buffer_copy(buffer(&s[0], s.size()), b);
87  return s;
88  }
89 
90  boost::asio::io_service ios_;
92  boost::asio::io_service::strand strand_;
94  boost::asio::ip::tcp::socket stream_;
95  boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
96  boost::beast::multi_buffer rb_;
97 
98  bool peerClosed_ = false;
99 
100  // synchronize destructor
101  bool b0_ = false;
104 
105  // synchronize message queue
109 
110  unsigned rpc_version_;
111 
112  void
114  {
115  ios_.post(strand_.wrap([this] {
116  if (!peerClosed_)
117  {
118  ws_.async_close({}, strand_.wrap([&](error_code ec) {
119  stream_.cancel(ec);
120  }));
121  }
122  }));
123  work_ = std::nullopt;
124  thread_.join();
125  }
126 
127 public:
129  Config const& cfg,
130  bool v2,
131  unsigned rpc_version,
133  : work_(ios_)
134  , strand_(ios_)
135  , thread_([&] { ios_.run(); })
136  , stream_(ios_)
137  , ws_(stream_)
138  , rpc_version_(rpc_version)
139  {
140  try
141  {
142  auto const ep = getEndpoint(cfg, v2);
143  stream_.connect(ep);
144  ws_.set_option(boost::beast::websocket::stream_base::decorator(
145  [&](boost::beast::websocket::request_type& req) {
146  for (auto const& h : headers)
147  req.set(h.first, h.second);
148  }));
149  ws_.handshake(
150  ep.address().to_string() + ":" + std::to_string(ep.port()),
151  "/");
152  ws_.async_read(
153  rb_,
154  strand_.wrap(std::bind(
155  &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
156  }
157  catch (std::exception&)
158  {
159  cleanup();
160  Rethrow();
161  }
162  }
163 
164  ~WSClientImpl() override
165  {
166  cleanup();
167  }
168 
170  invoke(std::string const& cmd, Json::Value const& params) override
171  {
172  using boost::asio::buffer;
173  using namespace std::chrono_literals;
174 
175  {
176  Json::Value jp;
177  if (params)
178  jp = params;
179  if (rpc_version_ == 2)
180  {
181  jp[jss::method] = cmd;
182  jp[jss::jsonrpc] = "2.0";
183  jp[jss::ripplerpc] = "2.0";
184  jp[jss::id] = 5;
185  }
186  else
187  jp[jss::command] = cmd;
188  auto const s = to_string(jp);
189  ws_.write_some(true, buffer(s));
190  }
191 
192  auto jv = findMsg(5s, [&](Json::Value const& jval) {
193  return jval[jss::type] == jss::response;
194  });
195  if (jv)
196  {
197  // Normalize JSON output
198  jv->removeMember(jss::type);
199  if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
200  {
201  Json::Value ret;
202  ret[jss::result] = *jv;
203  if ((*jv).isMember(jss::error))
204  ret[jss::error] = (*jv)[jss::error];
205  ret[jss::status] = jss::error;
206  return ret;
207  }
208  if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
209  (*jv)[jss::result][jss::status] = (*jv)[jss::status];
210  return *jv;
211  }
212  return {};
213  }
214 
216  getMsg(std::chrono::milliseconds const& timeout) override
217  {
219  {
221  if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
222  return std::nullopt;
223  m = std::move(msgs_.back());
224  msgs_.pop_back();
225  }
226  return std::move(m->jv);
227  }
228 
231  std::chrono::milliseconds const& timeout,
232  std::function<bool(Json::Value const&)> pred) override
233  {
235  {
237  if (!cv_.wait_for(lock, timeout, [&] {
238  for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
239  {
240  if (pred((*it)->jv))
241  {
242  m = std::move(*it);
243  msgs_.erase(it);
244  return true;
245  }
246  }
247  return false;
248  }))
249  {
250  return std::nullopt;
251  }
252  }
253  return std::move(m->jv);
254  }
255 
256  unsigned
257  version() const override
258  {
259  return rpc_version_;
260  }
261 
262 private:
263  void
265  {
266  if (ec)
267  {
268  if (ec == boost::beast::websocket::error::closed)
269  peerClosed_ = true;
270  return;
271  }
272 
273  Json::Value jv;
274  Json::Reader jr;
275  jr.parse(buffer_string(rb_.data()), jv);
276  rb_.consume(rb_.size());
277  auto m = std::make_shared<msg>(std::move(jv));
278  {
279  std::lock_guard lock(m_);
280  msgs_.push_front(m);
281  cv_.notify_all();
282  }
283  ws_.async_read(
284  rb_,
285  strand_.wrap(std::bind(
286  &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
287  }
288 
289  // Called when the read op terminates
290  void
292  {
293  std::lock_guard lock(m0_);
294  b0_ = true;
295  cv0_.notify_all();
296  }
297 };
298 
301  Config const& cfg,
302  bool v2,
303  unsigned rpc_version,
305 {
306  return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
307 }
308 
309 } // namespace test
310 } // namespace ripple
ripple::test::WSClientImpl::version
unsigned version() const override
Get RPC 1.0 or RPC 2.0.
Definition: WSClient.cpp:257
ripple::test::WSClientImpl::cv_
std::condition_variable cv_
Definition: WSClient.cpp:107
std::string::resize
T resize(T... args)
ripple::test::WSClientImpl::ios_
boost::asio::io_service ios_
Definition: WSClient.cpp:90
std::bind
T bind(T... args)
std::string
STL class.
std::shared_ptr
STL class.
std::exception
STL class.
std::list
STL class.
ripple::test::WSClientImpl::getMsg
std::optional< Json::Value > getMsg(std::chrono::milliseconds const &timeout) override
Retrieve a message.
Definition: WSClient.cpp:216
ripple::test::WSClientImpl::error_code
boost::system::error_code error_code
Definition: WSClient.cpp:42
std::string::size
T size(T... args)
ripple::test::WSClientImpl::msg::msg
msg(Json::Value &&jv_)
Definition: WSClient.cpp:48
std::chrono::milliseconds
ripple::test::WSClientImpl::on_read_msg
void on_read_msg(error_code const &ec)
Definition: WSClient.cpp:264
std::lock_guard
STL class.
std::cerr
ripple::test::WSClientImpl::stream_
boost::asio::ip::tcp::socket stream_
Definition: WSClient.cpp:94
ripple::test::WSClientImpl::msg::jv
Json::Value jv
Definition: WSClient.cpp:46
ripple::test::WSClientImpl::invoke
Json::Value invoke(std::string const &cmd, Json::Value const &params) override
Submit a command synchronously.
Definition: WSClient.cpp:170
std::function
ripple::parse_Port
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition: Port.cpp:199
Json::Reader
Unserialize a JSON document into a Value.
Definition: json_reader.h:36
ripple::test::WSClientImpl::buffer_string
static std::string buffer_string(ConstBuffers const &b)
Definition: WSClient.cpp:80
ripple::ParsedPort
Definition: Port.h:96
iostream
ripple::test::WSClientImpl::b0_
bool b0_
Definition: WSClient.cpp:101
ripple::test::WSClientImpl::ws_
boost::beast::websocket::stream< boost::asio::ip::tcp::socket & > ws_
Definition: WSClient.cpp:95
ripple::test::WSClientImpl::m0_
std::mutex m0_
Definition: WSClient.cpp:102
ripple::Section::values
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition: BasicConfig.h:77
ripple::test::WSClientImpl
Definition: WSClient.cpp:40
ripple::test::WSClientImpl::on_read_done
void on_read_done()
Definition: WSClient.cpp:291
std::thread
STL class.
ripple::Config
Definition: Config.h:89
ripple::test::WSClientImpl::~WSClientImpl
~WSClientImpl() override
Definition: WSClient.cpp:164
ripple::Rethrow
void Rethrow()
Rethrow the exception currently being handled.
Definition: contract.h:48
std::unique_lock
STL class.
std::to_string
T to_string(T... args)
ripple::test::WSClientImpl::msgs_
std::list< std::shared_ptr< msg > > msgs_
Definition: WSClient.cpp:108
ripple::test::WSClientImpl::peerClosed_
bool peerClosed_
Definition: WSClient.cpp:98
std::condition_variable::wait_for
T wait_for(T... args)
ripple::test::WSClientImpl::work_
std::optional< boost::asio::io_service::work > work_
Definition: WSClient.cpp:91
ripple::test::WSClient
Definition: WSClient.h:33
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::ParsedPort::ip
std::optional< boost::asio::ip::address > ip
Definition: Port.h:114
ripple::test::WSClientImpl::rpc_version_
unsigned rpc_version_
Definition: WSClient.cpp:110
ripple::ParsedPort::port
std::optional< std::uint16_t > port
Definition: Port.h:115
Json::Reader::parse
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Definition: json_reader.cpp:74
condition_variable
std::set::count
T count(T... args)
ripple::test::makeWSClient
std::unique_ptr< WSClient > makeWSClient(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers)
Returns a client operating through WebSockets/S.
Definition: WSClient.cpp:300
ripple::test::WSClientImpl::msg
Definition: WSClient.cpp:44
ripple::test::WSClientImpl::m_
std::mutex m_
Definition: WSClient.cpp:106
std::optional< boost::asio::io_service::work >
std::mutex
STL class.
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
ripple::test::WSClientImpl::rb_
boost::beast::multi_buffer rb_
Definition: WSClient.cpp:96
ripple::test::WSClientImpl::thread_
std::thread thread_
Definition: WSClient.cpp:93
ripple::ParsedPort::protocol
std::set< std::string, boost::beast::iless > protocol
Definition: Port.h:101
ripple::test::WSClientImpl::cv0_
std::condition_variable cv0_
Definition: WSClient.cpp:103
std::unique_ptr
STL class.
ripple::test::WSClientImpl::strand_
boost::asio::io_service::strand strand_
Definition: WSClient.cpp:92
ripple::test::WSClientImpl::WSClientImpl
WSClientImpl(Config const &cfg, bool v2, unsigned rpc_version, std::unordered_map< std::string, std::string > const &headers={})
Definition: WSClient.cpp:128
unordered_map
ripple::test::WSClientImpl::cleanup
void cleanup()
Definition: WSClient.cpp:113
std::condition_variable::notify_all
T notify_all(T... args)
ripple::BasicConfig
Holds unparsed configuration information.
Definition: BasicConfig.h:215
ripple::test::WSClientImpl::findMsg
std::optional< Json::Value > findMsg(std::chrono::milliseconds const &timeout, std::function< bool(Json::Value const &)> pred) override
Retrieve a message that meets the predicate criteria.
Definition: WSClient.cpp:230
std::thread::join
T join(T... args)
ripple::test::WSClientImpl::getEndpoint
static boost::asio::ip::tcp::endpoint getEndpoint(BasicConfig const &cfg, bool v2)
Definition: WSClient.cpp:54
ripple::BasicConfig::exists
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Definition: BasicConfig.cpp:121
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::BasicConfig::section
Section & section(std::string const &name)
Returns the section with the given name.
Definition: BasicConfig.cpp:127
string