Skip to content

Commit

Permalink
Add ProtocolStart and GracefulClose P2P protocol messages (XRPLF#3839)
Browse files Browse the repository at this point in the history
Clean up the peer-to-peer protocol start/close sequences by introducing
START_PROTOCOL and GRACEFUL_CLOSE messages, which sync inbound/outbound
peer send/receive. The GRACEFUL_CLOSE message differentiates application
and link layer failures.

* Introduce the `InboundHandoff` class to manage inbound peer
  instantiation and synchronize the send/receive protocol messages
  between peers.
* Update `OverlayImpl` to utilize the `InboundHandoff` class to manage
  inbound handshakes.
* Update `PeerImp` for improved handling of protocol messages.
* Modify the `Message` class for better maintainability.
* Introduce P2P protocol version `2.3`.
  • Loading branch information
gregtatcam authored Sep 22, 2023
1 parent 5433e13 commit 8f89694
Show file tree
Hide file tree
Showing 14 changed files with 565 additions and 140 deletions.
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ target_sources (rippled PRIVATE
src/ripple/overlay/impl/Cluster.cpp
src/ripple/overlay/impl/ConnectAttempt.cpp
src/ripple/overlay/impl/Handshake.cpp
src/ripple/overlay/impl/InboundHandoff.cpp
src/ripple/overlay/impl/Message.cpp
src/ripple/overlay/impl/OverlayImpl.cpp
src/ripple/overlay/impl/PeerImp.cpp
Expand Down
17 changes: 8 additions & 9 deletions src/ripple/overlay/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ class Message : public std::enable_shared_from_this<Message>
return validatorKey_;
}

/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @return Message type
*/
int
getType() const;

private:
std::vector<uint8_t> buffer_;
std::vector<uint8_t> bufferCompressed_;
Expand Down Expand Up @@ -129,15 +137,6 @@ class Message : public std::enable_shared_from_this<Message>
*/
void
compress();

/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @param in Payload header pointer
* @return Message type
*/
int
getType(std::uint8_t const* in) const;
};

} // namespace ripple
Expand Down
1 change: 1 addition & 0 deletions src/ripple/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation,
LedgerReplay,
StartProtocol
};

/** Represents a peer connection in the overlay. */
Expand Down
44 changes: 44 additions & 0 deletions src/ripple/overlay/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,50 @@ transferred between A and B and will not be able to intelligently tamper with th
message stream between Alice and Bob, although she may be still be able to inject
delays or terminate the link.

## Peer Connection Sequence

The _PeerImp_ object can be constructed as either an outbound or an inbound peer.
The outbound peer is constructed by the _ConnectAttempt_ - the client side of
the connection. The inbound peer is constructed by the _InboundHandoff_ -
the server side of the connection. This differentiation of the peers matters only
in terms of the object construction. Once constructed, both inbound and outbound
peer play the same role.

### Outbound Peer

An outbound connection is initiated once a second by
the _OverlayImpl::Timer::on_timer()_ method. This method calls
_OverlayImpl::autoConnect()_, which in turn calls _OverlayImpl::connect()_ for
every outbound endpoint generated by _PeerFinder::autoconnect()_. _connect()_
method constructs _ConnectAttempt_ object. _ConnectAttempt_ attempts to connect
to the provided endpoint and on a successful connection executes the client side
of the handshake protocol described above. If the handshake is successful then
the outbound _PeerImp_ object is constructed and passed to the overlay manager
_OverlayImpl_, which adds the object to the list of peers and children. The latter
maintains a list of objects which might be executing an asynchronous operation
and therefore have to be stopped on shutdown. The outbound _PeerImp_ sends
_TMStartProtocol_ message on start to instruct the connected inbound peer that
the outbound peer is ready to receive the protocol messages.

### Inbound Peer

Construction of the inbound peer is more involved. A multi protocol-server,
_ServerImpl_ located in _src/ripple/server_ module, maintains multiple configured
listening ports. Each listening port allows for multiple protocols including HTTP,
HTTP/S, WebSocket, Secure WebSocket, and the Peer protocol. For simplicity this
sequence describes only the Peer protocol. _ServerImpl_ constructs
_Door_ object for each configured protocol. Each instance of the _Door_ object
accepts connections on the configured port. On a successful connection the _Door_
constructs _SSLHTTPPeer_ object since the Peer protocol always uses SSL
connection. _SSLHTTPPeer_ executes the SSL handshake. If the handshake is successful
then a server handler, _ServerHandlerImpl_ located in _src/ripple/src/impl_, hands off
the connection to the _OverlayImpl::onHandoff()_ method. _onHandoff()_ method
validates the client's HTTP handshake request described above. If the request is
valid then the _InboundHandoff_ object is constructed. _InboundHandoff_ sends
HTTP response to the connected client, constructs the inbound _PeerImp_ object,
and passes it to the overlay manager _OverlayImpl_, which adds the object to
the list of peers and children. Once the inbound _PeerImp_ receives
_TMStartProtocol_ message, it starts sending the protocol messages.

