Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 96 additions & 51 deletions docs/crates/eth-wire.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,70 @@ This crate can be thought of as having 2 components:
2. Abstractions over Tokio Streams that operate on these types.

(Note that ECIES is implemented in a separate `reth-ecies` crate.)
Additionally, this crate focuses on stream implementations (P2P and Eth), handshakes, and multiplexing. The protocol
message types and RLP encoding/decoding live in the separate `eth-wire-types` crate and are re-exported by `eth-wire`
for convenience.
## Types
The most basic Eth-wire type is a `ProtocolMessage`. It describes all messages that reth can send/receive.

[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/message.rs)
[File: crates/net/eth-wire-types/src/message.rs](../../crates/net/eth-wire-types/src/message.rs)
```rust, ignore
/// An `eth` protocol message, containing a message ID and payload.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProtocolMessage {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
pub message_type: EthMessageID,
pub message: EthMessage,
pub message: EthMessage<N>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EthMessage {
Status(Status),
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
Status(StatusMessage),
NewBlockHashes(NewBlockHashes),
Transactions(Transactions),
NewPooledTransactionHashes(NewPooledTransactionHashes),
NewBlock(Box<N::NewBlockPayload>),
Transactions(Transactions<N::BroadcastedTransaction>),
NewPooledTransactionHashes66(NewPooledTransactionHashes66),
NewPooledTransactionHashes68(NewPooledTransactionHashes68),
GetBlockHeaders(RequestPair<GetBlockHeaders>),
// ...
BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
GetBlockBodies(RequestPair<GetBlockBodies>),
BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
GetPooledTransactions(RequestPair<GetPooledTransactions>),
PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
GetNodeData(RequestPair<GetNodeData>),
NodeData(RequestPair<NodeData>),
GetReceipts(RequestPair<GetReceipts>),
Receipts(RequestPair<Receipts>),
Receipts(RequestPair<Receipts<N::Receipt>>),
Receipts69(RequestPair<Receipts69<N::Receipt>>),
BlockRangeUpdate(BlockRangeUpdate),
}

/// Represents message IDs for eth protocol messages.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EthMessageID {
Status = 0x00,
NewBlockHashes = 0x01,
Transactions = 0x02,
// ...
GetBlockHeaders = 0x03,
BlockHeaders = 0x04,
GetBlockBodies = 0x05,
BlockBodies = 0x06,
NewBlock = 0x07,
NewPooledTransactionHashes = 0x08,
GetPooledTransactions = 0x09,
PooledTransactions = 0x0a,
GetNodeData = 0x0d,
NodeData = 0x0e,
GetReceipts = 0x0f,
Receipts = 0x10,
BlockRangeUpdate = 0x11,
}

```
Messages can either be broadcast to the network, or can be a request/response message to a single peer. This 2nd type of message is
described using a `RequestPair` struct, which is simply a concatenation of the underlying message with a request id.

[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/message.rs)
[File: crates/net/eth-wire-types/src/message.rs](../../crates/net/eth-wire-types/src/message.rs)
```rust, ignore
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RequestPair<T> {
Expand All @@ -59,10 +81,8 @@ pub struct RequestPair<T> {
}

```
Every `Ethmessage` has a corresponding rust struct that implements the `Encodable` and `Decodable` traits.
These traits are defined as follows:

