Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions crates/net/eth-wire/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,7 @@ pub enum P2PHandshakeError {
/// An error that can occur when interacting with a [`Pinger`].
#[derive(Debug, thiserror::Error)]
pub enum PingerError {
/// A ping was sent while the pinger was in the `TimedOut` state.
#[error("ping sent while timed out")]
PingWhileTimedOut,

/// A pong was received while the pinger was in the `Ready` state.
/// An unexpected pong was received while the pinger was in the `Ready` state.
#[error("pong received while ready")]
PongWhileReady,

/// A pong was received while the pinger was in the `TimedOut` state.
#[error("pong received while timed out")]
PongWhileTimedOut,
UnexpectedPong,
}
30 changes: 12 additions & 18 deletions crates/net/eth-wire/src/p2pstream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
#![allow(dead_code, unreachable_pub, missing_docs, unused_variables)]
use crate::{
capability::SharedCapability,
error::{P2PHandshakeError, P2PStreamError},
pinger::{Pinger, PingerEvent},
};
use bytes::{Buf, Bytes, BytesMut};
use futures::{ready, FutureExt, Sink, SinkExt, StreamExt};
use pin_project::pin_project;
Expand All @@ -14,12 +19,6 @@ use std::{
};
use tokio_stream::Stream;

use crate::{
capability::SharedCapability,
error::{P2PHandshakeError, P2PStreamError},
pinger::{IntervalTimeoutPinger, PingerEvent},
};

/// [`MAX_PAYLOAD_SIZE`] is the maximum size of an uncompressed message payload.
/// This is defined in [EIP-706](https://eips.ethereum.org/EIPS/eip-706).
const MAX_PAYLOAD_SIZE: usize = 16 * 1024 * 1024;
Expand Down Expand Up @@ -47,10 +46,6 @@ const PING_INTERVAL: Duration = Duration::from_secs(60);
/// [`P2PMessage::Disconnect`] message.
const GRACE_PERIOD: Duration = Duration::from_secs(2);

/// [`MAX_FAILED_PINGS`] determines the maximum number of failed ping attempts before disconnecting
/// from a peer.
const MAX_FAILED_PINGS: u8 = 3;

/// An un-authenticated [`P2PStream`]. This is consumed and returns a [`P2PStream`] after the
/// `Hello` handshake is completed.
#[pin_project]
Expand Down Expand Up @@ -150,7 +145,7 @@ pub struct P2PStream<S> {
decoder: snap::raw::Decoder,

/// The state machine used for keeping track of the peer's ping status.
pinger: IntervalTimeoutPinger,
pinger: Pinger,

/// The supported capability for this stream.
shared_capability: SharedCapability,
Expand All @@ -165,7 +160,7 @@ impl<S> P2PStream<S> {
inner,
encoder: snap::raw::Encoder::new(),
decoder: snap::raw::Decoder::new(),
pinger: IntervalTimeoutPinger::new(MAX_FAILED_PINGS, PING_INTERVAL, PING_TIMEOUT),
pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT),
shared_capability: capability,
}
}
Expand All @@ -183,9 +178,9 @@ where
let mut this = self.project();

// poll the pinger to determine if we should send a ping
let pinger_res = ready!(Pin::new(&mut this.pinger).poll_next(cx));
match pinger_res {
Some(Ok(PingerEvent::Ping)) => {
match this.pinger.poll_ping(cx) {
Poll::Pending => {}
Poll::Ready(Ok(PingerEvent::Ping)) => {
// encode the ping message
let mut ping_bytes = BytesMut::new();
P2PMessage::Ping.encode(&mut ping_bytes);
Expand All @@ -194,7 +189,6 @@ where
let send_res = Pin::new(&mut this.inner).send(ping_bytes.into()).poll_unpin(cx)?;
ready!(send_res)
}
// either None (stream ended) or Some(PingEvent::Timeout) or Err(err)
_ => {
// encode the disconnect message
let mut disconnect_bytes = BytesMut::new();
Expand All @@ -208,7 +202,7 @@ where
// since the ping stream has timed out, let's send a None
return Poll::Ready(None)
}
};
}

// we should loop here to ensure we don't return Poll::Pending if we have a message to
// return behind any pings we need to respond to
Expand Down Expand Up @@ -244,7 +238,7 @@ where
} else if id == P2PMessageID::Pong as u8 {
// TODO: do we need to decode the pong?
// if we were waiting for a pong, this will reset the pinger state
this.pinger.pong_received()?
this.pinger.on_pong()?
} else if id > MAX_P2P_MESSAGE_ID && id <= MAX_RESERVED_MESSAGE_ID {
// we have received an unknown reserved message
return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId(id))))
Expand Down
Loading