rippled
ProtocolMessage.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
21 #define RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
22 
23 #include <ripple/basics/ByteUtilities.h>
24 #include <ripple/overlay/Compression.h>
25 #include <ripple/overlay/Message.h>
26 #include <ripple/overlay/impl/ZeroCopyStream.h>
27 #include <ripple/protocol/messages.h>
28 #include <boost/asio/buffer.hpp>
29 #include <boost/asio/buffers_iterator.hpp>
30 #include <boost/system/error_code.hpp>
31 #include <cassert>
32 #include <cstdint>
33 #include <memory>
34 #include <optional>
35 #include <ripple.pb.h>
36 #include <type_traits>
37 #include <vector>
38 
39 namespace ripple {
40 
41 inline protocol::MessageType
42 protocolMessageType(protocol::TMGetLedger const&)
43 {
44  return protocol::mtGET_LEDGER;
45 }
46 
47 inline protocol::MessageType
48 protocolMessageType(protocol::TMReplayDeltaRequest const&)
49 {
50  return protocol::mtREPLAY_DELTA_REQ;
51 }
52 
53 inline protocol::MessageType
54 protocolMessageType(protocol::TMProofPathRequest const&)
55 {
56  return protocol::mtPROOF_PATH_REQ;
57 }
58 
60 template <class = void>
63 {
64  switch (type)
65  {
66  case protocol::mtMANIFESTS:
67  return "manifests";
68  case protocol::mtPING:
69  return "ping";
70  case protocol::mtCLUSTER:
71  return "cluster";
72  case protocol::mtENDPOINTS:
73  return "endpoints";
74  case protocol::mtTRANSACTION:
75  return "tx";
76  case protocol::mtGET_LEDGER:
77  return "get_ledger";
78  case protocol::mtLEDGER_DATA:
79  return "ledger_data";
80  case protocol::mtPROPOSE_LEDGER:
81  return "propose";
82  case protocol::mtSTATUS_CHANGE:
83  return "status";
84  case protocol::mtHAVE_SET:
85  return "have_set";
86  case protocol::mtVALIDATORLIST:
87  return "validator_list";
88  case protocol::mtVALIDATORLISTCOLLECTION:
89  return "validator_list_collection";
90  case protocol::mtVALIDATION:
91  return "validation";
92  case protocol::mtGET_PEER_SHARD_INFO:
93  return "get_peer_shard_info";
94  case protocol::mtPEER_SHARD_INFO:
95  return "peer_shard_info";
96  case protocol::mtGET_OBJECTS:
97  return "get_objects";
98  case protocol::mtHAVE_TRANSACTIONS:
99  return "have_transactions";
100  case protocol::mtTRANSACTIONS:
101  return "transactions";
102  case protocol::mtSQUELCH:
103  return "squelch";
104  case protocol::mtPROOF_PATH_REQ:
105  return "proof_path_request";
106  case protocol::mtPROOF_PATH_RESPONSE:
107  return "proof_path_response";
108  case protocol::mtREPLAY_DELTA_REQ:
109  return "replay_delta_request";
110  case protocol::mtREPLAY_DELTA_RESPONSE:
111  return "replay_delta_response";
112  case protocol::mtGET_PEER_SHARD_INFO_V2:
113  return "get_peer_shard_info_v2";
114  case protocol::mtPEER_SHARD_INFO_V2:
115  return "peer_shard_info_v2";
116  default:
117  break;
118  }
119  return "unknown";
120 }
121 
122 namespace detail {
123 
125 {
131 
134 
137 
140 
143 
149 };
150 
151 template <typename BufferSequence>
152 auto
153 buffersBegin(BufferSequence const& bufs)
154 {
155  return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(
156  bufs);
157 }
158 
159 template <typename BufferSequence>
160 auto
161 buffersEnd(BufferSequence const& bufs)
162 {
163  return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::end(
164  bufs);
165 }
166 
176 template <class BufferSequence>
179  boost::system::error_code& ec,
180  BufferSequence const& bufs,
181  std::size_t size)
182 {
183  using namespace ripple::compression;
184 
185  MessageHeader hdr;
186  auto iter = buffersBegin(bufs);
187  assert(iter != buffersEnd(bufs));
188 
189  // Check valid header compressed message:
190  // - 4 bits are the compression algorithm, 1st bit is always set to 1
191  // - 2 bits are always set to 0
192  // - 26 bits are the payload size
193  // - 32 bits are the uncompressed data size
194  if (*iter & 0x80)
195  {
197 
198  // not enough bytes to parse the header
199  if (size < hdr.header_size)
200  {
201  ec = make_error_code(boost::system::errc::success);
202  return std::nullopt;
203  }
204 
205  if (*iter & 0x0C)
206  {
207  ec = make_error_code(boost::system::errc::protocol_error);
208  return std::nullopt;
209  }
210 
211  hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);
212 
214  {
215  ec = make_error_code(boost::system::errc::protocol_error);
216  return std::nullopt;
217  }
218 
219  for (int i = 0; i != 4; ++i)
220  hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
221 
222  // clear the top four bits (the compression bits).
223  hdr.payload_wire_size &= 0x0FFFFFFF;
224 
226 
227  for (int i = 0; i != 2; ++i)
228  hdr.message_type = (hdr.message_type << 8) + *iter++;
229 
230  for (int i = 0; i != 4; ++i)
231  hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
232 
233  return hdr;
234  }
235 
236  // Check valid header uncompressed message:
237  // - 6 bits are set to 0
238  // - 26 bits are the payload size
239  if ((*iter & 0xFC) == 0)
240  {
241  hdr.header_size = headerBytes;
242 
243  if (size < hdr.header_size)
244  {
245  ec = make_error_code(boost::system::errc::success);
246  return std::nullopt;
247  }
248 
249  hdr.algorithm = Algorithm::None;
250 
251  for (int i = 0; i != 4; ++i)
252  hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
253 
256 
257  for (int i = 0; i != 2; ++i)
258  hdr.message_type = (hdr.message_type << 8) + *iter++;
259 
260  return hdr;
261  }
262 
263  ec = make_error_code(boost::system::errc::no_message);
264  return std::nullopt;
265 }
266 
267 template <
268  class T,
269  class Buffers,
270  class = std::enable_if_t<
273 parseMessageContent(MessageHeader const& header, Buffers const& buffers)
274 {
275  auto const m = std::make_shared<T>();
276 
277  ZeroCopyInputStream<Buffers> stream(buffers);
278  stream.Skip(header.header_size);
279 
281  {
283  payload.resize(header.uncompressed_size);
284 
285  auto const payloadSize = ripple::compression::decompress(
286  stream,
287  header.payload_wire_size,
288  payload.data(),
289  header.uncompressed_size,
290  header.algorithm);
291 
292  if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
293  return {};
294  }
295  else if (!m->ParseFromZeroCopyStream(&stream))
296  return {};
297 
298  return m;
299 }
300 
301 template <
302  class T,
303  class Buffers,
304  class Handler,
305  class = std::enable_if_t<
307 bool
308 invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
309 {
310  auto const m = parseMessageContent<T>(header, buffers);
311  if (!m)
312  return false;
313 
314  using namespace ripple::compression;
315  handler.onMessageBegin(
316  header.message_type,
317  m,
318  header.payload_wire_size,
319  header.uncompressed_size,
320  header.algorithm != Algorithm::None);
321  handler.onMessage(m);
322  handler.onMessageEnd(header.message_type, m);
323 
324  return true;
325 }
326 
327 } // namespace detail
328 
341 template <class Buffers, class Handler>
344  Buffers const& buffers,
345  Handler& handler,
346  std::size_t& hint)
347 {
349 
350  auto const size = boost::asio::buffer_size(buffers);
351 
352  if (size == 0)
353  return result;
354 
355  auto header = detail::parseMessageHeader(result.second, buffers, size);
356 
357  // If we can't parse the header then it may be that we don't have enough
358  // bytes yet, or because the message was cut off (if error_code is success).
359  // Otherwise we failed to match the header's marker (error_code is set to
360  // no_message) or the compression algorithm is invalid (error_code is
361  // protocol_error) and signal an error.
362  if (!header)
363  return result;
364 
365  // We implement a maximum size for protocol messages. Sending a message
366  // whose size exceeds this may result in the connection being dropped. A
367  // larger message size may be supported in the future or negotiated as
368  // part of a protocol upgrade.
369  if (header->payload_wire_size > maximiumMessageSize ||
370  header->uncompressed_size > maximiumMessageSize)
371  {
372  result.second = make_error_code(boost::system::errc::message_size);
373  return result;
374  }
375 
376  // We requested uncompressed messages from the peer but received compressed.
377  if (!handler.compressionEnabled() &&
378  header->algorithm != compression::Algorithm::None)
379  {
380  result.second = make_error_code(boost::system::errc::protocol_error);
381  return result;
382  }
383 
384  // We don't have the whole message yet. This isn't an error but we have
385  // nothing to do.
386  if (header->total_wire_size > size)
387  {
388  hint = header->total_wire_size - size;
389  return result;
390  }
391 
392  bool success;
393 
394  switch (header->message_type)
395  {
396  case protocol::mtMANIFESTS:
397  success = detail::invoke<protocol::TMManifests>(
398  *header, buffers, handler);
399  break;
400  case protocol::mtPING:
401  success =
402  detail::invoke<protocol::TMPing>(*header, buffers, handler);
403  break;
404  case protocol::mtCLUSTER:
405  success =
406  detail::invoke<protocol::TMCluster>(*header, buffers, handler);
407  break;
408  case protocol::mtENDPOINTS:
409  success = detail::invoke<protocol::TMEndpoints>(
410  *header, buffers, handler);
411  break;
412  case protocol::mtTRANSACTION:
413  success = detail::invoke<protocol::TMTransaction>(
414  *header, buffers, handler);
415  break;
416  case protocol::mtGET_LEDGER:
417  success = detail::invoke<protocol::TMGetLedger>(
418  *header, buffers, handler);
419  break;
420  case protocol::mtLEDGER_DATA:
421  success = detail::invoke<protocol::TMLedgerData>(
422  *header, buffers, handler);
423  break;
424  case protocol::mtPROPOSE_LEDGER:
425  success = detail::invoke<protocol::TMProposeSet>(
426  *header, buffers, handler);
427  break;
428  case protocol::mtSTATUS_CHANGE:
429  success = detail::invoke<protocol::TMStatusChange>(
430  *header, buffers, handler);
431  break;
432  case protocol::mtHAVE_SET:
433  success = detail::invoke<protocol::TMHaveTransactionSet>(
434  *header, buffers, handler);
435  break;
436  case protocol::mtVALIDATION:
437  success = detail::invoke<protocol::TMValidation>(
438  *header, buffers, handler);
439  break;
440  case protocol::mtGET_PEER_SHARD_INFO:
441  success = detail::invoke<protocol::TMGetPeerShardInfo>(
442  *header, buffers, handler);
443  break;
444  case protocol::mtPEER_SHARD_INFO:
445  success = detail::invoke<protocol::TMPeerShardInfo>(
446  *header, buffers, handler);
447  break;
448  case protocol::mtVALIDATORLIST:
449  success = detail::invoke<protocol::TMValidatorList>(
450  *header, buffers, handler);
451  break;
452  case protocol::mtVALIDATORLISTCOLLECTION:
453  success = detail::invoke<protocol::TMValidatorListCollection>(
454  *header, buffers, handler);
455  break;
456  case protocol::mtGET_OBJECTS:
457  success = detail::invoke<protocol::TMGetObjectByHash>(
458  *header, buffers, handler);
459  break;
460  case protocol::mtHAVE_TRANSACTIONS:
461  success = detail::invoke<protocol::TMHaveTransactions>(
462  *header, buffers, handler);
463  break;
464  case protocol::mtTRANSACTIONS:
465  success = detail::invoke<protocol::TMTransactions>(
466  *header, buffers, handler);
467  break;
468  case protocol::mtSQUELCH:
469  success =
470  detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
471  break;
472  case protocol::mtPROOF_PATH_REQ:
473  success = detail::invoke<protocol::TMProofPathRequest>(
474  *header, buffers, handler);
475  break;
476  case protocol::mtPROOF_PATH_RESPONSE:
477  success = detail::invoke<protocol::TMProofPathResponse>(
478  *header, buffers, handler);
479  break;
480  case protocol::mtREPLAY_DELTA_REQ:
481  success = detail::invoke<protocol::TMReplayDeltaRequest>(
482  *header, buffers, handler);
483  break;
484  case protocol::mtREPLAY_DELTA_RESPONSE:
485  success = detail::invoke<protocol::TMReplayDeltaResponse>(
486  *header, buffers, handler);
487  break;
488  case protocol::mtGET_PEER_SHARD_INFO_V2:
489  success = detail::invoke<protocol::TMGetPeerShardInfoV2>(
490  *header, buffers, handler);
491  break;
492  case protocol::mtPEER_SHARD_INFO_V2:
493  success = detail::invoke<protocol::TMPeerShardInfoV2>(
494  *header, buffers, handler);
495  break;
496  default:
497  handler.onMessageUnknown(header->message_type);
498  success = true;
499  break;
500  }
501 
502  result.first = header->total_wire_size;
503 
504  if (!success)
505  result.second = make_error_code(boost::system::errc::bad_message);
506 
507  return result;
508 }
509 
510 } // namespace ripple
511 
512 #endif
std::vector::resize
T resize(T... args)
ripple::detail::MessageHeader::message_type
std::uint16_t message_type
The type of the message.
Definition: ProtocolMessage.h:142
ripple::detail::parseMessageContent
std::shared_ptr< T > parseMessageContent(MessageHeader const &header, Buffers const &buffers)
Definition: ProtocolMessage.h:273
std::string
STL class.
std::shared_ptr
STL class.
ripple::maximiumMessageSize
constexpr std::size_t maximiumMessageSize
Definition: overlay/Message.h:38
ripple::detail::MessageHeader::payload_wire_size
std::uint32_t payload_wire_size
The size of the payload on the wire.
Definition: ProtocolMessage.h:136
ripple::detail::invoke
bool invoke(MessageHeader const &header, Buffers const &buffers, Handler &handler)
Definition: ProtocolMessage.h:308
std::pair
ripple::detail::MessageHeader::header_size
std::uint32_t header_size
The size of the header associated with this message.
Definition: ProtocolMessage.h:133
vector
ripple::compression::headerBytes
constexpr std::size_t headerBytes
Definition: Compression.h:31
ripple::detail::buffersEnd
auto buffersEnd(BufferSequence const &bufs)
Definition: ProtocolMessage.h:161
ripple::compression::Algorithm::LZ4
@ LZ4
ripple::ZeroCopyInputStream
Implements ZeroCopyInputStream around a buffer sequence.
Definition: ZeroCopyStream.h:35
ripple::detail::buffersBegin
auto buffersBegin(BufferSequence const &bufs)
Definition: ProtocolMessage.h:153
ripple::detail::MessageHeader::algorithm
compression::Algorithm algorithm
Indicates which compression algorithm the payload is compressed with.
Definition: ProtocolMessage.h:148
ripple::protocolMessageName
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
Definition: ProtocolMessage.h:62
ripple::detail::MessageHeader::uncompressed_size
std::uint32_t uncompressed_size
Uncompressed message size if the message is compressed.
Definition: ProtocolMessage.h:139
std::enable_if_t
ripple::detail::MessageHeader::total_wire_size
std::uint32_t total_wire_size
The size of the message on the wire.
Definition: ProtocolMessage.h:130
ripple::detail::MessageHeader
Definition: ProtocolMessage.h:124
cstdint
std::uint32_t
ripple::compression
Definition: Compression.h:29
ripple::compression::Algorithm
Algorithm
Definition: Compression.h:36
memory
ripple::protocolMessageType
protocol::MessageType protocolMessageType(protocol::TMGetLedger const &)
Definition: ProtocolMessage.h:42
ripple::compression::decompress
std::size_t decompress(InputStream &in, std::size_t inSize, std::uint8_t *decompressed, std::size_t decompressedSize, Algorithm algorithm=Algorithm::LZ4)
Decompress input stream.
Definition: Compression.h:50
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
cassert
optional
std::size_t
ripple::compression::headerBytesCompressed
constexpr std::size_t headerBytesCompressed
Definition: Compression.h:32
ripple::invokeProtocolMessage
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
Definition: ProtocolMessage.h:343
std::vector::data
T data(T... args)
type_traits
ripple::detail::parseMessageHeader
std::optional< MessageHeader > parseMessageHeader(boost::system::error_code &ec, BufferSequence const &bufs, std::size_t size)
Parse a message header.
Definition: ProtocolMessage.h:178
std::is_base_of
ripple::compression::Algorithm::None
@ None