# Ripple Clustering #

Expand Down
185 changes: 185 additions & 0 deletions src/ripple/overlay/impl/InboundHandoff.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/impl/InboundHandoff.h>
#include <ripple/overlay/impl/PeerImp.h>

#include <boost/beast/core/ostream.hpp>

namespace ripple {

InboundHandoff::InboundHandoff(
Application& app,
id_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay)
: OverlayImpl::Child(overlay)
, app_(app)
, id_(id)
, sink_(
app_.journal("Peer"),
[id]() {
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}())
, journal_(sink_)
, stream_ptr_(std::move(stream_ptr))
, strand_(stream_ptr_->next_layer().socket().get_executor())
, remote_address_(slot->remote_endpoint())
, protocol_(protocol)
, publicKey_(publicKey)
, usage_(consumer)
, slot_(slot)
, request_(std::move(request))
{
}

void
InboundHandoff::run()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&InboundHandoff::run, shared_from_this()));
sendResponse();
}

void
InboundHandoff::stop()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&InboundHandoff::stop, shared_from_this()));
if (stream_ptr_->next_layer().socket().is_open())
{
JLOG(journal_.debug()) << "Stop";
}
close();
}

void
InboundHandoff::sendResponse()
{
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
// This shouldn't fail since we already computed
// the shared value successfully in OverlayImpl
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");

JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);

auto write_buffer = std::make_shared<boost::beast::multi_buffer>();

boost::beast::ostream(*write_buffer) << makeResponse(
!overlay_.peerFinder().config().peerPrivate,
request_,
overlay_.setup().public_ip,
remote_address_.address(),
*sharedValue,
overlay_.setup().networkID,
protocol_,
app_);

// Write the whole buffer and only start protocol when that's done.
boost::asio::async_write(
*stream_ptr_,
write_buffer->data(),
boost::asio::transfer_all(),
bind_executor(
strand_,
[this, write_buffer, self = shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (!stream_ptr_->next_layer().socket().is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return createPeer();
return fail("Failed to write header");
}));
}

void
InboundHandoff::fail(std::string const& name, error_code const& ec)
{
if (socket().is_open())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
}
close();
}

void
InboundHandoff::fail(std::string const& reason)
{
if (journal_.active(beast::severities::kWarning) && socket().is_open())
{
auto const n = app_.cluster().member(publicKey_);
JLOG(journal_.warn())
<< (n ? remote_address_.to_string() : *n) << " failed: " << reason;
}
close();
}

void
InboundHandoff::close()
{
if (socket().is_open())
{
socket().close();
JLOG(journal_.debug()) << "Closed";
}
}

void
InboundHandoff::createPeer()
{
auto peer = std::make_shared<PeerImp>(
app_,
id_,
slot_,
std::move(request_),
publicKey_,
protocol_,
usage_,
std::move(stream_ptr_),
overlay_);

overlay_.add_active(peer);
}

InboundHandoff::socket_type&
InboundHandoff::socket() const
{
return stream_ptr_->next_layer().socket();
}

} // namespace ripple
102 changes: 102 additions & 0 deletions src/ripple/overlay/impl/InboundHandoff.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED
#define RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED

#include <ripple/overlay/impl/OverlayImpl.h>

namespace ripple {

/** Sends HTTP response. Instantiates the inbound peer
* once the response is sent. Maintains all data members
* required for the inbound peer instantiation.
*/
class InboundHandoff : public OverlayImpl::Child,
public std::enable_shared_from_this<InboundHandoff>
{
private:
using error_code = boost::system::error_code;
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using id_t = Peer::id_t;
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::Journal const journal_;
std::unique_ptr<stream_type> stream_ptr_;
boost::asio::strand<boost::asio::executor> strand_;
beast::IP::Endpoint const remote_address_;
ProtocolVersion protocol_;
PublicKey const publicKey_;
Resource::Consumer usage_;
std::shared_ptr<PeerFinder::Slot> const slot_;
http_request_type request_;

public:
virtual ~InboundHandoff() override = default;

InboundHandoff(
Application& app,
id_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay);

// This class isn't meant to be copied
InboundHandoff(InboundHandoff const&) = delete;
InboundHandoff&
operator=(InboundHandoff const&) = delete;

/** Start the handshake */
void
run();
/** Stop the child */
void
stop() override;

private:
/** Send upgrade response to the client */
void
sendResponse();
/** Instantiate and run the overlay peer */
void
createPeer();
/** Log and close */
void
fail(std::string const& name, error_code const& ec);
/** Log and close */
void
fail(std::string const& reason);
/** Close connection */
void
close();
/** Get underlying socket */
socket_type&
socket() const;
};

} // namespace ripple

#endif // RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED
Loading

0 comments on commit 8f89694

Please sign in to comment.