[Crate: crates/rlp](https://github.com/paradigmxyz/reth/tree/1563506aea09049a85e5cc72c2894f3f7a371581/crates/rlp)
Every `EthMessage` has a corresponding Rust struct that implements `alloy_rlp::Encodable` and `alloy_rlp::Decodable`
(often via derive macros like `RlpEncodable`/`RlpDecodable`). These traits are defined in `alloy_rlp`:
```rust, ignore
pub trait Decodable: Sized {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self>;
Expand All @@ -72,10 +92,11 @@ pub trait Encodable {
fn length(&self) -> usize;
}
```
These traits describe how the `Ethmessage` should be serialized/deserialized into raw bytes using the RLP format.
In reth all [RLP](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/) encode/decode operations are handled by the `common/rlp` and `common/rlp-derive` crates.
These traits describe how the `EthMessage` should be serialized/deserialized into raw bytes using the RLP format.
In reth all [RLP](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/) encode/decode operations are handled by `alloy_rlp` and the derive macros used in `eth-wire-types`.

Note that the `ProtocolMessage` itself implements these traits, so any stream of bytes can be converted into it by calling `ProtocolMessage::decode()` and vice versa with `ProtocolMessage::encode()`. The message type is determined by the first byte of the byte stream.
Note: `ProtocolMessage` implements `Encodable`, while decoding is performed via
`ProtocolMessage::decode_message(version, &mut bytes)` because decoding must respect the negotiated `EthVersion`.

### Example: The Transactions message
Let's understand how an `EthMessage` is implemented by taking a look at the `Transactions` Message. The eth specification describes a Transaction message as a list of RLP-encoded transactions:
Expand All @@ -93,17 +114,17 @@ The items in the list are transactions in the format described in the main Ether

In reth, this is represented as:

[File: crates/net/eth-wire/src/types/broadcast.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/broadcast.rs)
[File: crates/net/eth-wire-types/src/broadcast.rs](../../crates/net/eth-wire-types/src/broadcast.rs)
```rust,ignore
pub struct Transactions(
pub struct Transactions<T = TransactionSigned>(
/// New transactions for the peer to include in its mempool.
pub Vec<TransactionSigned>,
pub Vec<T>,
);
```

And the corresponding trait implementations are present in the primitives crate.
And the corresponding transaction type is defined here:

[File: crates/primitives/src/transaction/mod.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/primitives/src/transaction/mod.rs)
[File: crates/ethereum/primitives/src/transaction.rs](../../crates/ethereum/primitives/src/transaction.rs)
```rust, ignore
#[reth_codec]
#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default, Serialize, Deserialize)]
Expand Down Expand Up @@ -146,7 +167,7 @@ The lowest level stream to communicate with other peers is the P2P stream. It ta
Decompression/Compression of bytes is done with snappy algorithm ([EIP 706](https://eips.ethereum.org/EIPS/eip-706))
using the external `snap` crate.

[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
```rust,ignore
#[pin_project]
pub struct P2PStream<S> {
Expand All @@ -155,23 +176,29 @@ pub struct P2PStream<S> {
encoder: snap::raw::Encoder,
decoder: snap::raw::Decoder,
pinger: Pinger,
shared_capability: SharedCapability,
/// Negotiated shared capabilities
shared_capabilities: SharedCapabilities,
/// Outgoing messages buffered for sending to the underlying stream.
outgoing_messages: VecDeque<Bytes>,
/// Maximum number of messages that can be buffered before yielding backpressure.
outgoing_message_buffer_capacity: usize,
/// Whether this stream is currently in the process of gracefully disconnecting.
disconnecting: bool,
}
```
### Pinger
To manage pinging, an instance of the `Pinger` struct is used. This is a state machine that keeps track of how many pings
we have sent/received and the timeouts associated with them.
To manage pinging, an instance of the `Pinger` struct is used. This is a state machine that keeps track of pings
we have sent/received and the timeout associated with them.

[File: crates/net/eth-wire/src/pinger.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/pinger.rs)
[File: crates/net/eth-wire/src/pinger.rs](../../crates/net/eth-wire/src/pinger.rs)
```rust,ignore
#[derive(Debug)]
pub(crate) struct Pinger {
/// The timer used for the next ping.
ping_interval: Interval,
/// The timer used for the next ping.
/// The timer used to detect a ping timeout.
timeout_timer: Pin<Box<Sleep>>,
/// The timeout duration for each ping.
timeout: Duration,
state: PingState,
}
Expand Down Expand Up @@ -205,7 +232,7 @@ pub(crate) fn poll_ping(
}
}
PingState::WaitingForPong => {
if self.timeout_timer.is_elapsed() {
if self.timeout_timer.as_mut().poll(cx).is_ready() {
self.state = PingState::TimedOut;
return Poll::Ready(Ok(PingerEvent::Timeout))
}
Expand All @@ -223,7 +250,7 @@ To send and receive data, the P2PStream itself is a future that implements the `
For the `Stream` trait, the `inner` stream is polled, decompressed and returned. Most of the code is just
error handling and is omitted here for clarity.

[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
```rust,ignore

impl<S> Stream for P2PStream<S> {
Expand All @@ -240,7 +267,8 @@ impl<S> Stream for P2PStream<S> {
let mut decompress_buf = BytesMut::zeroed(decompressed_len + 1);
this.decoder.decompress(&bytes[1..], &mut decompress_buf[1..])?;
// ... Omitted Error handling
decompress_buf[0] = bytes[0] - this.shared_capability.offset();
// Normalize IDs: reserved p2p range is 0x00..=0x0f; subprotocols start at 0x10
decompress_buf[0] = bytes[0] - MAX_RESERVED_MESSAGE_ID - 1;
return Poll::Ready(Some(Ok(decompress_buf)))
}
}
Expand All @@ -250,15 +278,16 @@ impl<S> Stream for P2PStream<S> {
Similarly, for the `Sink` trait, we do the reverse, compressing and sending data out to the `inner` stream.
The important functions in this trait are shown below.

[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
```rust, ignore
impl<S> Sink<Bytes> for P2PStream<S> {
fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
let this = self.project();
let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(item.len() - 1));
let compressed_size = this.encoder.compress(&item[1..], &mut compressed[1..])?;
compressed.truncate(compressed_size + 1);
compressed[0] = item[0] + this.shared_capability.offset();
// Mask subprotocol IDs into global space above reserved p2p IDs
compressed[0] = item[0] + MAX_RESERVED_MESSAGE_ID + 1;
this.outgoing_messages.push_back(compressed.freeze());
Ok(())
}
Expand All @@ -285,28 +314,28 @@ impl<S> Sink<Bytes> for P2PStream<S> {


## EthStream
The EthStream is very simple, it does not keep track of any state, it simply wraps the P2Pstream.
The EthStream wraps a stream and handles eth message (RLP) encoding/decoding with respect to the negotiated `EthVersion`.

[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
[File: crates/net/eth-wire/src/ethstream.rs](../../crates/net/eth-wire/src/ethstream.rs)
```rust,ignore
#[pin_project]
pub struct EthStream<S> {
#[pin]
inner: S,
}
```
EthStream's only job is to perform the RLP decoding/encoding, using the `ProtocolMessage::decode()` and `ProtocolMessage::encode()`
functions we looked at earlier.
EthStream performs RLP decoding/encoding using `ProtocolMessage::decode_message(version, &mut bytes)`
and `ProtocolMessage::encode()`, and enforces protocol rules (e.g., prohibiting `Status` after handshake).

[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
[File: crates/net/eth-wire/src/ethstream.rs](../../crates/net/eth-wire/src/ethstream.rs)
```rust,ignore
impl<S, E> Stream for EthStream<S> {
// ...
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let bytes = ready!(this.inner.poll_next(cx)).unwrap();
// ...
let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) {
let msg = match ProtocolMessage::decode_message(self.version(), &mut bytes.as_ref()) {
Ok(m) => m,
Err(err) => {
return Poll::Ready(Some(Err(err.into())))
Expand All @@ -319,10 +348,12 @@ impl<S, E> Stream for EthStream<S> {
impl<S, E> Sink<EthMessage> for EthStream<S> {
// ...
fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> {
// ...
if matches!(item, EthMessage::Status(_)) {
let _ = self.project().inner.disconnect(DisconnectReason::ProtocolBreach);
return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake))
}
let mut bytes = BytesMut::new();
ProtocolMessage::from(item).encode(&mut bytes);

let bytes = bytes.freeze();
self.project().inner.start_send(bytes)?;
Ok(())
Expand All @@ -339,9 +370,9 @@ For a session to be established, peers in the Ethereum network must first exchan

To perform these, reth has special `Unauthed` versions of streams described above.

The `UnauthedP2Pstream` does the `Hello` handshake and returns a `P2PStream`.
The `UnauthedP2PStream` does the `Hello` handshake and returns a `P2PStream`.

[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
```rust, ignore
#[pin_project]
pub struct UnauthedP2PStream<S> {
Expand All @@ -351,20 +382,34 @@ pub struct UnauthedP2PStream<S> {

impl<S> UnauthedP2PStream<S> {
// ...
pub async fn handshake(mut self, hello: HelloMessage) -> Result<(P2PStream<S>, HelloMessage), Error> {
self.inner.send(alloy_rlp::encode(P2PMessage::Hello(hello.clone())).into()).await?;
pub async fn handshake(mut self, hello: HelloMessageWithProtocols) -> Result<(P2PStream<S>, HelloMessage), Error> {
self.inner.send(alloy_rlp::encode(P2PMessage::Hello(hello.message())).into()).await?;
let first_message_bytes = tokio::time::timeout(HANDSHAKE_TIMEOUT, self.inner.next()).await;

let their_hello = match P2PMessage::decode(&mut &first_message_bytes[..]) {
Ok(P2PMessage::Hello(hello)) => Ok(hello),
// ...
}
}?;
let stream = P2PStream::new(self.inner, capability);
let stream = P2PStream::new(self.inner, shared_capabilities);

Ok((stream, their_hello))
}
}

```
Similarly, UnauthedEthStream does the `Status` handshake and returns an `EthStream`. The code is [here](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
Similarly, `UnauthedEthStream` does the `Status` handshake and returns an `EthStream`. It accepts a `UnifiedStatus`
and a `ForkFilter`, and provides a timeout wrapper. The code is [here](../../crates/net/eth-wire/src/ethstream.rs)

### Multiplexing and satellites

`eth-wire` also provides `RlpxProtocolMultiplexer`/`RlpxSatelliteStream` to run the primary `eth` protocol alongside
additional "satellite" protocols (e.g. `snap`) using negotiated `SharedCapabilities`.

## Message variants and versions

- `NewPooledTransactionHashes` differs between ETH66 (`NewPooledTransactionHashes66`) and ETH68 (`NewPooledTransactionHashes68`).
- Starting with ETH67, `GetNodeData` and `NodeData` are removed (decoding them for >=67 yields an error).
- Starting with ETH69:
- `BlockRangeUpdate (0x11)` announces the historical block range served.
- Receipts omit bloom: encoded as `Receipts69` instead of `Receipts`.