rippled
ConnectAttempt.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/json/json_reader.h>
21 #include <ripple/overlay/Cluster.h>
22 #include <ripple/overlay/impl/ConnectAttempt.h>
23 #include <ripple/overlay/impl/PeerImp.h>
24 #include <ripple/overlay/impl/ProtocolVersion.h>
25 
26 namespace ripple {
27 
29  Application& app,
30  boost::asio::io_service& io_service,
31  endpoint_type const& remote_endpoint,
32  Resource::Consumer usage,
33  shared_context const& context,
34  std::uint32_t id,
36  beast::Journal journal,
37  OverlayImpl& overlay)
38  : Child(overlay)
39  , app_(app)
40  , id_(id)
41  , sink_(journal, OverlayImpl::makePrefix(id))
42  , journal_(sink_)
43  , remote_endpoint_(remote_endpoint)
44  , usage_(usage)
45  , strand_(io_service)
46  , timer_(io_service)
47  , stream_ptr_(std::make_unique<stream_type>(
48  socket_type(std::forward<boost::asio::io_service&>(io_service)),
49  *context))
50  , socket_(stream_ptr_->next_layer().socket())
51  , stream_(*stream_ptr_)
52  , slot_(slot)
53 {
54  JLOG(journal_.debug()) << "Connect " << remote_endpoint;
55 }
56 
58 {
59  if (slot_ != nullptr)
61  JLOG(journal_.trace()) << "~ConnectAttempt";
62 }
63 
64 void
66 {
67  if (!strand_.running_in_this_thread())
68  return strand_.post(
70  if (socket_.is_open())
71  {
72  JLOG(journal_.debug()) << "Stop";
73  }
74  close();
75 }
76 
77 void
79 {
80  stream_.next_layer().async_connect(
82  strand_.wrap(std::bind(
85  std::placeholders::_1)));
86 }
87 
88 //------------------------------------------------------------------------------
89 
90 void
92 {
93  assert(strand_.running_in_this_thread());
94  if (socket_.is_open())
95  {
96  error_code ec;
97  timer_.cancel(ec);
98  socket_.close(ec);
99  JLOG(journal_.debug()) << "Closed";
100  }
101 }
102 
103 void
105 {
106  JLOG(journal_.debug()) << reason;
107  close();
108 }
109 
110 void
112 {
113  JLOG(journal_.debug()) << name << ": " << ec.message();
114  close();
115 }
116 
117 void
119 {
120  error_code ec;
121  timer_.expires_from_now(std::chrono::seconds(15), ec);
122  if (ec)
123  {
124  JLOG(journal_.error()) << "setTimer: " << ec.message();
125  return;
126  }
127 
128  timer_.async_wait(strand_.wrap(std::bind(
129  &ConnectAttempt::onTimer, shared_from_this(), std::placeholders::_1)));
130 }
131 
132 void
134 {
135  error_code ec;
136  timer_.cancel(ec);
137 }
138 
139 void
141 {
142  if (!socket_.is_open())
143  return;
144  if (ec == boost::asio::error::operation_aborted)
145  return;
146  if (ec)
147  {
148  // This should never happen
149  JLOG(journal_.error()) << "onTimer: " << ec.message();
150  return close();
151  }
152  fail("Timeout");
153 }
154 
155 void
157 {
158  cancelTimer();
159 
160  if (ec == boost::asio::error::operation_aborted)
161  return;
162  endpoint_type local_endpoint;
163  if (!ec)
164  local_endpoint = socket_.local_endpoint(ec);
165  if (ec)
166  return fail("onConnect", ec);
167  if (!socket_.is_open())
168  return;
169  JLOG(journal_.trace()) << "onConnect";
170 
171  setTimer();
172  stream_.set_verify_mode(boost::asio::ssl::verify_none);
173  stream_.async_handshake(
174  boost::asio::ssl::stream_base::client,
175  strand_.wrap(std::bind(
178  std::placeholders::_1)));
179 }
180 
181 void
183 {
184  cancelTimer();
185  if (!socket_.is_open())
186  return;
187  if (ec == boost::asio::error::operation_aborted)
188  return;
189  endpoint_type local_endpoint;
190  if (!ec)
191  local_endpoint = socket_.local_endpoint(ec);
192  if (ec)
193  return fail("onHandshake", ec);
194  JLOG(journal_.trace()) << "onHandshake";
195 
198  return fail("Duplicate connection");
199 
200  auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
201  if (!sharedValue)
202  return close(); // makeSharedValue logs
203 
204  req_ = makeRequest(
210 
212  req_,
213  *sharedValue,
214  overlay_.setup().networkID,
215  overlay_.setup().public_ip,
216  remote_endpoint_.address(),
217  app_);
218 
219  setTimer();
220  boost::beast::http::async_write(
221  stream_,
222  req_,
223  strand_.wrap(std::bind(
226  std::placeholders::_1)));
227 }
228 
229 void
231 {
232  cancelTimer();
233  if (!socket_.is_open())
234  return;
235  if (ec == boost::asio::error::operation_aborted)
236  return;
237  if (ec)
238  return fail("onWrite", ec);
239  boost::beast::http::async_read(
240  stream_,
241  read_buf_,
242  response_,
243  strand_.wrap(std::bind(
246  std::placeholders::_1)));
247 }
248 
249 void
251 {
252  cancelTimer();
253 
254  if (!socket_.is_open())
255  return;
256  if (ec == boost::asio::error::operation_aborted)
257  return;
258  if (ec == boost::asio::error::eof)
259  {
260  JLOG(journal_.info()) << "EOF";
261  setTimer();
262  return stream_.async_shutdown(strand_.wrap(std::bind(
265  std::placeholders::_1)));
266  }
267  if (ec)
268  return fail("onRead", ec);
269  processResponse();
270 }
271 
272 void
274 {
275  cancelTimer();
276  if (!ec)
277  {
278  JLOG(journal_.error()) << "onShutdown: expected error condition";
279  return close();
280  }
281  if (ec != boost::asio::error::eof)
282  return fail("onShutdown", ec);
283  close();
284 }
285 
286 //--------------------------------------------------------------------------
287 
288 void
290 {
291  if (response_.result() == boost::beast::http::status::service_unavailable)
292  {
293  Json::Value json;
294  Json::Reader r;
295  std::string s;
296  s.reserve(boost::asio::buffer_size(response_.body().data()));
297  for (auto const buffer : response_.body().data())
298  s.append(
299  boost::asio::buffer_cast<char const*>(buffer),
300  boost::asio::buffer_size(buffer));
301  auto const success = r.parse(s, json);
302  if (success)
303  {
304  if (json.isObject() && json.isMember("peer-ips"))
305  {
306  Json::Value const& ips = json["peer-ips"];
307  if (ips.isArray())
308  {
310  eps.reserve(ips.size());
311  for (auto const& v : ips)
312  {
313  if (v.isString())
314  {
315  error_code ec;
316  auto const ep = parse_endpoint(v.asString(), ec);
317  if (!ec)
318  eps.push_back(ep);
319  }
320  }
322  }
323  }
324  }
325  }
326 
328  {
329  JLOG(journal_.info())
330  << "Unable to upgrade to peer protocol: " << response_.result()
331  << " (" << response_.reason() << ")";
332  return close();
333  }
334 
335  // Just because our peer selected a particular protocol version doesn't
336  // mean that it's acceptable to us. Check that it is:
337  std::optional<ProtocolVersion> negotiatedProtocol;
338 
339  {
340  auto const pvs = parseProtocolVersions(response_["Upgrade"]);
341 
342  if (pvs.size() == 1 && isProtocolSupported(pvs[0]))
343  negotiatedProtocol = pvs[0];
344 
345  if (!negotiatedProtocol)
346  return fail(
347  "processResponse: Unable to negotiate protocol version");
348  }
349 
350  auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
351  if (!sharedValue)
352  return close(); // makeSharedValue logs
353 
354  try
355  {
356  auto publicKey = verifyHandshake(
357  response_,
358  *sharedValue,
359  overlay_.setup().networkID,
360  overlay_.setup().public_ip,
361  remote_endpoint_.address(),
362  app_);
363 
364  JLOG(journal_.info())
365  << "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
366 
367  JLOG(journal_.debug())
368  << "Protocol: " << to_string(*negotiatedProtocol);
369 
370  auto const member = app_.cluster().member(publicKey);
371  if (member)
372  {
373  JLOG(journal_.info()) << "Cluster name: " << *member;
374  }
375 
376  auto const result = overlay_.peerFinder().activate(
377  slot_, publicKey, static_cast<bool>(member));
378  if (result != PeerFinder::Result::success)
379  return fail("Outbound slots full");
380 
381  auto const peer = std::make_shared<PeerImp>(
382  app_,
383  std::move(stream_ptr_),
384  read_buf_.data(),
385  std::move(slot_),
386  std::move(response_),
387  usage_,
388  publicKey,
389  *negotiatedProtocol,
390  id_,
391  overlay_);
392 
393  overlay_.add_active(peer);
394  }
395  catch (std::exception const& e)
396  {
397  return fail(std::string("Handshake failure (") + e.what() + ")");
398  }
399 }
400 
401 } // namespace ripple
ripple::ConnectAttempt::onHandshake
void onHandshake(error_code ec)
Definition: ConnectAttempt.cpp:182
ripple::Application
Definition: Application.h:115
ripple::ConnectAttempt::cancelTimer
void cancelTimer()
Definition: ConnectAttempt.cpp:133
ripple::ConnectAttempt::remote_endpoint_
endpoint_type remote_endpoint_
Definition: ConnectAttempt.h:52
ripple::Application::cluster
virtual Cluster & cluster()=0
std::bind
T bind(T... args)
Json::Value::isObject
bool isObject() const
Definition: json_value.cpp:1027
std::string
STL class.
std::shared_ptr< boost::asio::ssl::context >
std::exception
STL class.
ripple::ConnectAttempt::strand_
boost::asio::io_service::strand strand_
Definition: ConnectAttempt.h:54
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::OverlayImpl::Child::overlay_
OverlayImpl & overlay_
Definition: OverlayImpl.h:64
ripple::Config::LEDGER_REPLAY
bool LEDGER_REPLAY
Definition: Config.h:228
std::string::reserve
T reserve(T... args)
std::vector
STL class.
std::chrono::seconds
ripple::ConnectAttempt::journal_
const beast::Journal journal_
Definition: ConnectAttempt.h:51
ripple::toBase58
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition: AccountID.cpp:104
ripple::parseProtocolVersions
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
Definition: ProtocolVersion.cpp:78
ripple::makeSharedValue
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
Definition: Handshake.cpp:145
ripple::Cluster::member
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition: Cluster.cpp:39
boost
Definition: IPAddress.h:103
ripple::ConnectAttempt::processResponse
void processResponse()
Definition: ConnectAttempt.cpp:289
Json::Reader
Unserialize a JSON document into a Value.
Definition: json_reader.h:36
ripple::OverlayImpl::setup
Setup const & setup() const
Definition: OverlayImpl.h:176
ripple::ConnectAttempt::socket_type
boost::asio::ip::tcp::socket socket_type
Definition: ConnectAttempt.h:43
ripple::ConnectAttempt::response_
response_type response_
Definition: ConnectAttempt.h:60
ripple::PeerFinder::Config::peerPrivate
bool peerPrivate
true if we want our IP address kept private.
Definition: PeerfinderManager.h:61
ripple::ConnectAttempt::onRead
void onRead(error_code ec)
Definition: ConnectAttempt.cpp:250
ripple::ConnectAttempt::setTimer
void setTimer()
Definition: ConnectAttempt.cpp:118
ripple::ConnectAttempt::slot_
std::shared_ptr< PeerFinder::Slot > slot_
Definition: ConnectAttempt.h:61
ripple::PeerFinder::Manager::activate
virtual Result activate(std::shared_ptr< Slot > const &slot, PublicKey const &key, bool reserved)=0
Request an active slot type.
std::vector::push_back
T push_back(T... args)
beast::IPAddressConversion::from_asio
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Definition: IPAddressConversion.h:63
ripple::PeerFinder::Manager::on_closed
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
ripple::PeerFinder::Result::success
@ success
ripple::OverlayImpl::peerFinder
PeerFinder::Manager & peerFinder()
Definition: OverlayImpl.h:164
ripple::ConnectAttempt::id_
const std::uint32_t id_
Definition: ConnectAttempt.h:49
ripple::ConnectAttempt::socket_
socket_type & socket_
Definition: ConnectAttempt.h:57
ripple::ConnectAttempt::error_code
boost::system::error_code error_code
Definition: ConnectAttempt.h:33
std::enable_shared_from_this< ConnectAttempt >::shared_from_this
T shared_from_this(T... args)
ripple::Application::config
virtual Config & config()=0
ripple::ConnectAttempt::onShutdown
void onShutdown(error_code ec)
Definition: ConnectAttempt.cpp:273
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
Json::Value::size
UInt size() const
Number of values in array or object.
Definition: json_value.cpp:706
ripple::ConnectAttempt::usage_
Resource::Consumer usage_
Definition: ConnectAttempt.h:53
Json::Value::isMember
bool isMember(const char *key) const
Return true if the object has a member named key.
Definition: json_value.cpp:932
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint32_t
ripple::ConnectAttempt::stop
void stop() override
Definition: ConnectAttempt.cpp:65
ripple::ConnectAttempt::endpoint_type
boost::asio::ip::tcp::endpoint endpoint_type
Definition: ConnectAttempt.h:35
std::string::append
T append(T... args)
ripple::ConnectAttempt::app_
Application & app_
Definition: ConnectAttempt.h:48
ripple::ConnectAttempt::fail
void fail(std::string const &reason)
Definition: ConnectAttempt.cpp:104
Json::Value::isArray
bool isArray() const
Definition: json_value.cpp:1015
ripple::Config::VP_REDUCE_RELAY_ENABLE
bool VP_REDUCE_RELAY_ENABLE
Definition: Config.h:253
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::PeerFinder::Manager::onConnected
virtual bool onConnected(std::shared_ptr< Slot > const &slot, beast::IP::Endpoint const &local_endpoint)=0
Called when an outbound connection attempt succeeds.
ripple::PeerFinder::Manager::onRedirects
virtual void onRedirects(boost::asio::ip::tcp::endpoint const &remote_address, std::vector< boost::asio::ip::tcp::endpoint > const &eps)=0
Called when we received redirect IPs from a busy peer.
ripple::ConnectAttempt::stream_type
boost::beast::ssl_stream< middle_type > stream_type
Definition: ConnectAttempt.h:45
ripple::PeerFinder::Manager::config
virtual Config config()=0
Returns the configuration for the manager.
std
STL namespace.
ripple::ConnectAttempt::stream_ptr_
std::unique_ptr< stream_type > stream_ptr_
Definition: ConnectAttempt.h:56
Json::Reader::parse
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Definition: json_reader.cpp:74
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:34
ripple::ConnectAttempt::run
void run()
Definition: ConnectAttempt.cpp:78
ripple::ConnectAttempt::timer_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
Definition: ConnectAttempt.h:55
ripple::ConnectAttempt::onConnect
void onConnect(error_code ec)
Definition: ConnectAttempt.cpp:156
ripple::Config::COMPRESSION
bool COMPRESSION
Definition: Config.h:225
ripple::isProtocolSupported
bool isProtocolSupported(ProtocolVersion const &v)
Determine whether we support a specific protocol version.
Definition: ProtocolVersion.cpp:174
ripple::TokenType::NodePublic
@ NodePublic
std::optional
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
ripple::ConnectAttempt::stream_
stream_type & stream_
Definition: ConnectAttempt.h:58
ripple::ConnectAttempt::req_
request_type req_
Definition: ConnectAttempt.h:62
ripple::ConnectAttempt::close
void close()
Definition: ConnectAttempt.cpp:91
ripple::Config::TX_REDUCE_RELAY_ENABLE
bool TX_REDUCE_RELAY_ENABLE
Definition: Config.h:264
ripple::verifyHandshake
PublicKey verifyHandshake(boost::beast::http::fields const &headers, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
Definition: Handshake.cpp:226
ripple::OverlayImpl
Definition: OverlayImpl.h:58
ripple::ConnectAttempt::ConnectAttempt
ConnectAttempt(Application &app, boost::asio::io_service &io_service, endpoint_type const &remote_endpoint, Resource::Consumer usage, shared_context const &context, std::uint32_t id, std::shared_ptr< PeerFinder::Slot > const &slot, beast::Journal journal, OverlayImpl &overlay)
Definition: ConnectAttempt.cpp:28
ripple::makeRequest
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
Definition: Handshake.cpp:365
ripple::ConnectAttempt::~ConnectAttempt
~ConnectAttempt()
Definition: ConnectAttempt.cpp:57
ripple::ConnectAttempt::parse_endpoint
static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const &s, boost::system::error_code &ec)
Definition: ConnectAttempt.h:112
ripple::OverlayImpl::isPeerUpgrade
static bool isPeerUpgrade(http_request_type const &request)
Definition: OverlayImpl.cpp:326
ripple::OverlayImpl::add_active
void add_active(std::shared_ptr< PeerImp > const &peer)
Definition: OverlayImpl.cpp:428
std::exception::what
T what(T... args)
ripple::ConnectAttempt::read_buf_
boost::beast::multi_buffer read_buf_
Definition: ConnectAttempt.h:59
ripple::ConnectAttempt::onTimer
void onTimer(error_code ec)
Definition: ConnectAttempt.cpp:140
ripple::ConnectAttempt::onWrite
void onWrite(error_code ec)
Definition: ConnectAttempt.cpp:230
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::buildHandshake
void buildHandshake(boost::beast::http::fields &h, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote_ip, Application &app)
Insert fields headers necessary for upgrading the link to the peer protocol.
Definition: Handshake.cpp:177