rippled
PeerImp.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/app/consensus/RCLValidations.h>
21 #include <ripple/app/ledger/InboundLedgers.h>
22 #include <ripple/app/ledger/InboundTransactions.h>
23 #include <ripple/app/ledger/LedgerMaster.h>
24 #include <ripple/app/ledger/TransactionMaster.h>
25 #include <ripple/app/misc/HashRouter.h>
26 #include <ripple/app/misc/LoadFeeTrack.h>
27 #include <ripple/app/misc/NetworkOPs.h>
28 #include <ripple/app/misc/Transaction.h>
29 #include <ripple/app/misc/ValidatorList.h>
30 #include <ripple/app/tx/apply.h>
31 #include <ripple/basics/UptimeClock.h>
32 #include <ripple/basics/base64.h>
33 #include <ripple/basics/random.h>
34 #include <ripple/basics/safe_cast.h>
35 #include <ripple/beast/core/LexicalCast.h>
36 #include <ripple/beast/core/SemanticVersion.h>
37 #include <ripple/nodestore/DatabaseShard.h>
38 #include <ripple/overlay/Cluster.h>
39 #include <ripple/overlay/impl/PeerImp.h>
40 #include <ripple/overlay/impl/Tuning.h>
41 #include <ripple/overlay/predicates.h>
42 #include <ripple/protocol/digest.h>
43 
44 #include <boost/algorithm/string.hpp>
45 #include <boost/algorithm/string/predicate.hpp>
46 #include <boost/beast/core/ostream.hpp>
47 
48 #include <algorithm>
49 #include <memory>
50 #include <mutex>
51 #include <numeric>
52 #include <sstream>
53 
54 using namespace std::chrono_literals;
55 
56 namespace ripple {
57 
58 namespace {
60 std::chrono::milliseconds constexpr peerHighLatency{300};
61 
63 std::chrono::seconds constexpr peerTimerInterval{60};
64 } // namespace
65 
66 PeerImp::PeerImp(
67  Application& app,
68  id_t id,
70  http_request_type&& request,
71  PublicKey const& publicKey,
73  Resource::Consumer consumer,
74  std::unique_ptr<stream_type>&& stream_ptr,
75  OverlayImpl& overlay)
76  : Child(overlay)
77  , app_(app)
78  , id_(id)
79  , sink_(app_.journal("Peer"), makePrefix(id))
80  , p_sink_(app_.journal("Protocol"), makePrefix(id))
81  , journal_(sink_)
82  , p_journal_(p_sink_)
83  , stream_ptr_(std::move(stream_ptr))
84  , socket_(stream_ptr_->next_layer().socket())
85  , stream_(*stream_ptr_)
86  , strand_(socket_.get_executor())
87  , timer_(waitable_timer{socket_.get_executor()})
88  , remote_address_(slot->remote_endpoint())
89  , overlay_(overlay)
90  , inbound_(true)
91  , protocol_(protocol)
92  , tracking_(Tracking::unknown)
93  , trackingTime_(clock_type::now())
94  , publicKey_(publicKey)
95  , lastPingTime_(clock_type::now())
96  , creationTime_(clock_type::now())
97  , squelch_(app_.journal("Squelch"))
98  , usage_(consumer)
100  , slot_(slot)
101  , request_(std::move(request))
102  , headers_(request_)
103  , compressionEnabled_(
105  headers_,
107  "lz4",
108  app_.config().COMPRESSION)
109  ? Compressed::On
110  : Compressed::Off)
111  , txReduceRelayEnabled_(peerFeatureEnabled(
112  headers_,
113  FEATURE_TXRR,
114  app_.config().TX_REDUCE_RELAY_ENABLE))
115  , vpReduceRelayEnabled_(peerFeatureEnabled(
116  headers_,
117  FEATURE_VPRR,
118  app_.config().VP_REDUCE_RELAY_ENABLE))
119  , ledgerReplayEnabled_(peerFeatureEnabled(
120  headers_,
122  app_.config().LEDGER_REPLAY))
123  , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
124 {
125  JLOG(journal_.info()) << "compression enabled "
126  << (compressionEnabled_ == Compressed::On)
127  << " vp reduce-relay enabled "
128  << vpReduceRelayEnabled_
129  << " tx reduce-relay enabled "
130  << txReduceRelayEnabled_ << " on " << remote_address_
131  << " " << id_;
132 }
133 
135 {
136  const bool inCluster{cluster()};
137 
142 
143  if (inCluster)
144  {
145  JLOG(journal_.warn()) << name() << " left cluster";
146  }
147 }
148 
149 // Helper function to check for valid uint256 values in protobuf buffers
150 static bool
152 {
153  return pBuffStr.size() == uint256::size();
154 }
155 
156 void
158 {
159  if (!strand_.running_in_this_thread())
160  return post(strand_, std::bind(&PeerImp::run, shared_from_this()));
161 
162  auto parseLedgerHash =
163  [](std::string const& value) -> std::optional<uint256> {
164  if (uint256 ret; ret.parseHex(value))
165  return ret;
166 
167  if (auto const s = base64_decode(value); s.size() == uint256::size())
168  return uint256{s};
169 
170  return std::nullopt;
171  };
172 
173  std::optional<uint256> closed;
174  std::optional<uint256> previous;
175 
176  if (auto const iter = headers_.find("Closed-Ledger");
177  iter != headers_.end())
178  {
179  closed = parseLedgerHash(iter->value().to_string());
180 
181  if (!closed)
182  fail("Malformed handshake data (1)");
183  }
184 
185  if (auto const iter = headers_.find("Previous-Ledger");
186  iter != headers_.end())
187  {
188  previous = parseLedgerHash(iter->value().to_string());
189 
190  if (!previous)
191  fail("Malformed handshake data (2)");
192  }
193 
194  if (previous && !closed)
195  fail("Malformed handshake data (3)");
196 
197  {
199  if (closed)
200  closedLedgerHash_ = *closed;
201  if (previous)
202  previousLedgerHash_ = *previous;
203  }
204 
205  if (inbound_)
206  doAccept();
207  else
208  doProtocolStart();
209 
210  // Anything else that needs to be done with the connection should be
211  // done in doProtocolStart
212 }
213 
214 void
216 {
217  if (!strand_.running_in_this_thread())
218  return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
219  if (socket_.is_open())
220  {
221  // The rationale for using different severity levels is that
222  // outbound connections are under our control and may be logged
223  // at a higher level, but inbound connections are more numerous and
224  // uncontrolled so to prevent log flooding the severity is reduced.
225  //
226  if (inbound_)
227  {
228  JLOG(journal_.debug()) << "Stop";
229  }
230  else
231  {
232  JLOG(journal_.info()) << "Stop";
233  }
234  }
235  close();
236 }
237 
238 //------------------------------------------------------------------------------
239 
240 void
242 {
243  if (!strand_.running_in_this_thread())
244  return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
245  if (gracefulClose_)
246  return;
247  if (detaching_)
248  return;
249 
250  auto validator = m->getValidatorKey();
251  if (validator && !squelch_.expireSquelch(*validator))
252  return;
253 
255  safe_cast<TrafficCount::category>(m->getCategory()),
256  false,
257  static_cast<int>(m->getBuffer(compressionEnabled_).size()));
258 
259  auto sendq_size = send_queue_.size();
260 
261  if (sendq_size < Tuning::targetSendQueue)
262  {
263  // To detect a peer that does not read from their
264  // side of the connection, we expect a peer to have
265  // a small senq periodically
266  large_sendq_ = 0;
267  }
268  else if (auto sink = journal_.debug();
269  sink && (sendq_size % Tuning::sendQueueLogFreq) == 0)
270  {
271  std::string const n = name();
272  sink << (n.empty() ? remote_address_.to_string() : n)
273  << " sendq: " << sendq_size;
274  }
275 
276  send_queue_.push(m);
277 
278  if (sendq_size != 0)
279  return;
280 
281  boost::asio::async_write(
282  stream_,
283  boost::asio::buffer(
284  send_queue_.front()->getBuffer(compressionEnabled_)),
285  bind_executor(
286  strand_,
287  std::bind(
290  std::placeholders::_1,
291  std::placeholders::_2)));
292 }
293 
294 void
296 {
297  if (!strand_.running_in_this_thread())
298  return post(
300 
301  if (!txQueue_.empty())
302  {
303  protocol::TMHaveTransactions ht;
304  std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
305  ht.add_hashes(hash.data(), hash.size());
306  });
307  JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
308  txQueue_.clear();
309  send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
310  }
311 }
312 
313 void
315 {
316  if (!strand_.running_in_this_thread())
317  return post(
319 
321  {
322  JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap";
323  sendTxQueue();
324  }
325 
326  txQueue_.insert(hash);
327  JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
328 }
329 
330 void
332 {
333  if (!strand_.running_in_this_thread())
334  return post(
335  strand_,
337 
338  auto removed = txQueue_.erase(hash);
339  JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
340 }
341 
342 void
344 {
345  if ((usage_.charge(fee) == Resource::drop) &&
346  usage_.disconnect(p_journal_) && strand_.running_in_this_thread())
347  {
348  // Sever the connection
350  fail("charge: Resources");
351  }
352 }
353 
354 //------------------------------------------------------------------------------
355 
356 bool
358 {
359  auto const iter = headers_.find("Crawl");
360  if (iter == headers_.end())
361  return false;
362  return boost::iequals(iter->value(), "public");
363 }
364 
365 bool
367 {
368  return static_cast<bool>(app_.cluster().member(publicKey_));
369 }
370 
373 {
374  if (inbound_)
375  return headers_["User-Agent"].to_string();
376  return headers_["Server"].to_string();
377 }
378 
381 {
383 
384  ret[jss::public_key] = toBase58(TokenType::NodePublic, publicKey_);
385  ret[jss::address] = remote_address_.to_string();
386 
387  if (inbound_)
388  ret[jss::inbound] = true;
389 
390  if (cluster())
391  {
392  ret[jss::cluster] = true;
393 
394  if (auto const n = name(); !n.empty())
395  // Could move here if Json::Value supported moving from a string
396  ret[jss::name] = n;
397  }
398 
399  if (auto const d = domain(); !d.empty())
400  ret[jss::server_domain] = domain();
401 
402  if (auto const nid = headers_["Network-ID"].to_string(); !nid.empty())
403  ret[jss::network_id] = nid;
404 
405  ret[jss::load] = usage_.balance();
406 
407  if (auto const version = getVersion(); !version.empty())
408  ret[jss::version] = version;
409 
410  ret[jss::protocol] = to_string(protocol_);
411 
412  {
414  if (latency_)
415  ret[jss::latency] = static_cast<Json::UInt>(latency_->count());
416  }
417 
418  ret[jss::uptime] = static_cast<Json::UInt>(
419  std::chrono::duration_cast<std::chrono::seconds>(uptime()).count());
420 
421  std::uint32_t minSeq, maxSeq;
422  ledgerRange(minSeq, maxSeq);
423 
424  if ((minSeq != 0) || (maxSeq != 0))
425  ret[jss::complete_ledgers] =
426  std::to_string(minSeq) + " - " + std::to_string(maxSeq);
427 
428  switch (tracking_.load())
429  {
430  case Tracking::diverged:
431  ret[jss::track] = "diverged";
432  break;
433 
434  case Tracking::unknown:
435  ret[jss::track] = "unknown";
436  break;
437 
438  case Tracking::converged:
439  // Nothing to do here
440  break;
441  }
442 
443  uint256 closedLedgerHash;
444  protocol::TMStatusChange last_status;
445  {
447  closedLedgerHash = closedLedgerHash_;
448  last_status = last_status_;
449  }
450 
451  if (closedLedgerHash != beast::zero)
452  ret[jss::ledger] = to_string(closedLedgerHash);
453 
454  if (last_status.has_newstatus())
455  {
456  switch (last_status.newstatus())
457  {
458  case protocol::nsCONNECTING:
459  ret[jss::status] = "connecting";
460  break;
461 
462  case protocol::nsCONNECTED:
463  ret[jss::status] = "connected";
464  break;
465 
466  case protocol::nsMONITORING:
467  ret[jss::status] = "monitoring";
468  break;
469 
470  case protocol::nsVALIDATING:
471  ret[jss::status] = "validating";
472  break;
473 
474  case protocol::nsSHUTTING:
475  ret[jss::status] = "shutting";
476  break;
477 
478  default:
479  JLOG(p_journal_.warn())
480  << "Unknown status: " << last_status.newstatus();
481  }
482  }
483 
484  ret[jss::metrics] = Json::Value(Json::objectValue);
485  ret[jss::metrics][jss::total_bytes_recv] =
486  std::to_string(metrics_.recv.total_bytes());
487  ret[jss::metrics][jss::total_bytes_sent] =
488  std::to_string(metrics_.sent.total_bytes());
489  ret[jss::metrics][jss::avg_bps_recv] =
490  std::to_string(metrics_.recv.average_bytes());
491  ret[jss::metrics][jss::avg_bps_sent] =
492  std::to_string(metrics_.sent.average_bytes());
493 
494  return ret;
495 }
496 
497 bool
499 {
500  switch (f)
501  {
503  return protocol_ >= make_protocol(2, 1);
505  return protocol_ >= make_protocol(2, 2);
507  return ledgerReplayEnabled_;
508  }
509  return false;
510 }
511 
512 //------------------------------------------------------------------------------
513 
514 bool
515 PeerImp::hasLedger(uint256 const& hash, std::uint32_t seq) const
516 {
517  {
519  if ((seq != 0) && (seq >= minLedger_) && (seq <= maxLedger_) &&
521  return true;
522  if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
523  recentLedgers_.end())
524  return true;
525  }
526 
527  if (seq >= app_.getNodeStore().earliestLedgerSeq())
528  {
530  auto const it{shardInfos_.find(publicKey_)};
531  if (it != shardInfos_.end())
532  {
533  auto const shardIndex{app_.getNodeStore().seqToShardIndex(seq)};
534  return boost::icl::contains(it->second.finalized(), shardIndex);
535  }
536  }
537  return false;
538 }
539 
540 void
542 {
544 
545  minSeq = minLedger_;
546  maxSeq = maxLedger_;
547 }
548 
549 bool
550 PeerImp::hasTxSet(uint256 const& hash) const
551 {
553  return std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
554  recentTxSets_.end();
555 }
556 
557 void
559 {
560  // Operations on closedLedgerHash_ and previousLedgerHash_ must be
561  // guarded by recentLock_.
565 }
566 
567 bool
569 {
571  return (tracking_ != Tracking::diverged) && (uMin >= minLedger_) &&
572  (uMax <= maxLedger_);
573 }
574 
575 //------------------------------------------------------------------------------
576 
577 void
579 {
580  assert(strand_.running_in_this_thread());
581  if (socket_.is_open())
582  {
583  detaching_ = true; // DEPRECATED
584  error_code ec;
585  timer_.cancel(ec);
586  socket_.close(ec);
588  if (inbound_)
589  {
590  JLOG(journal_.debug()) << "Closed";
591  }
592  else
593  {
594  JLOG(journal_.info()) << "Closed";
595  }
596  }
597 }
598 
599 void
601 {
602  if (!strand_.running_in_this_thread())
603  return post(
604  strand_,
605  std::bind(
606  (void (Peer::*)(std::string const&)) & PeerImp::fail,
608  reason));
610  {
611  std::string const n = name();
612  JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
613  << " failed: " << reason;
614  }
615  close();
616 }
617 
618 void
620 {
621  assert(strand_.running_in_this_thread());
622  if (socket_.is_open())
623  {
624  JLOG(journal_.warn())
625  << name << " from " << toBase58(TokenType::NodePublic, publicKey_)
626  << " at " << remote_address_.to_string() << ": " << ec.message();
627  }
628  close();
629 }
630 
633 {
635  return shardInfos_;
636 }
637 
638 void
640 {
641  assert(strand_.running_in_this_thread());
642  assert(socket_.is_open());
643  assert(!gracefulClose_);
644  gracefulClose_ = true;
645  if (send_queue_.size() > 0)
646  return;
647  setTimer();
648  stream_.async_shutdown(bind_executor(
649  strand_,
650  std::bind(
651  &PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
652 }
653 
654 void
656 {
657  error_code ec;
658  timer_.expires_from_now(peerTimerInterval, ec);
659 
660  if (ec)
661  {
662  JLOG(journal_.error()) << "setTimer: " << ec.message();
663  return;
664  }
665  timer_.async_wait(bind_executor(
666  strand_,
667  std::bind(
668  &PeerImp::onTimer, shared_from_this(), std::placeholders::_1)));
669 }
670 
671 // convenience for ignoring the error code
672 void
674 {
675  error_code ec;
676  timer_.cancel(ec);
677 }
678 
679 //------------------------------------------------------------------------------
680 
683 {
685  ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
686  return ss.str();
687 }
688 
689 void
691 {
692  if (!socket_.is_open())
693  return;
694 
695  if (ec == boost::asio::error::operation_aborted)
696  return;
697 
698  if (ec)
699  {
700  // This should never happen
701  JLOG(journal_.error()) << "onTimer: " << ec.message();
702  return close();
703  }
704 
706  {
707  fail("Large send queue");
708  return;
709  }
710 
711  if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
712  {
713  clock_type::duration duration;
714 
715  {
717  duration = clock_type::now() - trackingTime_;
718  }
719 
720  if ((t == Tracking::diverged &&
721  (duration > app_.config().MAX_DIVERGED_TIME)) ||
722  (t == Tracking::unknown &&
723  (duration > app_.config().MAX_UNKNOWN_TIME)))
724  {
726  fail("Not useful");
727  return;
728  }
729  }
730 
731  // Already waiting for PONG
732  if (lastPingSeq_)
733  {
734  fail("Ping Timeout");
735  return;
736  }
737 
739  lastPingSeq_ = rand_int<std::uint32_t>();
740 
741  protocol::TMPing message;
742  message.set_type(protocol::TMPing::ptPING);
743  message.set_seq(*lastPingSeq_);
744 
745  send(std::make_shared<Message>(message, protocol::mtPING));
746 
747  setTimer();
748 }
749 
750 void
752 {
753  cancelTimer();
754  // If we don't get eof then something went wrong
755  if (!ec)
756  {
757  JLOG(journal_.error()) << "onShutdown: expected error condition";
758  return close();
759  }
760  if (ec != boost::asio::error::eof)
761  return fail("onShutdown", ec);
762  close();
763 }
764 
765 //------------------------------------------------------------------------------
766 void
768 {
769  assert(read_buffer_.size() == 0);
770 
771  JLOG(journal_.debug()) << "doAccept: " << remote_address_;
772 
773  auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
774 
775  // This shouldn't fail since we already computed
776  // the shared value successfully in OverlayImpl
777  if (!sharedValue)
778  return fail("makeSharedValue: Unexpected failure");
779 
780  JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
781  JLOG(journal_.info()) << "Public Key: "
783 
784  if (auto member = app_.cluster().member(publicKey_))
785  {
786  {
788  name_ = *member;
789  }
790  JLOG(journal_.info()) << "Cluster name: " << *member;
791  }
792 
794 
795  // XXX Set timer: connection is in grace period to be useful.
796  // XXX Set timer: connection idle (idle may vary depending on connection
797  // type.)
798 
799  auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
800 
801  boost::beast::ostream(*write_buffer) << makeResponse(
803  request_,
806  *sharedValue,
808  protocol_,
809  app_);
810 
811  // Write the whole buffer and only start protocol when that's done.
812  boost::asio::async_write(
813  stream_,
814  write_buffer->data(),
815  boost::asio::transfer_all(),
816  bind_executor(
817  strand_,
818  [this, write_buffer, self = shared_from_this()](
819  error_code ec, std::size_t bytes_transferred) {
820  if (!socket_.is_open())
821  return;
822  if (ec == boost::asio::error::operation_aborted)
823  return;
824  if (ec)
825  return fail("onWriteResponse", ec);
826  if (write_buffer->size() == bytes_transferred)
827  return doProtocolStart();
828  return fail("Failed to write header");
829  }));
830 }
831 
834 {
835  std::shared_lock read_lock{nameMutex_};
836  return name_;
837 }
838 
841 {
842  return headers_["Server-Domain"].to_string();
843 }
844 
845 //------------------------------------------------------------------------------
846 
847 // Protocol logic
848 
849 void
851 {
853 
854  // Send all the validator lists that have been loaded
856  {
858  [&](std::string const& manifest,
859  std::uint32_t version,
861  PublicKey const& pubKey,
862  std::size_t maxSequence,
863  uint256 const& hash) {
865  *this,
866  0,
867  pubKey,
868  maxSequence,
869  version,
870  manifest,
871  blobInfos,
873  p_journal_);
874 
875  // Don't send it next time.
877  });
878  }
879 
880  if (auto m = overlay_.getManifestsMessage())
881  send(m);
882 
883  // Request shard info from peer
884  protocol::TMGetPeerShardInfoV2 tmGPS;
885  tmGPS.set_relays(0);
886  send(std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2));
887 
888  setTimer();
889 }
890 
891 // Called repeatedly with protocol message data
892 void
894 {
895  if (!socket_.is_open())
896  return;
897  if (ec == boost::asio::error::operation_aborted)
898  return;
899  if (ec == boost::asio::error::eof)
900  {
901  JLOG(journal_.info()) << "EOF";
902  return gracefulClose();
903  }
904  if (ec)
905  return fail("onReadMessage", ec);
906  if (auto stream = journal_.trace())
907  {
908  if (bytes_transferred > 0)
909  stream << "onReadMessage: " << bytes_transferred << " bytes";
910  else
911  stream << "onReadMessage";
912  }
913 
914  metrics_.recv.add_message(bytes_transferred);
915 
916  read_buffer_.commit(bytes_transferred);
917 
918  auto hint = Tuning::readBufferBytes;
919 
920  while (read_buffer_.size() > 0)
921  {
922  std::size_t bytes_consumed;
923  std::tie(bytes_consumed, ec) =
924  invokeProtocolMessage(read_buffer_.data(), *this, hint);
925  if (ec)
926  return fail("onReadMessage", ec);
927  if (!socket_.is_open())
928  return;
929  if (gracefulClose_)
930  return;
931  if (bytes_consumed == 0)
932  break;
933  read_buffer_.consume(bytes_consumed);
934  }
935 
936  // Timeout on writes only
937  stream_.async_read_some(
939  bind_executor(
940  strand_,
941  std::bind(
944  std::placeholders::_1,
945  std::placeholders::_2)));
946 }
947 
948 void
950 {
951  if (!socket_.is_open())
952  return;
953  if (ec == boost::asio::error::operation_aborted)
954  return;
955  if (ec)
956  return fail("onWriteMessage", ec);
957  if (auto stream = journal_.trace())
958  {
959  if (bytes_transferred > 0)
960  stream << "onWriteMessage: " << bytes_transferred << " bytes";
961  else
962  stream << "onWriteMessage";
963  }
964 
965  metrics_.sent.add_message(bytes_transferred);
966 
967  assert(!send_queue_.empty());
968  send_queue_.pop();
969  if (!send_queue_.empty())
970  {
971  // Timeout on writes only
972  return boost::asio::async_write(
973  stream_,
974  boost::asio::buffer(
975  send_queue_.front()->getBuffer(compressionEnabled_)),
976  bind_executor(
977  strand_,
978  std::bind(
981  std::placeholders::_1,
982  std::placeholders::_2)));
983  }
984 
985  if (gracefulClose_)
986  {
987  return stream_.async_shutdown(bind_executor(
988  strand_,
989  std::bind(
992  std::placeholders::_1)));
993  }
994 }
995 
996 //------------------------------------------------------------------------------
997 //
998 // ProtocolHandler
999 //
1000 //------------------------------------------------------------------------------
1001 
1002 void
1004 {
1005  // TODO
1006 }
1007 
1008 void
1010  std::uint16_t type,
1012  std::size_t size,
1013  std::size_t uncompressed_size,
1014  bool isCompressed)
1015 {
1016  load_event_ =
1019  auto const category = TrafficCount::categorize(*m, type, true);
1020  overlay_.reportTraffic(category, true, static_cast<int>(size));
1021  using namespace protocol;
1022  if ((type == MessageType::mtTRANSACTION ||
1023  type == MessageType::mtHAVE_TRANSACTIONS ||
1024  type == MessageType::mtTRANSACTIONS ||
1025  // GET_OBJECTS
1026  category == TrafficCount::category::get_transactions ||
1027  // GET_LEDGER
1028  category == TrafficCount::category::ld_tsc_get ||
1029  category == TrafficCount::category::ld_tsc_share ||
1030  // LEDGER_DATA
1031  category == TrafficCount::category::gl_tsc_share ||
1032  category == TrafficCount::category::gl_tsc_get) &&
1034  {
1036  static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
1037  }
1038  JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
1039  << uncompressed_size << " " << isCompressed;
1040 }
1041 
1042 void
1044  std::uint16_t,
1046 {
1047  load_event_.reset();
1048  charge(fee_);
1049 }
1050 
1051 void
1053 {
1054  auto const s = m->list_size();
1055 
1056  if (s == 0)
1057  {
1059  return;
1060  }
1061 
1062  if (s > 100)
1064 
1066  jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
1067  overlay_.onManifests(m, that);
1068  });
1069 }
1070 
1071 void
1073 {
1074  if (m->type() == protocol::TMPing::ptPING)
1075  {
1076  // We have received a ping request, reply with a pong
1078  m->set_type(protocol::TMPing::ptPONG);
1079  send(std::make_shared<Message>(*m, protocol::mtPING));
1080  return;
1081  }
1082 
1083  if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
1084  {
1085  // Only reset the ping sequence if we actually received a
1086  // PONG with the correct cookie. That way, any peers which
1087  // respond with incorrect cookies will eventually time out.
1088  if (m->seq() == lastPingSeq_)
1089  {
1090  lastPingSeq_.reset();
1091 
1092  // Update latency estimate
1093  auto const rtt = std::chrono::round<std::chrono::milliseconds>(
1095 
1097 
1098  if (latency_)
1099  latency_ = (*latency_ * 7 + rtt) / 8;
1100  else
1101  latency_ = rtt;
1102  }
1103 
1104  return;
1105  }
1106 }
1107 
1108 void
1110 {
1111  // VFALCO NOTE I think we should drop the peer immediately
1112  if (!cluster())
1113  {
1115  return;
1116  }
1117 
1118  for (int i = 0; i < m->clusternodes().size(); ++i)
1119  {
1120  protocol::TMClusterNode const& node = m->clusternodes(i);
1121 
1122  std::string name;
1123  if (node.has_nodename())
1124  name = node.nodename();
1125 
1126  auto const publicKey =
1127  parseBase58<PublicKey>(TokenType::NodePublic, node.publickey());
1128 
1129  // NIKB NOTE We should drop the peer immediately if
1130  // they send us a public key we can't parse
1131  if (publicKey)
1132  {
1133  auto const reportTime =
1134  NetClock::time_point{NetClock::duration{node.reporttime()}};
1135 
1136  app_.cluster().update(
1137  *publicKey, name, node.nodeload(), reportTime);
1138  }
1139  }
1140 
1141  int loadSources = m->loadsources().size();
1142  if (loadSources != 0)
1143  {
1144  Resource::Gossip gossip;
1145  gossip.items.reserve(loadSources);
1146  for (int i = 0; i < m->loadsources().size(); ++i)
1147  {
1148  protocol::TMLoadSource const& node = m->loadsources(i);
1150  item.address = beast::IP::Endpoint::from_string(node.name());
1151  item.balance = node.cost();
1152  if (item.address != beast::IP::Endpoint())
1153  gossip.items.push_back(item);
1154  }
1156  }
1157 
1158  // Calculate the cluster fee:
1159  auto const thresh = app_.timeKeeper().now() - 90s;
1160  std::uint32_t clusterFee = 0;
1161 
1163  fees.reserve(app_.cluster().size());
1164 
1165  app_.cluster().for_each([&fees, thresh](ClusterNode const& status) {
1166  if (status.getReportTime() >= thresh)
1167  fees.push_back(status.getLoadFee());
1168  });
1169 
1170  if (!fees.empty())
1171  {
1172  auto const index = fees.size() / 2;
1173  std::nth_element(fees.begin(), fees.begin() + index, fees.end());
1174  clusterFee = fees[index];
1175  }
1176 
1177  app_.getFeeTrack().setClusterFee(clusterFee);
1178 }
1179 
1180 void
1182 {
1183  // DEPRECATED
1184 }
1185 
1186 void
1188 {
1189  // DEPRECATED
1190 }
1191 
1192 void
1194 {
1195  auto badData = [&](std::string msg) {
1197  JLOG(p_journal_.warn()) << msg;
1198  };
1199 
1200  // Verify relays
1201  if (m->relays() > relayLimit)
1202  return badData("Invalid relays");
1203 
1204  // Verify peer chain
1205  // The peer chain should not contain this node's public key
1206  // nor the public key of the sending peer
1207  std::set<PublicKey> pubKeyChain;
1208  pubKeyChain.insert(app_.nodeIdentity().first);
1209  pubKeyChain.insert(publicKey_);
1210 
1211  auto const peerChainSz{m->peerchain_size()};
1212  if (peerChainSz > 0)
1213  {
1214  if (peerChainSz > relayLimit)
1215  return badData("Invalid peer chain size");
1216 
1217  if (peerChainSz + m->relays() > relayLimit)
1218  return badData("Invalid relays and peer chain size");
1219 
1220  for (int i = 0; i < peerChainSz; ++i)
1221  {
1222  auto const slice{makeSlice(m->peerchain(i).publickey())};
1223 
1224  // Verify peer public key
1225  if (!publicKeyType(slice))
1226  return badData("Invalid peer public key");
1227 
1228  // Verify peer public key is unique in the peer chain
1229  if (!pubKeyChain.emplace(slice).second)
1230  return badData("Invalid peer public key");
1231  }
1232  }
1233 
1234  // Reply with shard info this node may have
1235  if (auto shardStore = app_.getShardStore())
1236  {
1237  auto reply{shardStore->getShardInfo()->makeMessage(app_)};
1238  if (peerChainSz > 0)
1239  *(reply.mutable_peerchain()) = m->peerchain();
1240  send(std::make_shared<Message>(reply, protocol::mtPEER_SHARD_INFO_V2));
1241  }
1242 
1243  if (m->relays() == 0)
1244  return;
1245 
1246  // Charge originating peer a fee for requesting relays
1247  if (peerChainSz == 0)
1249 
1250  // Add peer to the peer chain
1251  m->add_peerchain()->set_publickey(publicKey_.data(), publicKey_.size());
1252 
1253  // Relay the request to peers, exclude the peer chain
1254  m->set_relays(m->relays() - 1);
1256  std::make_shared<Message>(*m, protocol::mtGET_PEER_SHARD_INFO_V2),
1257  [&](std::shared_ptr<Peer> const& peer) {
1258  return pubKeyChain.find(peer->getNodePublic()) != pubKeyChain.end();
1259  }));
1260 }
1261 
1262 void
1264 {
1265  // Find the earliest and latest shard indexes
1266  auto const& db{app_.getNodeStore()};
1267  auto const earliestShardIndex{db.earliestShardIndex()};
1268  auto const latestShardIndex{[&]() -> std::optional<std::uint32_t> {
1269  auto const curLedgerSeq{app_.getLedgerMaster().getCurrentLedgerIndex()};
1270  if (curLedgerSeq >= db.earliestLedgerSeq())
1271  return db.seqToShardIndex(curLedgerSeq);
1272  return std::nullopt;
1273  }()};
1274 
1275  auto badData = [&](std::string msg) {
1277  JLOG(p_journal_.warn()) << msg;
1278  };
1279 
1280  // Used to create a digest and verify the message signature
1281  Serializer s;
1283 
1284  // Verify message creation time
1286  {
1287  auto const timestamp{
1288  NetClock::time_point{std::chrono::seconds{m->timestamp()}}};
1289  auto const now{app_.timeKeeper().now()};
1290  if (timestamp > (now + 5s))
1291  return badData("Invalid timestamp");
1292 
1293  // Check if stale
1294  using namespace std::chrono_literals;
1295  if (timestamp < (now - 5min))
1296  return badData("Stale timestamp");
1297 
1298  s.add32(m->timestamp());
1299  shardInfo.setMsgTimestamp(timestamp);
1300  }
1301 
1302  // Verify incomplete shards
1303  auto const numIncomplete{m->incomplete_size()};
1304  if (numIncomplete > 0)
1305  {
1306  if (latestShardIndex && numIncomplete > *latestShardIndex)
1307  return badData("Invalid number of incomplete shards");
1308 
1309  // Verify each incomplete shard
1310  for (int i = 0; i < numIncomplete; ++i)
1311  {
1312  auto const& incomplete{m->incomplete(i)};
1313  auto const shardIndex{incomplete.shardindex()};
1314 
1315  // Verify shard index
1316  if (shardIndex < earliestShardIndex ||
1317  (latestShardIndex && shardIndex > latestShardIndex))
1318  {
1319  return badData("Invalid incomplete shard index");
1320  }
1321  s.add32(shardIndex);
1322 
1323  // Verify state
1324  auto const state{static_cast<ShardState>(incomplete.state())};
1325  switch (state)
1326  {
1327  // Incomplete states
1328  case ShardState::acquire:
1329  case ShardState::complete:
1331  case ShardState::queued:
1332  break;
1333 
1334  // case ShardState::finalized:
1335  default:
1336  return badData("Invalid incomplete shard state");
1337  }
1338  s.add32(incomplete.state());
1339 
1340  // Verify progress
1341  std::uint32_t progress{0};
1342  if (incomplete.has_progress())
1343  {
1344  progress = incomplete.progress();
1345  if (progress < 1 || progress > 100)
1346  return badData("Invalid incomplete shard progress");
1347  s.add32(progress);
1348  }
1349 
1350  // Verify each incomplete shard is unique
1351  if (!shardInfo.update(shardIndex, state, progress))
1352  return badData("Invalid duplicate incomplete shards");
1353  }
1354  }
1355 
1356  // Verify finalized shards
1357  if (m->has_finalized())
1358  {
1359  auto const& str{m->finalized()};
1360  if (str.empty())
1361  return badData("Invalid finalized shards");
1362 
1363  if (!shardInfo.setFinalizedFromString(str))
1364  return badData("Invalid finalized shard indexes");
1365 
1366  auto const& finalized{shardInfo.finalized()};
1367  auto const numFinalized{boost::icl::length(finalized)};
1368  if (numFinalized == 0 ||
1369  boost::icl::first(finalized) < earliestShardIndex ||
1370  (latestShardIndex &&
1371  boost::icl::last(finalized) > latestShardIndex))
1372  {
1373  return badData("Invalid finalized shard indexes");
1374  }
1375 
1376  if (latestShardIndex &&
1377  (numFinalized + numIncomplete) > *latestShardIndex)
1378  {
1379  return badData("Invalid number of finalized and incomplete shards");
1380  }
1381 
1382  s.addRaw(str.data(), str.size());
1383  }
1384 
1385  // Verify public key
1386  auto slice{makeSlice(m->publickey())};
1387  if (!publicKeyType(slice))
1388  return badData("Invalid public key");
1389 
1390  // Verify peer public key isn't this nodes's public key
1391  PublicKey const publicKey(slice);
1392  if (publicKey == app_.nodeIdentity().first)
1393  return badData("Invalid public key");
1394 
1395  // Verify signature
1396  if (!verify(publicKey, s.slice(), makeSlice(m->signature()), false))
1397  return badData("Invalid signature");
1398 
1399  // Forward the message if a peer chain exists
1400  auto const peerChainSz{m->peerchain_size()};
1401  if (peerChainSz > 0)
1402  {
1403  // Verify peer chain
1404  if (peerChainSz > relayLimit)
1405  return badData("Invalid peer chain size");
1406 
1407  // The peer chain should not contain this node's public key
1408  // nor the public key of the sending peer
1409  std::set<PublicKey> pubKeyChain;
1410  pubKeyChain.insert(app_.nodeIdentity().first);
1411  pubKeyChain.insert(publicKey_);
1412 
1413  for (int i = 0; i < peerChainSz; ++i)
1414  {
1415  // Verify peer public key
1416  slice = makeSlice(m->peerchain(i).publickey());
1417  if (!publicKeyType(slice))
1418  return badData("Invalid peer public key");
1419 
1420  // Verify peer public key is unique in the peer chain
1421  if (!pubKeyChain.emplace(slice).second)
1422  return badData("Invalid peer public key");
1423  }
1424 
1425  // If last peer in the chain is connected, relay the message
1426  PublicKey const peerPubKey(
1427  makeSlice(m->peerchain(peerChainSz - 1).publickey()));
1428  if (auto peer = overlay_.findPeerByPublicKey(peerPubKey))
1429  {
1430  m->mutable_peerchain()->RemoveLast();
1431  peer->send(
1432  std::make_shared<Message>(*m, protocol::mtPEER_SHARD_INFO_V2));
1433  JLOG(p_journal_.trace())
1434  << "Relayed TMPeerShardInfoV2 from peer IP "
1435  << remote_address_.address().to_string() << " to peer IP "
1436  << peer->getRemoteAddress().to_string();
1437  }
1438  else
1439  {
1440  // Peer is no longer available so the relay ends
1441  JLOG(p_journal_.info()) << "Unable to relay peer shard info";
1442  }
1443  }
1444 
1445  JLOG(p_journal_.trace())
1446  << "Consumed TMPeerShardInfoV2 originating from public key "
1447  << toBase58(TokenType::NodePublic, publicKey) << " finalized shards["
1448  << ripple::to_string(shardInfo.finalized()) << "] incomplete shards["
1449  << (shardInfo.incomplete().empty() ? "empty"
1450  : shardInfo.incompleteToString())
1451  << "]";
1452 
1453  // Consume the message
1454  {
1456  auto const it{shardInfos_.find(publicKey_)};
1457  if (it == shardInfos_.end())
1458  shardInfos_.emplace(publicKey, std::move(shardInfo));
1459  else if (shardInfo.msgTimestamp() > it->second.msgTimestamp())
1460  it->second = std::move(shardInfo);
1461  }
1462 
1463  // Notify overlay a reply was received from the last peer in this chain
1464  if (peerChainSz == 0)
1466 }
1467 
1468 void
1470 {
1471  // Don't allow endpoints from peers that are not known tracking or are
1472  // not using a version of the message that we support:
1473  if (tracking_.load() != Tracking::converged || m->version() != 2)
1474  return;
1475 
1476  // The number is arbitrary and doesn't have any real significance or
1477  // implication for the protocol.
1478  if (m->endpoints_v2().size() >= 1024)
1479  {
1481  return;
1482  }
1483 
1485  endpoints.reserve(m->endpoints_v2().size());
1486 
1487  for (auto const& tm : m->endpoints_v2())
1488  {
1489  auto result = beast::IP::Endpoint::from_string_checked(tm.endpoint());
1490 
1491  if (!result)
1492  {
1493  JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
1494  << tm.endpoint() << "}";
1496  continue;
1497  }
1498 
1499  // If hops == 0, this Endpoint describes the peer we are connected
1500  // to -- in that case, we take the remote address seen on the
1501  // socket and store that in the IP::Endpoint. If this is the first
1502  // time, then we'll verify that their listener can receive incoming
1503  // by performing a connectivity test. if hops > 0, then we just
1504  // take the address/port we were given
1505  if (tm.hops() == 0)
1506  result = remote_address_.at_port(result->port());
1507 
1508  endpoints.emplace_back(*result, tm.hops());
1509  }
1510 
1511  if (!endpoints.empty())
1512  overlay_.peerFinder().on_endpoints(slot_, endpoints);
1513 }
1514 
1515 void
1517 {
1518  handleTransaction(m, true);
1519 }
1520 
1521 void
1524  bool eraseTxQueue)
1525 {
1527  return;
1528 
1530  {
1531  // If we've never been in synch, there's nothing we can do
1532  // with a transaction
1533  JLOG(p_journal_.debug()) << "Ignoring incoming transaction: "
1534  << "Need network ledger";
1535  return;
1536  }
1537 
1538  SerialIter sit(makeSlice(m->rawtransaction()));
1539 
1540  try
1541  {
1542  auto stx = std::make_shared<STTx const>(sit);
1543  uint256 txID = stx->getTransactionID();
1544 
1545  int flags;
1546  constexpr std::chrono::seconds tx_interval = 10s;
1547 
1548  if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
1549  {
1550  // we have seen this transaction recently
1551  if (flags & SF_BAD)
1552  {
1554  JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
1555  }
1556 
1557  // Erase only if the server has seen this tx. If the server has not
1558  // seen this tx then the tx could not has been queued for this peer.
1559  else if (eraseTxQueue && txReduceRelayEnabled())
1560  removeTxQueue(txID);
1561 
1562  return;
1563  }
1564 
1565  JLOG(p_journal_.debug()) << "Got tx " << txID;
1566 
1567  bool checkSignature = true;
1568  if (cluster())
1569  {
1570  if (!m->has_deferred() || !m->deferred())
1571  {
1572  // Skip local checks if a server we trust
1573  // put the transaction in its open ledger
1574  flags |= SF_TRUSTED;
1575  }
1576 
1578  {
1579  // For now, be paranoid and have each validator
1580  // check each transaction, regardless of source
1581  checkSignature = false;
1582  }
1583  }
1584 
1586  {
1587  JLOG(p_journal_.trace())
1588  << "No new transactions until synchronized";
1589  }
1590  else if (
1593  {
1595  JLOG(p_journal_.info()) << "Transaction queue is full";
1596  }
1597  else
1598  {
1600  jtTRANSACTION,
1601  "recvTransaction->checkTransaction",
1603  flags,
1604  checkSignature,
1605  stx]() {
1606  if (auto peer = weak.lock())
1607  peer->checkTransaction(flags, checkSignature, stx);
1608  });
1609  }
1610  }
1611  catch (std::exception const& ex)
1612  {
1613  JLOG(p_journal_.warn())
1614  << "Transaction invalid: " << strHex(m->rawtransaction())
1615  << ". Exception: " << ex.what();
1616  }
1617 }
1618 
1619 void
1621 {
1622  auto badData = [&](std::string const& msg) {
1624  JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
1625  };
1626  auto const itype{m->itype()};
1627 
1628  // Verify ledger info type
1629  if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
1630  return badData("Invalid ledger info type");
1631 
1632  auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
1633  if (m->has_ltype())
1634  return m->ltype();
1635  return std::nullopt;
1636  }();
1637 
1638  if (itype == protocol::liTS_CANDIDATE)
1639  {
1640  if (!m->has_ledgerhash())
1641  return badData("Invalid TX candidate set, missing TX set hash");
1642  }
1643  else if (
1644  !m->has_ledgerhash() && !m->has_ledgerseq() &&
1645  !(ltype && *ltype == protocol::ltCLOSED))
1646  {
1647  return badData("Invalid request");
1648  }
1649 
1650  // Verify ledger type
1651  if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
1652  return badData("Invalid ledger type");
1653 
1654  // Verify ledger hash
1655  if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash()))
1656  return badData("Invalid ledger hash");
1657 
1658  // Verify ledger sequence
1659  if (m->has_ledgerseq())
1660  {
1661  auto const ledgerSeq{m->ledgerseq()};
1662  // Verifying the network's earliest ledger only pertains to shards.
1663  if (app_.getShardStore() &&
1664  ledgerSeq < app_.getNodeStore().earliestLedgerSeq())
1665  {
1666  return badData(
1667  "Invalid ledger sequence " + std::to_string(ledgerSeq));
1668  }
1669 
1670  // Check if within a reasonable range
1671  using namespace std::chrono_literals;
1672  if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1673  ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1674  {
1675  return badData(
1676  "Invalid ledger sequence " + std::to_string(ledgerSeq));
1677  }
1678  }
1679 
1680  // Verify ledger node IDs
1681  if (itype != protocol::liBASE)
1682  {
1683  if (m->nodeids_size() <= 0)
1684  return badData("Invalid ledger node IDs");
1685 
1686  for (auto const& nodeId : m->nodeids())
1687  {
1688  if (deserializeSHAMapNodeID(nodeId) == std::nullopt)
1689  return badData("Invalid SHAMap node ID");
1690  }
1691  }
1692 
1693  // Verify query type
1694  if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
1695  return badData("Invalid query type");
1696 
1697  // Verify query depth
1698  if (m->has_querydepth())
1699  {
1700  if (m->querydepth() > Tuning::maxQueryDepth ||
1701  itype == protocol::liBASE)
1702  {
1703  return badData("Invalid query depth");
1704  }
1705  }
1706 
1707  // Queue a job to process the request
1709  app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
1710  if (auto peer = weak.lock())
1711  peer->processLedgerRequest(m);
1712  });
1713 }
1714 
1715 void
1717 {
1718  JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
1719  if (!ledgerReplayEnabled_)
1720  {
1722  return;
1723  }
1724 
1728  jtREPLAY_REQ, "recvProofPathRequest", [weak, m]() {
1729  if (auto peer = weak.lock())
1730  {
1731  auto reply =
1732  peer->ledgerReplayMsgHandler_.processProofPathRequest(m);
1733  if (reply.has_error())
1734  {
1735  if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1736  peer->charge(Resource::feeInvalidRequest);
1737  else
1738  peer->charge(Resource::feeRequestNoReply);
1739  }
1740  else
1741  {
1742  peer->send(std::make_shared<Message>(
1743  reply, protocol::mtPROOF_PATH_RESPONSE));
1744  }
1745  }
1746  });
1747 }
1748 
1749 void
1751 {
1752  if (!ledgerReplayEnabled_)
1753  {
1755  return;
1756  }
1757 
1759  {
1761  }
1762 }
1763 
1764 void
1766 {
1767  JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
1768  if (!ledgerReplayEnabled_)
1769  {
1771  return;
1772  }
1773 
1777  jtREPLAY_REQ, "recvReplayDeltaRequest", [weak, m]() {
1778  if (auto peer = weak.lock())
1779  {
1780  auto reply =
1781  peer->ledgerReplayMsgHandler_.processReplayDeltaRequest(m);
1782  if (reply.has_error())
1783  {
1784  if (reply.error() == protocol::TMReplyError::reBAD_REQUEST)
1785  peer->charge(Resource::feeInvalidRequest);
1786  else
1787  peer->charge(Resource::feeRequestNoReply);
1788  }
1789  else
1790  {
1791  peer->send(std::make_shared<Message>(
1792  reply, protocol::mtREPLAY_DELTA_RESPONSE));
1793  }
1794  }
1795  });
1796 }
1797 
1798 void
1800 {
1801  if (!ledgerReplayEnabled_)
1802  {
1804  return;
1805  }
1806 
1808  {
1810  }
1811 }
1812 
1813 void
1815 {
1816  auto badData = [&](std::string const& msg) {
1818  JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
1819  };
1820 
1821  // Verify ledger hash
1822  if (!stringIsUint256Sized(m->ledgerhash()))
1823  return badData("Invalid ledger hash");
1824 
1825  // Verify ledger sequence
1826  {
1827  auto const ledgerSeq{m->ledgerseq()};
1828  if (m->type() == protocol::liTS_CANDIDATE)
1829  {
1830  if (ledgerSeq != 0)
1831  {
1832  return badData(
1833  "Invalid ledger sequence " + std::to_string(ledgerSeq));
1834  }
1835  }
1836  else
1837  {
1838  // Verifying the network's earliest ledger only pertains to shards.
1839  if (app_.getShardStore() &&
1840  ledgerSeq < app_.getNodeStore().earliestLedgerSeq())
1841  {
1842  return badData(
1843  "Invalid ledger sequence " + std::to_string(ledgerSeq));
1844  }
1845 
1846  // Check if within a reasonable range
1847  using namespace std::chrono_literals;
1848  if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
1849  ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
1850  {
1851  return badData(
1852  "Invalid ledger sequence " + std::to_string(ledgerSeq));
1853  }
1854  }
1855  }
1856 
1857  // Verify ledger info type
1858  if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
1859  return badData("Invalid ledger info type");
1860 
1861  // Verify reply error
1862  if (m->has_error() &&
1863  (m->error() < protocol::reNO_LEDGER ||
1864  m->error() > protocol::reBAD_REQUEST))
1865  {
1866  return badData("Invalid reply error");
1867  }
1868 
1869  // Verify ledger nodes.
1870  if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::hardMaxReplyNodes)
1871  {
1872  return badData(
1873  "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
1874  }
1875 
1876  // If there is a request cookie, attempt to relay the message
1877  if (m->has_requestcookie())
1878  {
1879  if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
1880  {
1881  m->clear_requestcookie();
1882  peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
1883  }
1884  else
1885  {
1886  JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
1887  }
1888  return;
1889  }
1890 
1891  uint256 const ledgerHash{m->ledgerhash()};
1892 
1893  // Otherwise check if received data for a candidate transaction set
1894  if (m->type() == protocol::liTS_CANDIDATE)
1895  {
1898  jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m]() {
1899  if (auto peer = weak.lock())
1900  {
1901  peer->app_.getInboundTransactions().gotData(
1902  ledgerHash, peer, m);
1903  }
1904  });
1905  return;
1906  }
1907 
1908  // Consume the message
1910 }
1911 
1912 void
1914 {
1915  protocol::TMProposeSet& set = *m;
1916 
1917  auto const sig = makeSlice(set.signature());
1918 
1919  // Preliminary check for the validity of the signature: A DER encoded
1920  // signature can't be longer than 72 bytes.
1921  if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
1922  (publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
1923  {
1924  JLOG(p_journal_.warn()) << "Proposal: malformed";
1926  return;
1927  }
1928 
1929  if (!stringIsUint256Sized(set.currenttxhash()) ||
1930  !stringIsUint256Sized(set.previousledger()))
1931  {
1932  JLOG(p_journal_.warn()) << "Proposal: malformed";
1934  return;
1935  }
1936 
1937  // RH TODO: when isTrusted = false we should probably also cache a key
1938  // suppression for 30 seconds to avoid doing a relatively expensive lookup
1939  // every time a spam packet is received
1940  PublicKey const publicKey{makeSlice(set.nodepubkey())};
1941  auto const isTrusted = app_.validators().trusted(publicKey);
1942 
1943  // If the operator has specified that untrusted proposals be dropped then
1944  // this happens here I.e. before further wasting CPU verifying the signature
1945  // of an untrusted key
1946  if (!isTrusted && app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
1947  return;
1948 
1949  uint256 const proposeHash{set.currenttxhash()};
1950  uint256 const prevLedger{set.previousledger()};
1951 
1952  NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
1953 
1954  uint256 const suppression = proposalUniqueId(
1955  proposeHash,
1956  prevLedger,
1957  set.proposeseq(),
1958  closeTime,
1959  publicKey.slice(),
1960  sig);
1961 
1962  if (auto [added, relayed] =
1964  !added)
1965  {
1966  // Count unique messages (Slots has it's own 'HashRouter'), which a peer
1967  // receives within IDLED seconds since the message has been relayed.
1968  if (reduceRelayReady() && relayed &&
1969  (stopwatch().now() - *relayed) < reduce_relay::IDLED)
1971  suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
1972  JLOG(p_journal_.trace()) << "Proposal: duplicate";
1973  return;
1974  }
1975 
1976  if (!isTrusted)
1977  {
1979  {
1980  JLOG(p_journal_.debug())
1981  << "Proposal: Dropping untrusted (peer divergence)";
1982  return;
1983  }
1984 
1985  if (!cluster() && app_.getFeeTrack().isLoadedLocal())
1986  {
1987  JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
1988  return;
1989  }
1990  }
1991 
1992  JLOG(p_journal_.trace())
1993  << "Proposal: " << (isTrusted ? "trusted" : "untrusted");
1994 
1995  auto proposal = RCLCxPeerPos(
1996  publicKey,
1997  sig,
1998  suppression,
2000  prevLedger,
2001  set.proposeseq(),
2002  proposeHash,
2003  closeTime,
2006 
2009  isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
2010  "recvPropose->checkPropose",
2011  [weak, isTrusted, m, proposal]() {
2012  if (auto peer = weak.lock())
2013  peer->checkPropose(isTrusted, m, proposal);
2014  });
2015 }
2016 
2017 void
2019 {
2020  JLOG(p_journal_.trace()) << "Status: Change";
2021 
2022  if (!m->has_networktime())
2023  m->set_networktime(app_.timeKeeper().now().time_since_epoch().count());
2024 
2025  {
2027  if (!last_status_.has_newstatus() || m->has_newstatus())
2028  last_status_ = *m;
2029  else
2030  {
2031  // preserve old status
2032  protocol::NodeStatus status = last_status_.newstatus();
2033  last_status_ = *m;
2034  m->set_newstatus(status);
2035  }
2036  }
2037 
2038  if (m->newevent() == protocol::neLOST_SYNC)
2039  {
2040  bool outOfSync{false};
2041  {
2042  // Operations on closedLedgerHash_ and previousLedgerHash_ must be
2043  // guarded by recentLock_.
2045  if (!closedLedgerHash_.isZero())
2046  {
2047  outOfSync = true;
2049  }
2051  }
2052  if (outOfSync)
2053  {
2054  JLOG(p_journal_.debug()) << "Status: Out of sync";
2055  }
2056  return;
2057  }
2058 
2059  {
2060  uint256 closedLedgerHash{};
2061  bool const peerChangedLedgers{
2062  m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
2063 
2064  {
2065  // Operations on closedLedgerHash_ and previousLedgerHash_ must be
2066  // guarded by recentLock_.
2068  if (peerChangedLedgers)
2069  {
2070  closedLedgerHash_ = m->ledgerhash();
2071  closedLedgerHash = closedLedgerHash_;
2072  addLedger(closedLedgerHash, sl);
2073  }
2074  else
2075  {
2077  }
2078 
2079  if (m->has_ledgerhashprevious() &&
2080  stringIsUint256Sized(m->ledgerhashprevious()))
2081  {
2082  previousLedgerHash_ = m->ledgerhashprevious();
2084  }
2085  else
2086  {
2088  }
2089  }
2090  if (peerChangedLedgers)
2091  {
2092  JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
2093  }
2094  else
2095  {
2096  JLOG(p_journal_.debug()) << "Status: No ledger";
2097  }
2098  }
2099 
2100  if (m->has_firstseq() && m->has_lastseq())
2101  {
2103 
2104  minLedger_ = m->firstseq();
2105  maxLedger_ = m->lastseq();
2106 
2107  if ((maxLedger_ < minLedger_) || (minLedger_ == 0) || (maxLedger_ == 0))
2108  minLedger_ = maxLedger_ = 0;
2109  }
2110 
2111  if (m->has_ledgerseq() &&
2113  {
2114  checkTracking(
2115  m->ledgerseq(), app_.getLedgerMaster().getValidLedgerIndex());
2116  }
2117 
2118  app_.getOPs().pubPeerStatus([=, this]() -> Json::Value {
2120 
2121  if (m->has_newstatus())
2122  {
2123  switch (m->newstatus())
2124  {
2125  case protocol::nsCONNECTING:
2126  j[jss::status] = "CONNECTING";
2127  break;
2128  case protocol::nsCONNECTED:
2129  j[jss::status] = "CONNECTED";
2130  break;
2131  case protocol::nsMONITORING:
2132  j[jss::status] = "MONITORING";
2133  break;
2134  case protocol::nsVALIDATING:
2135  j[jss::status] = "VALIDATING";
2136  break;
2137  case protocol::nsSHUTTING:
2138  j[jss::status] = "SHUTTING";
2139  break;
2140  }
2141  }
2142 
2143  if (m->has_newevent())
2144  {
2145  switch (m->newevent())
2146  {
2147  case protocol::neCLOSING_LEDGER:
2148  j[jss::action] = "CLOSING_LEDGER";
2149  break;
2150  case protocol::neACCEPTED_LEDGER:
2151  j[jss::action] = "ACCEPTED_LEDGER";
2152  break;
2153  case protocol::neSWITCHED_LEDGER:
2154  j[jss::action] = "SWITCHED_LEDGER";
2155  break;
2156  case protocol::neLOST_SYNC:
2157  j[jss::action] = "LOST_SYNC";
2158  break;
2159  }
2160  }
2161 
2162  if (m->has_ledgerseq())
2163  {
2164  j[jss::ledger_index] = m->ledgerseq();
2165  }
2166 
2167  if (m->has_ledgerhash())
2168  {
2169  uint256 closedLedgerHash{};
2170  {
2171  std::lock_guard sl(recentLock_);
2172  closedLedgerHash = closedLedgerHash_;
2173  }
2174  j[jss::ledger_hash] = to_string(closedLedgerHash);
2175  }
2176 
2177  if (m->has_networktime())
2178  {
2179  j[jss::date] = Json::UInt(m->networktime());
2180  }
2181 
2182  if (m->has_firstseq() && m->has_lastseq())
2183  {
2184  j[jss::ledger_index_min] = Json::UInt(m->firstseq());
2185  j[jss::ledger_index_max] = Json::UInt(m->lastseq());
2186  }
2187 
2188  return j;
2189  });
2190 }
2191 
2192 void
2193 PeerImp::checkTracking(std::uint32_t validationSeq)
2194 {
2195  std::uint32_t serverSeq;
2196  {
2197  // Extract the sequence number of the highest
2198  // ledger this peer has
2199  std::lock_guard sl(recentLock_);
2200 
2201  serverSeq = maxLedger_;
2202  }
2203  if (serverSeq != 0)
2204  {
2205  // Compare the peer's ledger sequence to the
2206  // sequence of a recently-validated ledger
2207  checkTracking(serverSeq, validationSeq);
2208  }
2209 }
2210 
2211 void
2212 PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
2213 {
2214  int diff = std::max(seq1, seq2) - std::min(seq1, seq2);
2215 
2216  if (diff < Tuning::convergedLedgerLimit)
2217  {
2218  // The peer's ledger sequence is close to the validation's
2219  tracking_ = Tracking::converged;
2220  }
2221 
2222  if ((diff > Tuning::divergedLedgerLimit) &&
2223  (tracking_.load() != Tracking::diverged))
2224  {
2225  // The peer's ledger sequence is way off the validation's
2226  std::lock_guard sl(recentLock_);
2227 
2228  tracking_ = Tracking::diverged;
2229  trackingTime_ = clock_type::now();
2230  }
2231 }
2232 
2233 void
2235 {
2236  if (!stringIsUint256Sized(m->hash()))
2237  {
2238  fee_ = Resource::feeInvalidRequest;
2239  return;
2240  }
2241 
2242  uint256 const hash{m->hash()};
2243 
2244  if (m->status() == protocol::tsHAVE)
2245  {
2246  std::lock_guard sl(recentLock_);
2247 
2248  if (std::find(recentTxSets_.begin(), recentTxSets_.end(), hash) !=
2249  recentTxSets_.end())
2250  {
2251  fee_ = Resource::feeUnwantedData;
2252  return;
2253  }
2254 
2255  recentTxSets_.push_back(hash);
2256  }
2257 }
2258 
2259 void
2260 PeerImp::onValidatorListMessage(
2261  std::string const& messageType,
2262  std::string const& manifest,
2263  std::uint32_t version,
2264  std::vector<ValidatorBlobInfo> const& blobs)
2265 {
2266  // If there are no blobs, the message is malformed (possibly because of
2267  // ValidatorList class rules), so charge accordingly and skip processing.
2268  if (blobs.empty())
2269  {
2270  JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
2271  << " from peer " << remote_address_;
2272  // This shouldn't ever happen with a well-behaved peer
2273  fee_ = Resource::feeHighBurdenPeer;
2274  return;
2275  }
2276 
2277  auto const hash = sha512Half(manifest, blobs, version);
2278 
2279  JLOG(p_journal_.debug())
2280  << "Received " << messageType << " from " << remote_address_.to_string()
2281  << " (" << id_ << ")";
2282 
2283  if (!app_.getHashRouter().addSuppressionPeer(hash, id_))
2284  {
2285  JLOG(p_journal_.debug())
2286  << messageType << ": received duplicate " << messageType;
2287  // Charging this fee here won't hurt the peer in the normal
2288  // course of operation (ie. refresh every 5 minutes), but
2289  // will add up if the peer is misbehaving.
2290  fee_ = Resource::feeUnwantedData;
2291  return;
2292  }
2293 
2294  auto const applyResult = app_.validators().applyListsAndBroadcast(
2295  manifest,
2296  version,
2297  blobs,
2298  remote_address_.to_string(),
2299  hash,
2300  app_.overlay(),
2301  app_.getHashRouter(),
2302  app_.getOPs());
2303 
2304  JLOG(p_journal_.debug())
2305  << "Processed " << messageType << " version " << version << " from "
2306  << (applyResult.publisherKey ? strHex(*applyResult.publisherKey)
2307  : "unknown or invalid publisher")
2308  << " from " << remote_address_.to_string() << " (" << id_
2309  << ") with best result " << to_string(applyResult.bestDisposition());
2310 
2311  // Act based on the best result
2312  switch (applyResult.bestDisposition())
2313  {
2314  // New list
2315  case ListDisposition::accepted:
2316  // Newest list is expired, and that needs to be broadcast, too
2317  case ListDisposition::expired:
2318  // Future list
2319  case ListDisposition::pending: {
2320  std::lock_guard<std::mutex> sl(recentLock_);
2321 
2322  assert(applyResult.publisherKey);
2323  auto const& pubKey = *applyResult.publisherKey;
2324 #ifndef NDEBUG
2325  if (auto const iter = publisherListSequences_.find(pubKey);
2326  iter != publisherListSequences_.end())
2327  {
2328  assert(iter->second < applyResult.sequence);
2329  }
2330 #endif
2331  publisherListSequences_[pubKey] = applyResult.sequence;
2332  }
2333  break;
2334  case ListDisposition::same_sequence:
2335  case ListDisposition::known_sequence:
2336 #ifndef NDEBUG
2337  {
2338  std::lock_guard<std::mutex> sl(recentLock_);
2339  assert(applyResult.sequence && applyResult.publisherKey);
2340  assert(
2341  publisherListSequences_[*applyResult.publisherKey] <=
2342  applyResult.sequence);
2343  }
2344 #endif // !NDEBUG
2345 
2346  break;
2347  case ListDisposition::stale:
2348  case ListDisposition::untrusted:
2349  case ListDisposition::invalid:
2350  case ListDisposition::unsupported_version:
2351  break;
2352  default:
2353  assert(false);
2354  }
2355 
2356  // Charge based on the worst result
2357  switch (applyResult.worstDisposition())
2358  {
2359  case ListDisposition::accepted:
2360  case ListDisposition::expired:
2361  case ListDisposition::pending:
2362  // No charges for good data
2363  break;
2364  case ListDisposition::same_sequence:
2365  case ListDisposition::known_sequence:
2366  // Charging this fee here won't hurt the peer in the normal
2367  // course of operation (ie. refresh every 5 minutes), but
2368  // will add up if the peer is misbehaving.
2369  fee_ = Resource::feeUnwantedData;
2370  break;
2371  case ListDisposition::stale:
2372  // There are very few good reasons for a peer to send an
2373  // old list, particularly more than once.
2374  fee_ = Resource::feeBadData;
2375  break;
2376  case ListDisposition::untrusted:
2377  // Charging this fee here won't hurt the peer in the normal
2378  // course of operation (ie. refresh every 5 minutes), but
2379  // will add up if the peer is misbehaving.
2380  fee_ = Resource::feeUnwantedData;
2381  break;
2382  case ListDisposition::invalid:
2383  // This shouldn't ever happen with a well-behaved peer
2384  fee_ = Resource::feeInvalidSignature;
2385  break;
2386  case ListDisposition::unsupported_version:
2387  // During a version transition, this may be legitimate.
2388  // If it happens frequently, that's probably bad.
2389  fee_ = Resource::feeBadData;
2390  break;
2391  default:
2392  assert(false);
2393  }
2394 
2395  // Log based on all the results.
2396  for (auto const& [disp, count] : applyResult.dispositions)
2397  {
2398  switch (disp)
2399  {
2400  // New list
2401  case ListDisposition::accepted:
2402  JLOG(p_journal_.debug())
2403  << "Applied " << count << " new " << messageType
2404  << "(s) from peer " << remote_address_;
2405  break;
2406  // Newest list is expired, and that needs to be broadcast, too
2407  case ListDisposition::expired:
2408  JLOG(p_journal_.debug())
2409  << "Applied " << count << " expired " << messageType
2410  << "(s) from peer " << remote_address_;
2411  break;
2412  // Future list
2413  case ListDisposition::pending:
2414  JLOG(p_journal_.debug())
2415  << "Processed " << count << " future " << messageType
2416  << "(s) from peer " << remote_address_;
2417  break;
2418  case ListDisposition::same_sequence:
2419  JLOG(p_journal_.warn())
2420  << "Ignored " << count << " " << messageType
2421  << "(s) with current sequence from peer "
2422  << remote_address_;
2423  break;
2424  case ListDisposition::known_sequence:
2425  JLOG(p_journal_.warn())
2426  << "Ignored " << count << " " << messageType
2427  << "(s) with future sequence from peer " << remote_address_;
2428  break;
2429  case ListDisposition::stale:
2430  JLOG(p_journal_.warn())
2431  << "Ignored " << count << "stale " << messageType
2432  << "(s) from peer " << remote_address_;
2433  break;
2434  case ListDisposition::untrusted:
2435  JLOG(p_journal_.warn())
2436  << "Ignored " << count << " untrusted " << messageType
2437  << "(s) from peer " << remote_address_;
2438  break;
2439  case ListDisposition::unsupported_version:
2440  JLOG(p_journal_.warn())
2441  << "Ignored " << count << "unsupported version "
2442  << messageType << "(s) from peer " << remote_address_;
2443  break;
2444  case ListDisposition::invalid:
2445  JLOG(p_journal_.warn())
2446  << "Ignored " << count << "invalid " << messageType
2447  << "(s) from peer " << remote_address_;
2448  break;
2449  default:
2450  assert(false);
2451  }
2452  }
2453 }
2454 
2455 void
2457 {
2458  try
2459  {
2460  if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
2461  {
2462  JLOG(p_journal_.debug())
2463  << "ValidatorList: received validator list from peer using "
2464  << "protocol version " << to_string(protocol_)
2465  << " which shouldn't support this feature.";
2466  fee_ = Resource::feeUnwantedData;
2467  return;
2468  }
2469  onValidatorListMessage(
2470  "ValidatorList",
2471  m->manifest(),
2472  m->version(),
2473  ValidatorList::parseBlobs(*m));
2474  }
2475  catch (std::exception const& e)
2476  {
2477  JLOG(p_journal_.warn()) << "ValidatorList: Exception, " << e.what()
2478  << " from peer " << remote_address_;
2479  fee_ = Resource::feeBadData;
2480  }
2481 }
2482 
2483 void
2484 PeerImp::onMessage(
2486 {
2487  try
2488  {
2489  if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
2490  {
2491  JLOG(p_journal_.debug())
2492  << "ValidatorListCollection: received validator list from peer "
2493  << "using protocol version " << to_string(protocol_)
2494  << " which shouldn't support this feature.";
2495  fee_ = Resource::feeUnwantedData;
2496  return;
2497  }
2498  else if (m->version() < 2)
2499  {
2500  JLOG(p_journal_.debug())
2501  << "ValidatorListCollection: received invalid validator list "
2502  "version "
2503  << m->version() << " from peer using protocol version "
2504  << to_string(protocol_);
2505  fee_ = Resource::feeBadData;
2506  return;
2507  }
2508  onValidatorListMessage(
2509  "ValidatorListCollection",
2510  m->manifest(),
2511  m->version(),
2512  ValidatorList::parseBlobs(*m));
2513  }
2514  catch (std::exception const& e)
2515  {
2516  JLOG(p_journal_.warn()) << "ValidatorListCollection: Exception, "
2517  << e.what() << " from peer " << remote_address_;
2518  fee_ = Resource::feeBadData;
2519  }
2520 }
2521 
2522 void
2523 PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
2524 {
2525  if (m->validation().size() < 50)
2526  {
2527  JLOG(p_journal_.warn()) << "Validation: Too small";
2528  fee_ = Resource::feeInvalidRequest;
2529  return;
2530  }
2531 
2532  try
2533  {
2534  auto const closeTime = app_.timeKeeper().closeTime();
2535 
2537  {
2538  SerialIter sit(makeSlice(m->validation()));
2539  val = std::make_shared<STValidation>(
2540  std::ref(sit),
2541  [this](PublicKey const& pk) {
2542  return calcNodeID(
2543  app_.validatorManifests().getMasterKey(pk));
2544  },
2545  false);
2546  val->setSeen(closeTime);
2547  }
2548 
2549  if (!isCurrent(
2550  app_.getValidations().parms(),
2551  app_.timeKeeper().closeTime(),
2552  val->getSignTime(),
2553  val->getSeenTime()))
2554  {
2555  JLOG(p_journal_.trace()) << "Validation: Not current";
2556  fee_ = Resource::feeUnwantedData;
2557  return;
2558  }
2559 
2560  // RH TODO: when isTrusted = false we should probably also cache a key
2561  // suppression for 30 seconds to avoid doing a relatively expensive
2562  // lookup every time a spam packet is received
2563  auto const isTrusted =
2564  app_.validators().trusted(val->getSignerPublic());
2565 
2566  // If the operator has specified that untrusted validations be dropped
2567  // then this happens here I.e. before further wasting CPU verifying the
2568  // signature of an untrusted key
2569  if (!isTrusted && app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
2570  return;
2571 
2572  auto key = sha512Half(makeSlice(m->validation()));
2573 
2574  if (auto [added, relayed] =
2575  app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
2576  !added)
2577  {
2578  // Count unique messages (Slots has it's own 'HashRouter'), which a
2579  // peer receives within IDLED seconds since the message has been
2580  // relayed. Wait WAIT_ON_BOOTUP time to let the server establish
2581  // connections to peers.
2582  if (reduceRelayReady() && relayed &&
2583  (stopwatch().now() - *relayed) < reduce_relay::IDLED)
2584  overlay_.updateSlotAndSquelch(
2585  key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
2586  JLOG(p_journal_.trace()) << "Validation: duplicate";
2587  return;
2588  }
2589 
2590  if (!isTrusted && (tracking_.load() == Tracking::diverged))
2591  {
2592  JLOG(p_journal_.debug())
2593  << "Dropping untrusted validation from diverged peer";
2594  }
2595  else if (isTrusted || !app_.getFeeTrack().isLoadedLocal())
2596  {
2597  std::string const name = [isTrusted, val]() {
2598  std::string ret =
2599  isTrusted ? "Trusted validation" : "Untrusted validation";
2600 
2601 #ifdef DEBUG
2602  ret += " " +
2603  std::to_string(val->getFieldU32(sfLedgerSequence)) + ": " +
2604  to_string(val->getNodeID());
2605 #endif
2606 
2607  return ret;
2608  }();
2609 
2610  std::weak_ptr<PeerImp> weak = shared_from_this();
2611  app_.getJobQueue().addJob(
2612  isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
2613  name,
2614  [weak, val, m, key]() {
2615  if (auto peer = weak.lock())
2616  peer->checkValidation(val, key, m);
2617  });
2618  }
2619  else
2620  {
2621  JLOG(p_journal_.debug())
2622  << "Dropping untrusted validation for load";
2623  }
2624  }
2625  catch (std::exception const& e)
2626  {
2627  JLOG(p_journal_.warn())
2628  << "Exception processing validation: " << e.what();
2629  fee_ = Resource::feeInvalidRequest;
2630  }
2631 }
2632 
2633 void
2635 {
2636  protocol::TMGetObjectByHash& packet = *m;
2637 
2638  JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
2639  << " " << packet.objects_size();
2640 
2641  if (packet.query())
2642  {
2643  // this is a query
2644  if (send_queue_.size() >= Tuning::dropSendQueue)
2645  {
2646  JLOG(p_journal_.debug()) << "GetObject: Large send queue";
2647  return;
2648  }
2649 
2650  if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2651  {
2652  doFetchPack(m);
2653  return;
2654  }
2655 
2656  if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
2657  {
2658  if (!txReduceRelayEnabled())
2659  {
2660  JLOG(p_journal_.error())
2661  << "TMGetObjectByHash: tx reduce-relay is disabled";
2662  fee_ = Resource::feeInvalidRequest;
2663  return;
2664  }
2665 
2666  std::weak_ptr<PeerImp> weak = shared_from_this();
2667  app_.getJobQueue().addJob(
2668  jtREQUESTED_TXN, "doTransactions", [weak, m]() {
2669  if (auto peer = weak.lock())
2670  peer->doTransactions(m);
2671  });
2672  return;
2673  }
2674 
2675  fee_ = Resource::feeMediumBurdenPeer;
2676 
2677  protocol::TMGetObjectByHash reply;
2678 
2679  reply.set_query(false);
2680 
2681  if (packet.has_seq())
2682  reply.set_seq(packet.seq());
2683 
2684  reply.set_type(packet.type());
2685 
2686  if (packet.has_ledgerhash())
2687  {
2688  if (!stringIsUint256Sized(packet.ledgerhash()))
2689  {
2690  fee_ = Resource::feeInvalidRequest;
2691  return;
2692  }
2693 
2694  reply.set_ledgerhash(packet.ledgerhash());
2695  }
2696 
2697  // This is a very minimal implementation
2698  for (int i = 0; i < packet.objects_size(); ++i)
2699  {
2700  auto const& obj = packet.objects(i);
2701  if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2702  {
2703  uint256 const hash{obj.hash()};
2704  // VFALCO TODO Move this someplace more sensible so we dont
2705  // need to inject the NodeStore interfaces.
2706  std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
2707  auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
2708  if (!nodeObject)
2709  {
2710  if (auto shardStore = app_.getShardStore())
2711  {
2712  if (seq >= shardStore->earliestLedgerSeq())
2713  nodeObject = shardStore->fetchNodeObject(hash, seq);
2714  }
2715  }
2716  if (nodeObject)
2717  {
2718  protocol::TMIndexedObject& newObj = *reply.add_objects();
2719  newObj.set_hash(hash.begin(), hash.size());
2720  newObj.set_data(
2721  &nodeObject->getData().front(),
2722  nodeObject->getData().size());
2723 
2724  if (obj.has_nodeid())
2725  newObj.set_index(obj.nodeid());
2726  if (obj.has_ledgerseq())
2727  newObj.set_ledgerseq(obj.ledgerseq());
2728 
2729  // VFALCO NOTE "seq" in the message is obsolete
2730  }
2731  }
2732  }
2733 
2734  JLOG(p_journal_.trace()) << "GetObj: " << reply.objects_size() << " of "
2735  << packet.objects_size();
2736  send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
2737  }
2738  else
2739  {
2740  // this is a reply
2741  std::uint32_t pLSeq = 0;
2742  bool pLDo = true;
2743  bool progress = false;
2744 
2745  for (int i = 0; i < packet.objects_size(); ++i)
2746  {
2747  const protocol::TMIndexedObject& obj = packet.objects(i);
2748 
2749  if (obj.has_hash() && stringIsUint256Sized(obj.hash()))
2750  {
2751  if (obj.has_ledgerseq())
2752  {
2753  if (obj.ledgerseq() != pLSeq)
2754  {
2755  if (pLDo && (pLSeq != 0))
2756  {
2757  JLOG(p_journal_.debug())
2758  << "GetObj: Full fetch pack for " << pLSeq;
2759  }
2760  pLSeq = obj.ledgerseq();
2761  pLDo = !app_.getLedgerMaster().haveLedger(pLSeq);
2762 
2763  if (!pLDo)
2764  {
2765  JLOG(p_journal_.debug())
2766  << "GetObj: Late fetch pack for " << pLSeq;
2767  }
2768  else
2769  progress = true;
2770  }
2771  }
2772 
2773  if (pLDo)
2774  {
2775  uint256 const hash{obj.hash()};
2776 
2777  app_.getLedgerMaster().addFetchPack(
2778  hash,
2779  std::make_shared<Blob>(
2780  obj.data().begin(), obj.data().end()));
2781  }
2782  }
2783  }
2784 
2785  if (pLDo && (pLSeq != 0))
2786  {
2787  JLOG(p_journal_.debug())
2788  << "GetObj: Partial fetch pack for " << pLSeq;
2789  }
2790  if (packet.type() == protocol::TMGetObjectByHash::otFETCH_PACK)
2791  app_.getLedgerMaster().gotFetchPack(progress, pLSeq);
2792  }
2793 }
2794 
2795 void
2797 {
2798  if (!txReduceRelayEnabled())
2799  {
2800  JLOG(p_journal_.error())
2801  << "TMHaveTransactions: tx reduce-relay is disabled";
2802  fee_ = Resource::feeInvalidRequest;
2803  return;
2804  }
2805 
2806  std::weak_ptr<PeerImp> weak = shared_from_this();
2807  app_.getJobQueue().addJob(
2808  jtMISSING_TXN, "handleHaveTransactions", [weak, m]() {
2809  if (auto peer = weak.lock())
2810  peer->handleHaveTransactions(m);
2811  });
2812 }
2813 
2814 void
2815 PeerImp::handleHaveTransactions(
2817 {
2818  protocol::TMGetObjectByHash tmBH;
2819  tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
2820  tmBH.set_query(true);
2821 
2822  JLOG(p_journal_.trace())
2823  << "received TMHaveTransactions " << m->hashes_size();
2824 
2825  for (std::uint32_t i = 0; i < m->hashes_size(); i++)
2826  {
2827  if (!stringIsUint256Sized(m->hashes(i)))
2828  {
2829  JLOG(p_journal_.error())
2830  << "TMHaveTransactions with invalid hash size";
2831  fee_ = Resource::feeInvalidRequest;
2832  return;
2833  }
2834 
2835  uint256 hash(m->hashes(i));
2836 
2837  auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
2838 
2839  JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;
2840 
2841  if (!txn)
2842  {
2843  JLOG(p_journal_.debug()) << "adding transaction to request";
2844 
2845  auto obj = tmBH.add_objects();
2846  obj->set_hash(hash.data(), hash.size());
2847  }
2848  else
2849  {
2850  // Erase only if a peer has seen this tx. If the peer has not
2851  // seen this tx then the tx could not has been queued for this
2852  // peer.
2853  removeTxQueue(hash);
2854  }
2855  }
2856 
2857  JLOG(p_journal_.trace())
2858  << "transaction request object is " << tmBH.objects_size();
2859 
2860  if (tmBH.objects_size() > 0)
2861  send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
2862 }
2863 
2864 void
2865 PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
2866 {
2867  if (!txReduceRelayEnabled())
2868  {
2869  JLOG(p_journal_.error())
2870  << "TMTransactions: tx reduce-relay is disabled";
2871  fee_ = Resource::feeInvalidRequest;
2872  return;
2873  }
2874 
2875  JLOG(p_journal_.trace())
2876  << "received TMTransactions " << m->transactions_size();
2877 
2878  overlay_.addTxMetrics(m->transactions_size());
2879 
2880  for (std::uint32_t i = 0; i < m->transactions_size(); ++i)
2881  handleTransaction(
2883  m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
2884  false);
2885 }
2886 
2887 void
2888 PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
2889 {
2890  using on_message_fn =
2892  if (!strand_.running_in_this_thread())
2893  return post(
2894  strand_,
2895  std::bind(
2896  (on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
2897 
2898  if (!m->has_validatorpubkey())
2899  {
2900  charge(Resource::feeBadData);
2901  return;
2902  }
2903  auto validator = m->validatorpubkey();
2904  auto const slice{makeSlice(validator)};
2905  if (!publicKeyType(slice))
2906  {
2907  charge(Resource::feeBadData);
2908  return;
2909  }
2910  PublicKey key(slice);
2911 
2912  // Ignore non-validator squelch
2913  if (!app_.validators().listed(key))
2914  {
2915  charge(Resource::feeBadData);
2916  JLOG(p_journal_.debug())
2917  << "onMessage: TMSquelch discarding non-validator squelch "
2918  << slice;
2919  return;
2920  }
2921 
2922  // Ignore the squelch for validator's own messages.
2923  if (key == app_.getValidationPublicKey())
2924  {
2925  JLOG(p_journal_.debug())
2926  << "onMessage: TMSquelch discarding validator's squelch " << slice;
2927  return;
2928  }
2929 
2930  std::uint32_t duration =
2931  m->has_squelchduration() ? m->squelchduration() : 0;
2932  if (!m->squelch())
2933  squelch_.removeSquelch(key);
2934  else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
2935  charge(Resource::feeBadData);
2936 
2937  JLOG(p_journal_.debug())
2938  << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
2939 }
2940 
2941 //--------------------------------------------------------------------------
2942 
2943 void
2944 PeerImp::addLedger(
2945  uint256 const& hash,
2946  std::lock_guard<std::mutex> const& lockedRecentLock)
2947 {
2948  // lockedRecentLock is passed as a reminder that recentLock_ must be
2949  // locked by the caller.
2950  (void)lockedRecentLock;
2951 
2952  if (std::find(recentLedgers_.begin(), recentLedgers_.end(), hash) !=
2953  recentLedgers_.end())
2954  return;
2955 
2956  recentLedgers_.push_back(hash);
2957 }
2958 
2959 void
2960 PeerImp::doFetchPack(const std::shared_ptr<protocol::TMGetObjectByHash>& packet)
2961 {
2962  // VFALCO TODO Invert this dependency using an observer and shared state
2963  // object. Don't queue fetch pack jobs if we're under load or we already
2964  // have some queued.
2965  if (app_.getFeeTrack().isLoadedLocal() ||
2966  (app_.getLedgerMaster().getValidatedLedgerAge() > 40s) ||
2967  (app_.getJobQueue().getJobCount(jtPACK) > 10))
2968  {
2969  JLOG(p_journal_.info()) << "Too busy to make fetch pack";
2970  return;
2971  }
2972 
2973  if (!stringIsUint256Sized(packet->ledgerhash()))
2974  {
2975  JLOG(p_journal_.warn()) << "FetchPack hash size malformed";
2976  fee_ = Resource::feeInvalidRequest;
2977  return;
2978  }
2979 
2980  fee_ = Resource::feeHighBurdenPeer;
2981 
2982  uint256 const hash{packet->ledgerhash()};
2983 
2984  std::weak_ptr<PeerImp> weak = shared_from_this();
2985  auto elapsed = UptimeClock::now();
2986  auto const pap = &app_;
2987  app_.getJobQueue().addJob(
2988  jtPACK, "MakeFetchPack", [pap, weak, packet, hash, elapsed]() {
2989  pap->getLedgerMaster().makeFetchPack(weak, packet, hash, elapsed);
2990  });
2991 }
2992 
2993 void
2994 PeerImp::doTransactions(
2996 {
2997  protocol::TMTransactions reply;
2998 
2999  JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
3000  << packet->objects_size();
3001 
3002  if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
3003  {
3004  JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
3005  fee_ = Resource::feeInvalidRequest;
3006  return;
3007  }
3008 
3009  for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
3010  {
3011  auto const& obj = packet->objects(i);
3012 
3013  if (!stringIsUint256Sized(obj.hash()))
3014  {
3015  fee_ = Resource::feeInvalidRequest;
3016  return;
3017  }
3018 
3019  uint256 hash(obj.hash());
3020 
3021  auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
3022 
3023  if (!txn)
3024  {
3025  JLOG(p_journal_.error()) << "doTransactions, transaction not found "
3026  << Slice(hash.data(), hash.size());
3027  fee_ = Resource::feeInvalidRequest;
3028  return;
3029  }
3030 
3031  Serializer s;
3032  auto tx = reply.add_transactions();
3033  auto sttx = txn->getSTransaction();
3034  sttx->add(s);
3035  tx->set_rawtransaction(s.data(), s.size());
3036  tx->set_status(
3037  txn->getStatus() == INCLUDED ? protocol::tsCURRENT
3038  : protocol::tsNEW);
3039  tx->set_receivetimestamp(
3040  app_.timeKeeper().now().time_since_epoch().count());
3041  tx->set_deferred(txn->getSubmitResult().queued);
3042  }
3043 
3044  if (reply.transactions_size() > 0)
3045  send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
3046 }
3047 
3048 void
3049 PeerImp::checkTransaction(
3050  int flags,
3051  bool checkSignature,
3052  std::shared_ptr<STTx const> const& stx)
3053 {
3054  // VFALCO TODO Rewrite to not use exceptions
3055  try
3056  {
3057  // Expired?
3058  if (stx->isFieldPresent(sfLastLedgerSequence) &&
3059  (stx->getFieldU32(sfLastLedgerSequence) <
3060  app_.getLedgerMaster().getValidLedgerIndex()))
3061  {
3062  app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3063  charge(Resource::feeUnwantedData);
3064  return;
3065  }
3066 
3067  if (checkSignature)
3068  {
3069  // Check the signature before handing off to the job queue.
3070  if (auto [valid, validReason] = checkValidity(
3071  app_.getHashRouter(),
3072  *stx,
3073  app_.getLedgerMaster().getValidatedRules(),
3074  app_.config());
3075  valid != Validity::Valid)
3076  {
3077  if (!validReason.empty())
3078  {
3079  JLOG(p_journal_.trace())
3080  << "Exception checking transaction: " << validReason;
3081  }
3082 
3083  // Probably not necessary to set SF_BAD, but doesn't hurt.
3084  app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3085  charge(Resource::feeInvalidSignature);
3086  return;
3087  }
3088  }
3089  else
3090  {
3091  forceValidity(
3092  app_.getHashRouter(), stx->getTransactionID(), Validity::Valid);
3093  }
3094 
3095  std::string reason;
3096  auto tx = std::make_shared<Transaction>(stx, reason, app_);
3097 
3098  if (tx->getStatus() == INVALID)
3099  {
3100  if (!reason.empty())
3101  {
3102  JLOG(p_journal_.trace())
3103  << "Exception checking transaction: " << reason;
3104  }
3105  app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3106  charge(Resource::feeInvalidSignature);
3107  return;
3108  }
3109 
3110  bool const trusted(flags & SF_TRUSTED);
3111  app_.getOPs().processTransaction(
3112  tx, trusted, false, NetworkOPs::FailHard::no);
3113  }
3114  catch (std::exception const& ex)
3115  {
3116  JLOG(p_journal_.warn())
3117  << "Exception in " << __func__ << ": " << ex.what();
3118  app_.getHashRouter().setFlags(stx->getTransactionID(), SF_BAD);
3119  charge(Resource::feeBadData);
3120  }
3121 }
3122 
3123 // Called from our JobQueue
3124 void
3125 PeerImp::checkPropose(
3126  bool isTrusted,
3128  RCLCxPeerPos peerPos)
3129 {
3130  JLOG(p_journal_.trace())
3131  << "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal";
3132 
3133  assert(packet);
3134 
3135  if (!cluster() && !peerPos.checkSign())
3136  {
3137  JLOG(p_journal_.warn()) << "Proposal fails sig check";
3138  charge(Resource::feeInvalidSignature);
3139  return;
3140  }
3141 
3142  bool relay;
3143 
3144  if (isTrusted)
3145  relay = app_.getOPs().processTrustedProposal(peerPos);
3146  else
3147  relay = app_.config().RELAY_UNTRUSTED_PROPOSALS == 1 || cluster();
3148 
3149  if (relay)
3150  {
3151  // haveMessage contains peers, which are suppressed; i.e. the peers
3152  // are the source of the message, consequently the message should
3153  // not be relayed to these peers. But the message must be counted
3154  // as part of the squelch logic.
3155  auto haveMessage = app_.overlay().relay(
3156  *packet, peerPos.suppressionID(), peerPos.publicKey());
3157  if (reduceRelayReady() && !haveMessage.empty())
3158  overlay_.updateSlotAndSquelch(
3159  peerPos.suppressionID(),
3160  peerPos.publicKey(),
3161  std::move(haveMessage),
3162  protocol::mtPROPOSE_LEDGER);
3163  }
3164 }
3165 
3166 void
3167 PeerImp::checkValidation(
3168  std::shared_ptr<STValidation> const& val,
3169  uint256 const& key,
3171 {
3172  if (!val->isValid())
3173  {
3174  JLOG(p_journal_.debug()) << "Validation forwarded by peer is invalid";
3175  charge(Resource::feeInvalidSignature);
3176  return;
3177  }
3178 
3179  // FIXME it should be safe to remove this try/catch. Investigate codepaths.
3180  try
3181  {
3182  if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
3183  cluster())
3184  {
3185  // haveMessage contains peers, which are suppressed; i.e. the peers
3186  // are the source of the message, consequently the message should
3187  // not be relayed to these peers. But the message must be counted
3188  // as part of the squelch logic.
3189  auto haveMessage =
3190  overlay_.relay(*packet, key, val->getSignerPublic());
3191  if (reduceRelayReady() && !haveMessage.empty())
3192  {
3193  overlay_.updateSlotAndSquelch(
3194  key,
3195  val->getSignerPublic(),
3196  std::move(haveMessage),
3197  protocol::mtVALIDATION);
3198  }
3199  }
3200  }
3201  catch (std::exception const& ex)
3202  {
3203  JLOG(p_journal_.trace())
3204  << "Exception processing validation: " << ex.what();
3205  charge(Resource::feeInvalidRequest);
3206  }
3207 }
3208 
3209 // Returns the set of peers that can help us get
3210 // the TX tree with the specified root hash.
3211 //
3213 getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
3214 {
3216  int retScore = 0;
3217 
3218  ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
3219  if (p->hasTxSet(rootHash) && p.get() != skip)
3220  {
3221  auto score = p->getScore(true);
3222  if (!ret || (score > retScore))
3223  {
3224  ret = std::move(p);
3225  retScore = score;
3226  }
3227  }
3228  });
3229 
3230  return ret;
3231 }
3232 
3233 // Returns a random peer weighted by how likely to
3234 // have the ledger and how responsive it is.
3235 //
3238  OverlayImpl& ov,
3239  uint256 const& ledgerHash,
3240  LedgerIndex ledger,
3241  PeerImp const* skip)
3242 {
3244  int retScore = 0;
3245 
3246  ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
3247  if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
3248  {
3249  auto score = p->getScore(true);
3250  if (!ret || (score > retScore))
3251  {
3252  ret = std::move(p);
3253  retScore = score;
3254  }
3255  }
3256  });
3257 
3258  return ret;
3259 }
3260 
3261 void
3262 PeerImp::sendLedgerBase(
3263  std::shared_ptr<Ledger const> const& ledger,
3264  protocol::TMLedgerData& ledgerData)
3265 {
3266  JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
3267 
3268  Serializer s(sizeof(LedgerInfo));
3269  addRaw(ledger->info(), s);
3270  ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());
3271 
3272  auto const& stateMap{ledger->stateMap()};
3273  if (stateMap.getHash() != beast::zero)
3274  {
3275  // Return account state root node if possible
3276  Serializer root(768);
3277 
3278  stateMap.serializeRoot(root);
3279  ledgerData.add_nodes()->set_nodedata(
3280  root.getDataPtr(), root.getLength());
3281 
3282  if (ledger->info().txHash != beast::zero)
3283  {
3284  auto const& txMap{ledger->txMap()};
3285  if (txMap.getHash() != beast::zero)
3286  {
3287  // Return TX root node if possible
3288  root.erase();
3289  txMap.serializeRoot(root);
3290  ledgerData.add_nodes()->set_nodedata(
3291  root.getDataPtr(), root.getLength());
3292  }
3293  }
3294  }
3295 
3296  auto message{
3297  std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
3298  send(message);
3299 }
3300 
3302 PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
3303 {
3304  JLOG(p_journal_.trace()) << "getLedger: Ledger";
3305 
3307 
3308  if (m->has_ledgerhash())
3309  {
3310  // Attempt to find ledger by hash
3311  uint256 const ledgerHash{m->ledgerhash()};
3312  ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
3313  if (!ledger)
3314  {
3315  if (m->has_ledgerseq())
3316  {
3317  // Attempt to find ledger by sequence in the shard store
3318  if (auto shards = app_.getShardStore())
3319  {
3320  if (m->ledgerseq() >= shards->earliestLedgerSeq())
3321  {
3322  ledger =
3323  shards->fetchLedger(ledgerHash, m->ledgerseq());
3324  }
3325  }
3326  }
3327 
3328  if (!ledger)
3329  {
3330  JLOG(p_journal_.trace())
3331  << "getLedger: Don't have ledger with hash " << ledgerHash;
3332 
3333  if (m->has_querytype() && !m->has_requestcookie())
3334  {
3335  // Attempt to relay the request to a peer
3336  if (auto const peer = getPeerWithLedger(
3337  overlay_,
3338  ledgerHash,
3339  m->has_ledgerseq() ? m->ledgerseq() : 0,
3340  this))
3341  {
3342  m->set_requestcookie(id());
3343  peer->send(std::make_shared<Message>(
3344  *m, protocol::mtGET_LEDGER));
3345  JLOG(p_journal_.debug())
3346  << "getLedger: Request relayed to peer";
3347  return ledger;
3348  }
3349 
3350  JLOG(p_journal_.trace())
3351  << "getLedger: Failed to find peer to relay request";
3352  }
3353  }
3354  }
3355  }
3356  else if (m->has_ledgerseq())
3357  {
3358  // Attempt to find ledger by sequence
3359  if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
3360  {
3361  JLOG(p_journal_.debug())
3362  << "getLedger: Early ledger sequence request";
3363  }
3364  else
3365  {
3366  ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
3367  if (!ledger)
3368  {
3369  JLOG(p_journal_.debug())
3370  << "getLedger: Don't have ledger with sequence "
3371  << m->ledgerseq();
3372  }
3373  }
3374  }
3375  else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
3376  {
3377  ledger = app_.getLedgerMaster().getClosedLedger();
3378  }
3379 
3380  if (ledger)
3381  {
3382  // Validate retrieved ledger sequence
3383  auto const ledgerSeq{ledger->info().seq};
3384  if (m->has_ledgerseq())
3385  {
3386  if (ledgerSeq != m->ledgerseq())
3387  {
3388  // Do not resource charge a peer responding to a relay
3389  if (!m->has_requestcookie())
3390  charge(Resource::feeInvalidRequest);
3391 
3392  ledger.reset();
3393  JLOG(p_journal_.warn())
3394  << "getLedger: Invalid ledger sequence " << ledgerSeq;
3395  }
3396  }
3397  else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
3398  {
3399  ledger.reset();
3400  JLOG(p_journal_.debug())
3401  << "getLedger: Early ledger sequence request " << ledgerSeq;
3402  }
3403  }
3404  else
3405  {
3406  JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
3407  }
3408 
3409  return ledger;
3410 }
3411 
3413 PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
3414 {
3415  JLOG(p_journal_.trace()) << "getTxSet: TX set";
3416 
3417  uint256 const txSetHash{m->ledgerhash()};
3418  std::shared_ptr<SHAMap> shaMap{
3419  app_.getInboundTransactions().getSet(txSetHash, false)};
3420  if (!shaMap)
3421  {
3422  if (m->has_querytype() && !m->has_requestcookie())
3423  {
3424  // Attempt to relay the request to a peer
3425  if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
3426  {
3427  m->set_requestcookie(id());
3428  peer->send(
3429  std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
3430  JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
3431  }
3432  else
3433  {
3434  JLOG(p_journal_.debug())
3435  << "getTxSet: Failed to find relay peer";
3436  }
3437  }
3438  else
3439  {
3440  JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
3441  }
3442  }
3443 
3444  return shaMap;
3445 }
3446 
3447 void
3448 PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
3449 {
3450  // Do not resource charge a peer responding to a relay
3451  if (!m->has_requestcookie())
3452  charge(Resource::feeMediumBurdenPeer);
3453 
3456  SHAMap const* map{nullptr};
3457  protocol::TMLedgerData ledgerData;
3458  bool fatLeaves{true};
3459  auto const itype{m->itype()};
3460 
3461  if (itype == protocol::liTS_CANDIDATE)
3462  {
3463  if (sharedMap = getTxSet(m); !sharedMap)
3464  return;
3465  map = sharedMap.get();
3466 
3467  // Fill out the reply
3468  ledgerData.set_ledgerseq(0);
3469  ledgerData.set_ledgerhash(m->ledgerhash());
3470  ledgerData.set_type(protocol::liTS_CANDIDATE);
3471  if (m->has_requestcookie())
3472  ledgerData.set_requestcookie(m->requestcookie());
3473 
3474  // We'll already have most transactions
3475  fatLeaves = false;
3476  }
3477  else
3478  {
3479  if (send_queue_.size() >= Tuning::dropSendQueue)
3480  {
3481  JLOG(p_journal_.debug())
3482  << "processLedgerRequest: Large send queue";
3483  return;
3484  }
3485  if (app_.getFeeTrack().isLoadedLocal() && !cluster())
3486  {
3487  JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";
3488  return;
3489  }
3490 
3491  if (ledger = getLedger(m); !ledger)
3492  return;
3493 
3494  // Fill out the reply
3495  auto const ledgerHash{ledger->info().hash};
3496  ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
3497  ledgerData.set_ledgerseq(ledger->info().seq);
3498  ledgerData.set_type(itype);
3499  if (m->has_requestcookie())
3500  ledgerData.set_requestcookie(m->requestcookie());
3501 
3502  switch (itype)
3503  {
3504  case protocol::liBASE:
3505  sendLedgerBase(ledger, ledgerData);
3506  return;
3507 
3508  case protocol::liTX_NODE:
3509  map = &ledger->txMap();
3510  JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash "
3511  << to_string(map->getHash());
3512  break;
3513 
3514  case protocol::liAS_NODE:
3515  map = &ledger->stateMap();
3516  JLOG(p_journal_.trace())
3517  << "processLedgerRequest: Account state map hash "
3518  << to_string(map->getHash());
3519  break;
3520 
3521  default:
3522  // This case should not be possible here
3523  JLOG(p_journal_.error())
3524  << "processLedgerRequest: Invalid ledger info type";
3525  return;
3526  }
3527  }
3528 
3529  if (!map)
3530  {
3531  JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";
3532  return;
3533  }
3534 
3535  // Add requested node data to reply
3536  if (m->nodeids_size() > 0)
3537  {
3538  auto const queryDepth{
3539  m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
3540 
3542 
3543  for (int i = 0; i < m->nodeids_size() &&
3544  ledgerData.nodes_size() < Tuning::softMaxReplyNodes;
3545  ++i)
3546  {
3547  auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
3548 
3549  data.clear();
3550  data.reserve(Tuning::softMaxReplyNodes);
3551 
3552  try
3553  {
3554  if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
3555  {
3556  JLOG(p_journal_.trace())
3557  << "processLedgerRequest: getNodeFat got "
3558  << data.size() << " nodes";
3559 
3560  for (auto const& d : data)
3561  {
3562  protocol::TMLedgerNode* node{ledgerData.add_nodes()};
3563  node->set_nodeid(d.first.getRawString());
3564  node->set_nodedata(d.second.data(), d.second.size());
3565  }
3566  }
3567  else
3568  {
3569  JLOG(p_journal_.warn())
3570  << "processLedgerRequest: getNodeFat returns false";
3571  }
3572  }
3573  catch (std::exception const& e)
3574  {
3575  std::string info;
3576  switch (itype)
3577  {
3578  case protocol::liBASE:
3579  // This case should not be possible here
3580  info = "Ledger base";
3581  break;
3582 
3583  case protocol::liTX_NODE:
3584  info = "TX node";
3585  break;
3586 
3587  case protocol::liAS_NODE:
3588  info = "AS node";
3589  break;
3590 
3591  case protocol::liTS_CANDIDATE:
3592  info = "TS candidate";
3593  break;
3594 
3595  default:
3596  info = "Invalid";
3597  break;
3598  }
3599 
3600  if (!m->has_ledgerhash())
3601  info += ", no hash specified";
3602 
3603  JLOG(p_journal_.error())
3604  << "processLedgerRequest: getNodeFat with nodeId "
3605  << *shaMapNodeId << " and ledger info type " << info
3606  << " throws exception: " << e.what();
3607  }
3608  }
3609 
3610  JLOG(p_journal_.info())
3611  << "processLedgerRequest: Got request for " << m->nodeids_size()
3612  << " nodes at depth " << queryDepth << ", return "
3613  << ledgerData.nodes_size() << " nodes";
3614  }
3615 
3616  send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
3617 }
3618 
3619 int
3620 PeerImp::getScore(bool haveItem) const
3621 {
3622  // Random component of score, used to break ties and avoid
3623  // overloading the "best" peer
3624  static const int spRandomMax = 9999;
3625 
3626  // Score for being very likely to have the thing we are
3627  // look for; should be roughly spRandomMax
3628  static const int spHaveItem = 10000;
3629 
3630  // Score reduction for each millisecond of latency; should
3631  // be roughly spRandomMax divided by the maximum reasonable
3632  // latency
3633  static const int spLatency = 30;
3634 
3635  // Penalty for unknown latency; should be roughly spRandomMax
3636  static const int spNoLatency = 8000;
3637 
3638  int score = rand_int(spRandomMax);
3639 
3640  if (haveItem)
3641  score += spHaveItem;
3642 
3644  {
3645  std::lock_guard sl(recentLock_);
3646  latency = latency_;
3647  }
3648 
3649  if (latency)
3650  score -= latency->count() * spLatency;
3651  else
3652  score -= spNoLatency;
3653 
3654  return score;
3655 }
3656 
3657 bool
3658 PeerImp::isHighLatency() const
3659 {
3660  std::lock_guard sl(recentLock_);
3661  return latency_ >= peerHighLatency;
3662 }
3663 
3664 bool
3665 PeerImp::reduceRelayReady()
3666 {
3667  if (!reduceRelayReady_)
3668  reduceRelayReady_ =
3669  reduce_relay::epoch<std::chrono::minutes>(UptimeClock::now()) >
3670  reduce_relay::WAIT_ON_BOOTUP;
3671  return vpReduceRelayEnabled_ && reduceRelayReady_;
3672 }
3673 
3674 void
3675 PeerImp::Metrics::add_message(std::uint64_t bytes)
3676 {
3677  using namespace std::chrono_literals;
3678  std::unique_lock lock{mutex_};
3679 
3680  totalBytes_ += bytes;
3681  accumBytes_ += bytes;
3682  auto const timeElapsed = clock_type::now() - intervalStart_;
3683  auto const timeElapsedInSecs =
3684  std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
3685 
3686  if (timeElapsedInSecs >= 1s)
3687  {
3688  auto const avgBytes = accumBytes_ / timeElapsedInSecs.count();
3689  rollingAvg_.push_back(avgBytes);
3690 
3691  auto const totalBytes =
3692  std::accumulate(rollingAvg_.begin(), rollingAvg_.end(), 0ull);
3693  rollingAvgBytes_ = totalBytes / rollingAvg_.size();
3694 
3695  intervalStart_ = clock_type::now();
3696  accumBytes_ = 0;
3697  }
3698 }
3699 
3701 PeerImp::Metrics::average_bytes() const
3702 {
3703  std::shared_lock lock{mutex_};
3704  return rollingAvgBytes_;
3705 }
3706 
3708 PeerImp::Metrics::total_bytes() const
3709 {
3710  std::shared_lock lock{mutex_};
3711  return totalBytes_;
3712 }
3713 
3714 } // namespace ripple
ripple::PublicKey::data
std::uint8_t const * data() const noexcept
Definition: PublicKey.h:81
ripple::PeerImp::latency_
std::optional< std::chrono::milliseconds > latency_
Definition: PeerImp.h:114
ripple::PeerImp::ledgerRange
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
Definition: PeerImp.cpp:541
ripple::PeerImp::uptime
clock_type::duration uptime() const
Definition: PeerImp.h:352
ripple::Resource::feeInvalidRequest
const Charge feeInvalidRequest
Schedule of fees charged for imposing load on the server.
ripple::Application
Definition: Application.h:115
ripple::ClusterNode
Definition: ClusterNode.h:30
ripple::jtTRANSACTION
@ jtTRANSACTION
Definition: Job.h:63
ripple::PeerImp::inbound_
const bool inbound_
Definition: PeerImp.h:91
ripple::TrafficCount::categorize
static category categorize(::google::protobuf::Message const &message, int type, bool inbound)
Given a protocol message, determine which traffic category it belongs to.
Definition: TrafficCount.cpp:25
sstream
ripple::PeerImp::recentLock_
std::mutex recentLock_
Definition: PeerImp.h:149
ripple::HashRouter::addSuppressionPeerWithStatus
std::pair< bool, std::optional< Stopwatch::time_point > > addSuppressionPeerWithStatus(uint256 const &key, PeerShortID peer)
Add a suppression peer and get message's relay status.
Definition: HashRouter.cpp:57
ripple::RCLCxPeerPos
A peer's signed, proposed position for use in RCLConsensus.
Definition: RCLCxPeerPos.h:43
std::weak_ptr::lock
T lock(T... args)
std::for_each
T for_each(T... args)
ripple::PeerImp::stream_ptr_
std::unique_ptr< stream_type > stream_ptr_
Definition: PeerImp.h:78
ripple::makeSlice
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Definition: Slice.h:241
ripple::Application::cluster
virtual Cluster & cluster()=0
ripple::PeerImp::socket_
socket_type & socket_
Definition: PeerImp.h:79
std::bind
T bind(T... args)
ripple::PeerImp::trackingTime_
clock_type::time_point trackingTime_
Definition: PeerImp.h:97
ripple::ShardState::complete
@ complete
ripple::HashRouter::addSuppressionPeer
bool addSuppressionPeer(uint256 const &key, PeerShortID peer)
Definition: HashRouter.cpp:51
std::string
STL class.
ripple::Resource::feeMediumBurdenPeer
const Charge feeMediumBurdenPeer
std::shared_ptr
STL class.
ripple::Resource::Consumer::disconnect
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Definition: Consumer.cpp:117
ripple::PeerImp::addTxQueue
void addTxQueue(uint256 const &hash) override
Add transaction's hash to the transactions' hashes queue.
Definition: PeerImp.cpp:314
ripple::PeerImp::onMessage
void onMessage(std::shared_ptr< protocol::TMManifests > const &m)
Definition: PeerImp.cpp:1052
ripple::ManifestCache::getMasterKey
PublicKey getMasterKey(PublicKey const &pk) const
Returns ephemeral signing key's master public key.
Definition: app/misc/impl/Manifest.cpp:303
ripple::jtMANIFEST
@ jtMANIFEST
Definition: Job.h:56
ripple::Overlay::Setup::networkID
std::optional< std::uint32_t > networkID
Definition: Overlay.h:75
std::exception
STL class.
ripple::PeerImp::hasTxSet
bool hasTxSet(uint256 const &hash) const override
Definition: PeerImp.cpp:550
ripple::calcNodeID
NodeID calcNodeID(PublicKey const &pk)
Calculate the 160-bit node ID from a node public key.
Definition: PublicKey.cpp:303
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::PeerImp::strand_
boost::asio::strand< boost::asio::executor > strand_
Definition: PeerImp.h:81
ripple::sfLedgerSequence
const SF_UINT32 sfLedgerSequence
ripple::PeerImp::recentLedgers_
boost::circular_buffer< uint256 > recentLedgers_
Definition: PeerImp.h:111
ripple::PeerImp::ledgerReplayEnabled_
bool ledgerReplayEnabled_
Definition: PeerImp.h:181
ripple::deserializeSHAMapNodeID
std::optional< SHAMapNodeID > deserializeSHAMapNodeID(void const *data, std::size_t size)
Return an object representing a serialized SHAMap Node ID.
Definition: SHAMapNodeID.cpp:101
ripple::PeerImp::request_
http_request_type request_
Definition: PeerImp.h:155
ripple::Resource::Gossip
Data format for exchanging consumption information across peers.
Definition: Gossip.h:29
ripple::Slice
An immutable linear range of bytes.
Definition: Slice.h:44
ripple::PeerImp::~PeerImp
virtual ~PeerImp()
Definition: PeerImp.cpp:134
ripple::relayLimit
static constexpr std::uint32_t relayLimit
Definition: ripple/overlay/Peer.h:36
beast::IP::Endpoint::to_string
std::string to_string() const
Returns a string representing the endpoint.
Definition: IPEndpoint.cpp:57
std::pair
ripple::jtMISSING_TXN
@ jtMISSING_TXN
Definition: Job.h:64
ripple::PeerImp::doAccept
void doAccept()
Definition: PeerImp.cpp:767
std::vector::reserve
T reserve(T... args)
ripple::OverlayImpl::updateSlotAndSquelch
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, std::set< Peer::id_t > &&peers, protocol::MessageType type)
Updates message count for validator/peer.
Definition: OverlayImpl.cpp:1484
ripple::HashRouter::shouldProcess
bool shouldProcess(uint256 const &key, PeerShortID peer, int &flags, std::chrono::seconds tx_interval)
Definition: HashRouter.cpp:78
ripple::PeerImp::handleTransaction
void handleTransaction(std::shared_ptr< protocol::TMTransaction > const &m, bool eraseTxQueue)
Called from onMessage(TMTransaction(s)).
Definition: PeerImp.cpp:1522
ripple::HashPrefix::manifest
@ manifest
Manifest.
ripple::LedgerMaster::getValidLedgerIndex
LedgerIndex getValidLedgerIndex()
Definition: LedgerMaster.cpp:213
ripple::OverlayImpl::endOfPeerChain
void endOfPeerChain(std::uint32_t id)
Called when the reply from the last peer in a peer chain is received.
Definition: OverlayImpl.cpp:774
ripple::addRaw
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
Definition: View.cpp:162
Json::UInt
unsigned int UInt
Definition: json_forwards.h:27
ripple::verify
bool verify(PublicKey const &publicKey, Slice const &m, Slice const &sig, bool mustBeFullyCanonical) noexcept
Verify a signature on a message.
Definition: PublicKey.cpp:272
ripple::PeerImp::doProtocolStart
void doProtocolStart()
Definition: PeerImp.cpp:850
std::vector
STL class.
std::find
T find(T... args)
std::string::size
T size(T... args)
ripple::PeerImp::recentTxSets_
boost::circular_buffer< uint256 > recentTxSets_
Definition: PeerImp.h:112
ripple::PublicKey::empty
bool empty() const noexcept
Definition: PublicKey.h:117
ripple::PeerImp::sendTxQueue
void sendTxQueue() override
Send aggregated transactions' hashes.
Definition: PeerImp.cpp:295
ripple::make_protocol
constexpr ProtocolVersion make_protocol(std::uint16_t major, std::uint16_t minor)
Definition: ProtocolVersion.h:40
ripple::PeerImp::txReduceRelayEnabled
bool txReduceRelayEnabled() const override
Definition: PeerImp.h:429
ripple::FEATURE_LEDGER_REPLAY
static constexpr char FEATURE_LEDGER_REPLAY[]
Definition: Handshake.h:148
std::chrono::milliseconds
ripple::PeerImp::setTimer
void setTimer()
Definition: PeerImp.cpp:655
ripple::ProtocolFeature::LedgerReplay
@ LedgerReplay
ripple::OverlayImpl::incPeerDisconnectCharges
void incPeerDisconnectCharges() override
Definition: OverlayImpl.h:378
ripple::toBase58
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition: AccountID.cpp:104
beast::IP::Endpoint::address
Address const & address() const
Returns the address portion of this endpoint.
Definition: IPEndpoint.h:76
ripple::PeerImp::getVersion
std::string getVersion() const
Return the version of rippled that the peer is running, if reported.
Definition: PeerImp.cpp:372
std::set::emplace
T emplace(T... args)
std::stringstream
STL class.
beast::Journal::warn
Stream warn() const
Definition: Journal.h:327
std::shared_ptr::get
T get(T... args)
std::lock_guard
STL class.
ripple::Application::getShardStore
virtual NodeStore::DatabaseShard * getShardStore()=0
ripple::PeerImp::close
void close()
Definition: PeerImp.cpp:578
ripple::PeerImp::charge
void charge(Resource::Charge const &fee) override
Adjust this peer's load balance based on the type of load imposed.
Definition: PeerImp.cpp:343
ripple::PeerImp::onMessageUnknown
void onMessageUnknown(std::uint16_t type)
Definition: PeerImp.cpp:1003
ripple::makeSharedValue
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
Definition: Handshake.cpp:145
ripple::Cluster::member
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition: Cluster.cpp:39
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
ripple::stopwatch
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition: chrono.h:88
std::setfill
T setfill(T... args)
ripple::PeerImp::journal_
const beast::Journal journal_
Definition: PeerImp.h:76
ripple::PeerImp::send
void send(std::shared_ptr< Message > const &m) override
Definition: PeerImp.cpp:241
ripple::Application::timeKeeper
virtual TimeKeeper & timeKeeper()=0
ripple::OverlayImpl::setup
Setup const & setup() const
Definition: OverlayImpl.h:176
ripple::ProtocolFeature
ProtocolFeature
Definition: ripple/overlay/Peer.h:38
ripple::PeerImp::onTimer
void onTimer(boost::system::error_code const &ec)
Definition: PeerImp.cpp:690
ripple::ShardState
ShardState
Shard states.
Definition: nodestore/Types.h:60
ripple::Cluster::update
bool update(PublicKey const &identity, std::string name, std::uint32_t loadFee=0, NetClock::time_point reportTime=NetClock::time_point{})
Store information about the state of a cluster node.
Definition: Cluster.cpp:58
ripple::PeerImp::lastPingTime_
clock_type::time_point lastPingTime_
Definition: PeerImp.h:116
ripple::OverlayImpl::incJqTransOverflow
void incJqTransOverflow() override
Increment and retrieve counter for transaction job queue overflows.
Definition: OverlayImpl.h:354
ripple::PeerImp
Definition: PeerImp.h:52
ripple::PeerFinder::Config::peerPrivate
bool peerPrivate
true if we want our IP address kept private.
Definition: PeerfinderManager.h:61
ripple::Config::MAX_TRANSACTIONS
int MAX_TRANSACTIONS
Definition: Config.h:231
ripple::PeerImp::previousLedgerHash_
uint256 previousLedgerHash_
Definition: PeerImp.h:109
ripple::FEATURE_VPRR
static constexpr char FEATURE_VPRR[]
Definition: Handshake.h:144
std::optional::reset
T reset(T... args)
ripple::PeerImp::txQueue_
hash_set< uint256 > txQueue_
Definition: PeerImp.h:175
ripple::base_uint::data
pointer data()
Definition: base_uint.h:122
algorithm
ripple::Application::getOPs
virtual NetworkOPs & getOPs()=0
ripple::PeerImp::name_
std::string name_
Definition: PeerImp.h:101
ripple::PeerFinder::Manager::on_endpoints
virtual void on_endpoints(std::shared_ptr< Slot > const &slot, Endpoints const &endpoints)=0
Called when mtENDPOINTS is received.
ripple::forceValidity
void forceValidity(HashRouter &router, uint256 const &txid, Validity validity)
Sets the validity of a given transaction in the cache.
Definition: apply.cpp:89
ripple::Application::getInboundLedgers
virtual InboundLedgers & getInboundLedgers()=0
ripple::Application::getFeeTrack
virtual LoadFeeTrack & getFeeTrack()=0
ripple::base_uint< 256 >::size
constexpr static std::size_t size()
Definition: base_uint.h:519
ripple::ValidatorList::sendValidatorList
static void sendValidatorList(Peer &peer, std::uint64_t peerSequence, PublicKey const &publisherKey, std::size_t maxSequence, std::uint32_t rawVersion, std::string const &rawManifest, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, HashRouter &hashRouter, beast::Journal j)
Definition: ValidatorList.cpp:749
ripple::getPeerWithLedger
static std::shared_ptr< PeerImp > getPeerWithLedger(OverlayImpl &ov, uint256 const &ledgerHash, LedgerIndex ledger, PeerImp const *skip)
Definition: PeerImp.cpp:3237
ripple::PeerImp::publicKey_
const PublicKey publicKey_
Definition: PeerImp.h:100
ripple::protocolMessageName
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
Definition: ProtocolMessage.h:62
ripple::Serializer::data
void const * data() const noexcept
Definition: Serializer.h:75
ripple::PeerImp::read_buffer_
boost::beast::multi_buffer read_buffer_
Definition: PeerImp.h:154
ripple::PeerImp::error_code
boost::system::error_code error_code
Definition: PeerImp.h:62
ripple::JobQueue::getJobCount
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition: JobQueue.cpp:128
ripple::FEATURE_TXRR
static constexpr char FEATURE_TXRR[]
Definition: Handshake.h:146
std::tie
T tie(T... args)
ripple::PeerImp::remote_address_
const beast::IP::Endpoint remote_address_
Definition: PeerImp.h:86
ripple::publicKeyType
std::optional< KeyType > publicKeyType(Slice const &slice)
Returns the type of public key.
Definition: PublicKey.cpp:207
ripple::jtTXN_DATA
@ jtTXN_DATA
Definition: Job.h:70
ripple::PeerFinder::Manager::on_closed
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
ripple::OverlayImpl::peerFinder
PeerFinder::Manager & peerFinder()
Definition: OverlayImpl.h:164
ripple::getPeerWithTree
static std::shared_ptr< PeerImp > getPeerWithTree(OverlayImpl &ov, uint256 const &rootHash, PeerImp const *skip)
Definition: PeerImp.cpp:3213
ripple::base_uint< 256 >
ripple::INCLUDED
@ INCLUDED
Definition: Transaction.h:48
ripple::LoadFeeTrack::isLoadedLocal
bool isLoadedLocal() const
Definition: LoadFeeTrack.h:126
ripple::PeerImp::addLedger
void addLedger(uint256 const &hash, std::lock_guard< std::mutex > const &lockedRecentLock)
Definition: PeerImp.cpp:2944
ripple::Config::RELAY_UNTRUSTED_PROPOSALS
int RELAY_UNTRUSTED_PROPOSALS
Definition: Config.h:175
ripple::Resource::feeInvalidSignature
const Charge feeInvalidSignature
ripple::OverlayImpl::onManifests
void onManifests(std::shared_ptr< protocol::TMManifests > const &m, std::shared_ptr< PeerImp > const &from)
Definition: OverlayImpl.cpp:625
ripple::Overlay::Setup::public_ip
beast::IP::Address public_ip
Definition: Overlay.h:72
std::enable_shared_from_this< PeerImp >::shared_from_this
T shared_from_this(T... args)
ripple::rand_int
std::enable_if_t< std::is_integral< Integral >::value &&detail::is_engine< Engine >::value, Integral > rand_int(Engine &engine, Integral min, Integral max)
Return a uniformly distributed random integer.
Definition: ripple/basics/random.h:115
ripple::NetworkOPs::isNeedNetworkLedger
virtual bool isNeedNetworkLedger()=0
ripple::Resource::drop
@ drop
Definition: Disposition.h:37
ripple::checkValidity
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition: apply.cpp:37
ripple::jtPROPOSAL_t
@ jtPROPOSAL_t
Definition: Job.h:75
ripple::base_uint::isZero
bool isZero() const
Definition: base_uint.h:532
ripple::OverlayImpl::resourceManager
Resource::Manager & resourceManager()
Definition: OverlayImpl.h:170
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::LedgerReplayMsgHandler::processProofPathResponse
bool processProofPathResponse(std::shared_ptr< protocol::TMProofPathResponse > const &msg)
Process TMProofPathResponse.
Definition: LedgerReplayMsgHandler.cpp:105
ripple::PeerImp::gracefulClose
void gracefulClose()
Definition: PeerImp.cpp:639
ripple::Application::getLedgerMaster
virtual LedgerMaster & getLedgerMaster()=0
ripple::PublicKey
A public key.
Definition: PublicKey.h:59
std::atomic::load
T load(T... args)
ripple::Resource::feeBadData
const Charge feeBadData
ripple::PublicKey::size
std::size_t size() const noexcept
Definition: PublicKey.h:87
ripple::Serializer::getDataPtr
const void * getDataPtr() const
Definition: Serializer.h:189
ripple::Resource::Manager::importConsumers
virtual void importConsumers(std::string const &origin, Gossip const &gossip)=0
Import packaged consumer information.
ripple::PeerImp::closedLedgerHash_
uint256 closedLedgerHash_
Definition: PeerImp.h:108
ripple::PeerImp::detaching_
bool detaching_
Definition: PeerImp.h:98
ripple::PeerImp::onMessageEnd
void onMessageEnd(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m)
Definition: PeerImp.cpp:1043
ripple::Application::config
virtual Config & config()=0
ripple::PeerImp::shardInfos_
hash_map< PublicKey, NodeStore::ShardInfo > shardInfos_
Definition: PeerImp.h:167
ripple::isCurrent
bool isCurrent(ValidationParms const &p, NetClock::time_point now, NetClock::time_point signTime, NetClock::time_point seenTime)
Whether a validation is still current.
Definition: Validations.h:148
beast::Journal::active
bool active(Severity level) const
Returns true if any message would be logged at this severity level.
Definition: Journal.h:301
ripple::PeerImp::stream_
stream_type & stream_
Definition: PeerImp.h:80
ripple::PeerImp::onWriteMessage
void onWriteMessage(error_code ec, std::size_t bytes_transferred)
Definition: PeerImp.cpp:949
std::unique_lock
STL class.
ripple::SHAMap
A SHAMap is both a radix tree with a fan-out of 16 and a Merkle tree.
Definition: SHAMap.h:95
ripple::InfoSub::Source::pubPeerStatus
virtual void pubPeerStatus(std::function< Json::Value(void)> const &)=0
ripple::Application::nodeIdentity
virtual std::pair< PublicKey, SecretKey > const & nodeIdentity()=0
ripple::Tuning::hardMaxReplyNodes
@ hardMaxReplyNodes
The hard cap on the number of ledger entries in a single reply.
Definition: overlay/impl/Tuning.h:42
ripple::jtVALIDATION_t
@ jtVALIDATION_t
Definition: Job.h:72
ripple::reduce_relay::IDLED
static constexpr auto IDLED
Definition: ReduceRelayCommon.h:39
ripple::PeerImp::hasRange
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
Definition: PeerImp.cpp:568
ripple::Resource::feeUnwantedData
const Charge feeUnwantedData
ripple::Serializer::addRaw
int addRaw(Blob const &vector)
Definition: Serializer.cpp:100
std::to_string
T to_string(T... args)
ripple::Application::getJobQueue
virtual JobQueue & getJobQueue()=0
ripple::Resource::Gossip::items
std::vector< Item > items
Definition: Gossip.h:42
ripple::PeerImp::cycleStatus
void cycleStatus() override
Definition: PeerImp.cpp:558
ripple::set
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
Definition: BasicConfig.h:313
ripple::PeerImp::app_
Application & app_
Definition: PeerImp.h:72
ripple::PeerImp::crawl
bool crawl() const
Returns true if this connection will publicly share its IP address.
Definition: PeerImp.cpp:357
ripple::PeerImp::minLedger_
LedgerIndex minLedger_
Definition: PeerImp.h:106
ripple::FEATURE_COMPR
static constexpr char FEATURE_COMPR[]
Definition: Handshake.h:142
ripple::Serializer::slice
Slice slice() const noexcept
Definition: Serializer.h:63
ripple::PeerImp::ledgerReplayMsgHandler_
LedgerReplayMsgHandler ledgerReplayMsgHandler_
Definition: PeerImp.h:182
ripple::base64_decode
std::string base64_decode(std::string const &data)
Definition: base64.cpp:245
beast::Journal::error
Stream error() const
Definition: Journal.h:333
beast::Journal::info
Stream info() const
Definition: Journal.h:321
std::chrono::time_point
ripple::ShardState::finalized
@ finalized
ripple::PeerImp::hasLedger
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
Definition: PeerImp.cpp:515
ripple::PeerImp::Tracking::unknown
@ unknown
ripple::Resource::Consumer::balance
int balance()
Returns the credit balance representing consumption.
Definition: Consumer.cpp:129
ripple::HashPrefix::proposal
@ proposal
proposal for signing
ripple::TimeKeeper::closeTime
virtual time_point closeTime() const =0
Returns the close time, in network time.
ripple::PeerImp::headers_
boost::beast::http::fields const & headers_
Definition: PeerImp.h:157
std::accumulate
T accumulate(T... args)
ripple::SerialIter
Definition: Serializer.h:310
ripple::PeerImp::metrics_
struct ripple::PeerImp::@13 metrics_
ripple::peerFeatureEnabled
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
Definition: Handshake.h:199
ripple::PeerImp::reduceRelayReady
bool reduceRelayReady()
Definition: PeerImp.cpp:3665
std::uint32_t
ripple::PeerImp::send_queue_
std::queue< std::shared_ptr< Message > > send_queue_
Definition: PeerImp.h:158
ripple::PeerImp::slot_
const std::shared_ptr< PeerFinder::Slot > slot_
Definition: PeerImp.h:153
ripple::Overlay::foreach
void foreach(Function f) const
Visit every active peer.
Definition: Overlay.h:198
ripple::PeerImp::load_event_
std::unique_ptr< LoadEvent > load_event_
Definition: PeerImp.h:161
ripple::ShardState::finalizing
@ finalizing
std::map
STL class.
ripple::PeerImp::protocol_
ProtocolVersion protocol_
Definition: PeerImp.h:94
ripple::Application::getValidationPublicKey
virtual PublicKey const & getValidationPublicKey() const =0
ripple::Cluster::size
std::size_t size() const
The number of nodes in the cluster list.
Definition: Cluster.cpp:50
std::nth_element
T nth_element(T... args)
memory
ripple::PeerImp::waitable_timer
boost::asio::basic_waitable_timer< std::chrono::steady_clock > waitable_timer
Definition: PeerImp.h:69
ripple::jtPEER
@ jtPEER
Definition: Job.h:81
ripple::NodeStore::ShardInfo
Definition: ShardInfo.h:32
ripple::PeerImp::onShutdown
void onShutdown(error_code ec)
Definition: PeerImp.cpp:751
ripple::proposalUniqueId
uint256 proposalUniqueId(uint256 const &proposeHash, uint256 const &previousLedger, std::uint32_t proposeSeq, NetClock::time_point closeTime, Slice const &publicKey, Slice const &signature)
Calculate a unique identifier for a signed proposal.
Definition: RCLCxPeerPos.cpp:66
ripple::PeerImp::name
std::string name() const
Definition: PeerImp.cpp:833
ripple::Application::validators
virtual ValidatorList & validators()=0
ripple::KeyType::secp256k1
@ secp256k1
ripple::RCLCxPeerPos::publicKey
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Definition: RCLCxPeerPos.h:78
std::weak_ptr
STL class.
ripple::PeerImp::timer_
waitable_timer timer_
Definition: PeerImp.h:82
std::min
T min(T... args)
ripple::Serializer
Definition: Serializer.h:39
ripple::LedgerMaster::getValidatedLedgerAge
std::chrono::seconds getValidatedLedgerAge()
Definition: LedgerMaster.cpp:274
ripple::Resource::Gossip::Item
Describes a single consumer.
Definition: Gossip.h:34
ripple::OverlayImpl::deletePeer
void deletePeer(Peer::id_t id)
Called when the peer is deleted.
Definition: OverlayImpl.cpp:1517
ripple::jtREQUESTED_TXN
@ jtREQUESTED_TXN
Definition: Job.h:65
ripple::PeerImp::Tracking::diverged
@ diverged
ripple::jtPACK
@ jtPACK
Definition: Job.h:43
ripple::PeerImp::gracefulClose_
bool gracefulClose_
Definition: PeerImp.h:159
std::vector::emplace_back
T emplace_back(T... args)
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::InboundLedgers::gotLedgerData
virtual bool gotLedgerData(LedgerHash const &ledgerHash, std::shared_ptr< Peer >, std::shared_ptr< protocol::TMLedgerData >)=0
ripple::Application::getNodeStore
virtual NodeStore::Database & getNodeStore()=0
ripple::Application::validatorManifests
virtual ManifestCache & validatorManifests()=0
ripple::OverlayImpl::getManifestsMessage
std::shared_ptr< Message > getManifestsMessage()
Definition: OverlayImpl.cpp:1276
ripple::Serializer::size
std::size_t size() const noexcept
Definition: Serializer.h:69
ripple::send_if_not
send_if_not_pred< Predicate > send_if_not(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
Definition: predicates.h:107
ripple::ShardState::acquire
@ acquire
protocol
Definition: ValidatorList.h:38
ripple::jtVALIDATION_ut
@ jtVALIDATION_ut
Definition: Job.h:55
ripple::INVALID
@ INVALID
Definition: Transaction.h:47
ripple::reduce_relay::MAX_TX_QUEUE_SIZE
static constexpr std::size_t MAX_TX_QUEUE_SIZE
Definition: ReduceRelayCommon.h:55
ripple::ProtocolFeature::ValidatorList2Propagation
@ ValidatorList2Propagation
ripple::OverlayImpl::remove
void remove(std::shared_ptr< PeerFinder::Slot > const &slot)
Definition: OverlayImpl.cpp:462
ripple::PeerImp::squelch_
reduce_relay::Squelch< UptimeClock > squelch_
Definition: PeerImp.h:119
ripple::Config::TX_REDUCE_RELAY_METRICS
bool TX_REDUCE_RELAY_METRICS
Definition: Config.h:271
ripple::PeerImp::lastPingSeq_
std::optional< std::uint32_t > lastPingSeq_
Definition: PeerImp.h:115
ripple::base_uint::zero
void zero()
Definition: base_uint.h:542
ripple::NodeStore::Database::seqToShardIndex
std::uint32_t seqToShardIndex(std::uint32_t ledgerSeq) const noexcept
Calculates the shard index for a given ledger sequence.
Definition: Database.h:283
ripple::PeerFinder::Manager::config
virtual Config config()=0
Returns the configuration for the manager.
std
STL namespace.
beast::severities::kWarning
@ kWarning
Definition: Journal.h:37
ripple::NodeStore::Database::earliestShardIndex
std::uint32_t earliestShardIndex() const noexcept
Definition: Database.h:246
std::set::insert
T insert(T... args)
ripple::sha512Half
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition: digest.h:216
beast::IP::Endpoint::from_string
static Endpoint from_string(std::string const &s)
Definition: IPEndpoint.cpp:49
ripple::OverlayImpl::activate
void activate(std::shared_ptr< PeerImp > const &peer)
Called when a peer has connected successfully This is called after the peer handshake has been comple...
Definition: OverlayImpl.cpp:594
ripple::OverlayImpl::onPeerDeactivate
void onPeerDeactivate(Peer::id_t id)
Definition: OverlayImpl.cpp:618
ripple::Tuning::readBufferBytes
constexpr std::size_t readBufferBytes
Size of buffer used to read from the socket.
Definition: overlay/impl/Tuning.h:65
ripple::Resource::Gossip::Item::address
beast::IP::Endpoint address
Definition: Gossip.h:39
ripple::LedgerMaster::getCurrentLedgerIndex
LedgerIndex getCurrentLedgerIndex()
Definition: LedgerMaster.cpp:207
ripple::Resource::Consumer
An endpoint that consumes resources.
Definition: Consumer.h:34
ripple::Resource::Charge
A consumption charge.
Definition: Charge.h:30
ripple::Resource::Gossip::Item::balance
int balance
Definition: Gossip.h:38
ripple::TimeKeeper::now
virtual time_point now() const override=0
Returns the estimate of wall time, in network time.
ripple::PeerImp::maxLedger_
LedgerIndex maxLedger_
Definition: PeerImp.h:107
ripple::PeerImp::run
virtual void run()
Definition: PeerImp.cpp:157
ripple::Tuning::targetSendQueue
@ targetSendQueue
How many messages we consider reasonable sustained on a send queue.
Definition: overlay/impl/Tuning.h:52
ripple::LoadFeeTrack::setClusterFee
void setClusterFee(std::uint32_t fee)
Definition: LoadFeeTrack.h:113
ripple::PeerImp::checkTracking
void checkTracking(std::uint32_t validationSeq)
Check if the peer is tracking.
Definition: PeerImp.cpp:2193
ripple::PeerImp::large_sendq_
int large_sendq_
Definition: PeerImp.h:160
ripple::PeerImp::domain
std::string domain() const
Definition: PeerImp.cpp:840
std::string::empty
T empty(T... args)
ripple::Resource::feeLightPeer
const Charge feeLightPeer
ripple::jtREPLAY_REQ
@ jtREPLAY_REQ
Definition: Job.h:59
ripple::jtPROPOSAL_ut
@ jtPROPOSAL_ut
Definition: Job.h:61
ripple::TokenType::NodePublic
@ NodePublic
ripple::PeerImp::last_status_
protocol::TMStatusChange last_status_
Definition: PeerImp.h:150
ripple::RCLCxPeerPos::suppressionID
uint256 const & suppressionID() const
Unique id used by hash router to suppress duplicates.
Definition: RCLCxPeerPos.h:85
ripple::PeerImp::supportsFeature
bool supportsFeature(ProtocolFeature f) const override
Definition: PeerImp.cpp:498
ripple::OverlayImpl::findPeerByPublicKey
std::shared_ptr< Peer > findPeerByPublicKey(PublicKey const &pubKey) override
Returns the peer with the matching public key, or null.
Definition: OverlayImpl.cpp:1209
std::optional
mutex
ripple::PeerImp::getPeerShardInfos
const hash_map< PublicKey, NodeStore::ShardInfo > getPeerShardInfos() const
Definition: PeerImp.cpp:632
ripple::PeerImp::onMessageBegin
void onMessageBegin(std::uint16_t type, std::shared_ptr<::google::protobuf::Message > const &m, std::size_t size, std::size_t uncompressed_size, bool isCompressed)
Definition: PeerImp.cpp:1009
std::stringstream::str
T str(T... args)
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
std::size_t
ripple::to_string
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
Definition: app/misc/impl/Manifest.cpp:41
ripple::PeerImp::json
Json::Value json() override
Definition: PeerImp.cpp:380
ripple::Cluster::for_each
void for_each(std::function< void(ClusterNode const &)> func) const
Invokes the callback once for every cluster node.
Definition: Cluster.cpp:84
ripple::PeerImp::compressionEnabled_
Compressed compressionEnabled_
Definition: PeerImp.h:170
ripple::Tuning::sendqIntervals
@ sendqIntervals
How many timer intervals a sendq has to stay large before we disconnect.
Definition: overlay/impl/Tuning.h:46
ripple::ProtocolFeature::ValidatorListPropagation
@ ValidatorListPropagation
beast::IP::Endpoint
A version-independent IP address and port combination.
Definition: IPEndpoint.h:38
ripple::OverlayImpl::incPeerDisconnect
void incPeerDisconnect() override
Increment and retrieve counters for total peer disconnects, and disconnects we initiate for excessive...
Definition: OverlayImpl.h:366
ripple::OverlayImpl::addTxMetrics
void addTxMetrics(Args... args)
Add tx reduce-relay metrics.
Definition: OverlayImpl.h:447
ripple::Serializer::add32
int add32(std::uint32_t i)
Definition: Serializer.cpp:38
ripple::LedgerInfo
Information about the notional ledger backing the view.
Definition: ReadView.h:75
ripple::strHex
std::string strHex(FwdIt begin, FwdIt end)
Definition: strHex.h:30
std::set::end
T end(T... args)
ripple::PeerFinder::Manager::on_failure
virtual void on_failure(std::shared_ptr< Slot > const &slot)=0
Called when an outbound connection is deemed to have failed.
ripple::PeerImp::makePrefix
static std::string makePrefix(id_t id)
Definition: PeerImp.cpp:682
ripple::PeerImp::usage_
Resource::Consumer usage_
Definition: PeerImp.h:151
std::setw
T setw(T... args)
ripple::NodeStore::Database::earliestLedgerSeq
std::uint32_t earliestLedgerSeq() const noexcept
Definition: Database.h:238
numeric
ripple::OverlayImpl
Definition: OverlayImpl.h:58
std::max
T max(T... args)
ripple::base_uint::parseHex
constexpr bool parseHex(std::string_view sv)
Parse a hex string into a base_uint.
Definition: base_uint.h:496
beast::IP::Endpoint::at_port
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition: IPEndpoint.h:69
ripple::ValidatorList::trusted
bool trusted(PublicKey const &identity) const
Returns true if public key is trusted.
Definition: ValidatorList.cpp:1367
ripple::OverlayImpl::findPeerByShortID
std::shared_ptr< Peer > findPeerByShortID(Peer::id_t const &id) const override
Returns the peer with the matching short id, or null.
Definition: OverlayImpl.cpp:1197
ripple::Serializer::getLength
int getLength() const
Definition: Serializer.h:199
ripple::OverlayImpl::reportTraffic
void reportTraffic(TrafficCount::category cat, bool isInbound, int bytes)
Definition: OverlayImpl.cpp:678
ripple::sfLastLedgerSequence
const SF_UINT32 sfLastLedgerSequence
ripple::JobQueue::makeLoadEvent
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:165
ripple::PeerImp::shardInfoMutex_
std::mutex shardInfoMutex_
Definition: PeerImp.h:168
ripple::Resource::Consumer::charge
Disposition charge(Charge const &fee)
Apply a load charge to the consumer.
Definition: Consumer.cpp:99
ripple::PeerImp::overlay_
OverlayImpl & overlay_
Definition: PeerImp.h:90
ripple::makeResponse
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
Definition: Handshake.cpp:392
ripple::http_request_type
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition: Handshake.h:47
std::unique_ptr< stream_type >
ripple::Tuning::sendQueueLogFreq
@ sendQueueLogFreq
How often to log send queue size.
Definition: overlay/impl/Tuning.h:55
ripple::PeerImp::tracking_
std::atomic< Tracking > tracking_
Definition: PeerImp.h:96
ripple::PeerImp::nameMutex_
boost::shared_mutex nameMutex_
Definition: PeerImp.h:102
ripple::PeerImp::cancelTimer
void cancelTimer()
Definition: PeerImp.cpp:673
ripple::invokeProtocolMessage
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
Definition: ProtocolMessage.h:343
std::unordered_map
STL class.
ripple::PeerImp::fee_
Resource::Charge fee_
Definition: PeerImp.h:152
ripple::stringIsUint256Sized
static bool stringIsUint256Sized(std::string const &pBuffStr)
Definition: PeerImp.cpp:151
beast::IP::Endpoint::from_string_checked
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
Definition: IPEndpoint.cpp:35
ripple::ValidatorList::for_each_available
void for_each_available(std::function< void(std::string const &manifest, std::uint32_t version, std::map< std::size_t, ValidatorBlobInfo > const &blobInfos, PublicKey const &pubKey, std::size_t maxSequence, uint256 const &hash)> func) const
Invokes the callback once for every available publisher list's raw data members.
Definition: ValidatorList.cpp:1651
std::set
STL class.
ripple::PeerImp::stop
void stop() override
Definition: PeerImp.cpp:215
ripple::Tuning::maxQueryDepth
@ maxQueryDepth
The maximum number of levels to search.
Definition: overlay/impl/Tuning.h:61
ripple::Application::getHashRouter
virtual HashRouter & getHashRouter()=0
ripple::PeerImp::removeTxQueue
void removeTxQueue(uint256 const &hash) override
Remove transaction's hash from the transactions' hashes queue.
Definition: PeerImp.cpp:331
ripple::PeerImp::Tracking::converged
@ converged
ripple::PeerImp::id_
const id_t id_
Definition: PeerImp.h:73
ripple::OverlayImpl::for_each
void for_each(UnaryFunc &&f) const
Definition: OverlayImpl.h:281
std::ref
T ref(T... args)
ripple::RCLCxPeerPos::checkSign
bool checkSign() const
Verify the signing hash of the proposal.
Definition: RCLCxPeerPos.cpp:48
ripple::LedgerReplayMsgHandler::processReplayDeltaResponse
bool processReplayDeltaResponse(std::shared_ptr< protocol::TMReplayDeltaResponse > const &msg)
Process TMReplayDeltaResponse.
Definition: LedgerReplayMsgHandler.cpp:220
std::exception::what
T what(T... args)
std::shared_lock
STL class.
ripple::PeerImp::fail
void fail(std::string const &reason)
Definition: PeerImp.cpp:600
ripple::PeerImp::cluster
bool cluster() const override
Returns true if this connection is a member of the cluster.
Definition: PeerImp.cpp:366
ripple::ShardState::queued
@ queued
ripple::HashPrefix::shardInfo
@ shardInfo
shard info for signing
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::PeerImp::p_journal_
const beast::Journal p_journal_
Definition: PeerImp.h:77
ripple::Config::MAX_UNKNOWN_TIME
std::chrono::seconds MAX_UNKNOWN_TIME
Definition: Config.h:287
ripple::Peer
Represents a peer connection in the overlay.
Definition: ripple/overlay/Peer.h:45
ripple::Config::MAX_DIVERGED_TIME
std::chrono::seconds MAX_DIVERGED_TIME
Definition: Config.h:290
ripple::jtLEDGER_REQ
@ jtLEDGER_REQ
Definition: Job.h:60
ripple::PeerImp::onReadMessage
void onReadMessage(error_code ec, std::size_t bytes_transferred)
Definition: PeerImp.cpp:893
ripple::root
Number root(Number f, unsigned d)
Definition: Number.cpp:624
ripple::ConsensusProposal< NodeID, uint256, uint256 >
std::chrono::steady_clock::now
T now(T... args)