diff --git a/Cargo.lock b/Cargo.lock index 76615d5719c..643d6f324fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6575,7 +6575,7 @@ version = "1.0.0" dependencies = [ "confy", "humantime-serde", - "reth-network", + "reth-network-types", "reth-prune-types", "serde", "tempfile", @@ -7318,6 +7318,7 @@ dependencies = [ "reth-network-api", "reth-network-p2p", "reth-network-peers", + "reth-network-types", "reth-primitives", "reth-provider", "reth-tasks", @@ -7387,6 +7388,19 @@ dependencies = [ "url", ] +[[package]] +name = "reth-network-types" +version = "1.0.0" +dependencies = [ + "humantime-serde", + "reth-net-banlist", + "reth-network-api", + "reth-network-peers", + "serde", + "serde_json", + "tracing", +] + [[package]] name = "reth-nippy-jar" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index a7e1ab3d92a..4634eb51622 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ members = [ "crates/net/eth-wire/", "crates/net/nat/", "crates/net/network-api/", + "crates/net/network-types/", "crates/net/network/", "crates/net/p2p/", "crates/net/peers/", @@ -305,6 +306,7 @@ reth-net-banlist = { path = "crates/net/banlist" } reth-net-nat = { path = "crates/net/nat" } reth-network = { path = "crates/net/network" } reth-network-api = { path = "crates/net/network-api" } +reth-network-types = { path = "crates/net/network-types" } reth-network-peers = { path = "crates/net/peers", default-features = false } reth-network-p2p = { path = "crates/net/p2p" } reth-nippy-jar = { path = "crates/storage/nippy-jar" } diff --git a/crates/config/Cargo.toml b/crates/config/Cargo.toml index 527f5b1538e..1468fccd32d 100644 --- a/crates/config/Cargo.toml +++ b/crates/config/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] # reth -reth-network.workspace = true +reth-network-types = { workspace = true, features = ["serde"] } reth-prune-types.workspace = true # serde diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index f458ef41646..5d79549d4e4 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -1,6 +1,6 @@ //! Configuration files. -use reth_network::{PeersConfig, SessionsConfig}; +use reth_network_types::{PeersConfig, SessionsConfig}; use reth_prune_types::PruneModes; use serde::{Deserialize, Deserializer, Serialize}; use std::{ diff --git a/crates/net/network-types/Cargo.toml b/crates/net/network-types/Cargo.toml new file mode 100644 index 00000000000..66c1f4d84a3 --- /dev/null +++ b/crates/net/network-types/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "reth-network-types" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Commonly used network types" + +[lints] +workspace = true + +[dependencies] +# reth +reth-network-api.workspace = true +reth-network-peers.workspace = true +reth-net-banlist.workspace = true + +# io +serde = { workspace = true, optional = true } +humantime-serde = { workspace = true, optional = true } +serde_json = { workspace = true } + +# misc +tracing.workspace = true + +[features] +serde = ["dep:serde", "dep:humantime-serde"] +test-utils = [] diff --git a/crates/net/network-types/src/backoff.rs b/crates/net/network-types/src/backoff.rs new file mode 100644 index 00000000000..8ee9f68a4e3 --- /dev/null +++ b/crates/net/network-types/src/backoff.rs @@ -0,0 +1,27 @@ +/// Describes the type of backoff should be applied. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum BackoffKind { + /// Use the lowest configured backoff duration. + /// + /// This applies to connection problems where there is a chance that they will be resolved + /// after the short duration. + Low, + /// Use a slightly higher duration to put a peer in timeout + /// + /// This applies to more severe connection problems where there is a lower chance that they + /// will be resolved. + Medium, + /// Use the max configured backoff duration. + /// + /// This is intended for spammers, or bad peers in general. + High, +} + +// === impl BackoffKind === + +impl BackoffKind { + /// Returns true if the backoff is considered severe. + pub const fn is_severe(&self) -> bool { + matches!(self, Self::Medium | Self::High) + } +} diff --git a/crates/net/network-types/src/lib.rs b/crates/net/network-types/src/lib.rs new file mode 100644 index 00000000000..5b075d609bc --- /dev/null +++ b/crates/net/network-types/src/lib.rs @@ -0,0 +1,24 @@ +//! Commonly used networking types. +//! +//! ## Feature Flags +//! +//! - `serde` (default): Enable serde support + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +/// Types related to peering. +pub mod peers; +pub use peers::{ConnectionsConfig, PeersConfig, ReputationChangeWeights}; + +pub mod session; +pub use session::{SessionLimits, SessionsConfig}; + +/// [`BackoffKind`] definition. +mod backoff; +pub use backoff::BackoffKind; diff --git a/crates/net/network-types/src/peers/config.rs b/crates/net/network-types/src/peers/config.rs new file mode 100644 index 00000000000..5143c4c6f2b --- /dev/null +++ b/crates/net/network-types/src/peers/config.rs @@ -0,0 +1,292 @@ +//! Configuration for peering. + +use crate::{BackoffKind, ReputationChangeWeights}; +use reth_net_banlist::BanList; +use reth_network_peers::NodeRecord; +use std::{ + collections::HashSet, + io::{self, ErrorKind}, + path::Path, + time::Duration, +}; +use tracing::info; + +/// Maximum number of available slots for outbound sessions. +pub const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100; + +/// Maximum number of available slots for inbound sessions. +pub const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30; + +/// Maximum number of available slots for concurrent outgoing dials. +/// +/// This restricts how many outbound dials can be performed concurrently. +pub const DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS: usize = 15; + +/// The durations to use when a backoff should be applied to a peer. +/// +/// See also [`BackoffKind`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct PeerBackoffDurations { + /// Applies to connection problems where there is a chance that they will be resolved after the + /// short duration. + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub low: Duration, + /// Applies to more severe connection problems where there is a lower chance that they will be + /// resolved. + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub medium: Duration, + /// Intended for spammers, or bad peers in general. + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub high: Duration, + /// Maximum total backoff duration. + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub max: Duration, +} + +impl PeerBackoffDurations { + /// Returns the corresponding [`Duration`] + pub const fn backoff(&self, kind: BackoffKind) -> Duration { + match kind { + BackoffKind::Low => self.low, + BackoffKind::Medium => self.medium, + BackoffKind::High => self.high, + } + } + + /// Returns the timestamp until which we should backoff. + /// + /// The Backoff duration is capped by the configured maximum backoff duration. + pub fn backoff_until(&self, kind: BackoffKind, backoff_counter: u8) -> std::time::Instant { + let backoff_time = self.backoff(kind); + let backoff_time = backoff_time + backoff_time * backoff_counter as u32; + let now = std::time::Instant::now(); + now + backoff_time.min(self.max) + } + + /// Returns durations for testing. + #[cfg(any(test, feature = "test-utils"))] + pub const fn test() -> Self { + Self { + low: Duration::from_millis(200), + medium: Duration::from_millis(200), + high: Duration::from_millis(200), + max: Duration::from_millis(200), + } + } +} + +impl Default for PeerBackoffDurations { + fn default() -> Self { + Self { + low: Duration::from_secs(30), + // 3min + medium: Duration::from_secs(60 * 3), + // 15min + high: Duration::from_secs(60 * 15), + // 1h + max: Duration::from_secs(60 * 60), + } + } +} + +/// Tracks stats about connected nodes +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize), serde(default))] +pub struct ConnectionsConfig { + /// Maximum allowed outbound connections. + pub max_outbound: usize, + /// Maximum allowed inbound connections. + pub max_inbound: usize, + /// Maximum allowed concurrent outbound dials. + #[cfg_attr(feature = "serde", serde(default))] + pub max_concurrent_outbound_dials: usize, +} + +impl Default for ConnectionsConfig { + fn default() -> Self { + Self { + max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize, + max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize, + max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS, + } + } +} + +/// Config type for initiating a `PeersManager` instance. +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "serde", serde(default))] +pub struct PeersConfig { + /// How often to recheck free slots for outbound connections. + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub refill_slots_interval: Duration, + /// Trusted nodes to connect to or accept from + pub trusted_nodes: HashSet, + /// Connect to or accept from trusted nodes only? + #[cfg_attr(feature = "serde", serde(alias = "connect_trusted_nodes_only"))] + pub trusted_nodes_only: bool, + /// Maximum number of backoff attempts before we give up on a peer and dropping. + /// + /// The max time spent of a peer before it's removed from the set is determined by the + /// configured backoff duration and the max backoff count. + /// + /// With a backoff counter of 5 and a backoff duration of 1h, the minimum time spent of the + /// peer in the table is the sum of all backoffs (1h + 2h + 3h + 4h + 5h = 15h). + /// + /// Note: this does not apply to trusted peers. + pub max_backoff_count: u8, + /// Basic nodes to connect to. + #[cfg_attr(feature = "serde", serde(skip))] + pub basic_nodes: HashSet, + /// How long to ban bad peers. + #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] + pub ban_duration: Duration, + /// Restrictions on `PeerIds` and Ips. + #[cfg_attr(feature = "serde", serde(skip))] + pub ban_list: BanList, + /// Restrictions on connections. + pub connection_info: ConnectionsConfig, + /// How to weigh reputation changes. + pub reputation_weights: ReputationChangeWeights, + /// How long to backoff peers that we are failed to connect to for non-fatal reasons. + /// + /// The backoff duration increases with number of backoff attempts. + pub backoff_durations: PeerBackoffDurations, +} + +impl Default for PeersConfig { + fn default() -> Self { + Self { + refill_slots_interval: Duration::from_millis(5_000), + connection_info: Default::default(), + reputation_weights: Default::default(), + ban_list: Default::default(), + // Ban peers for 12h + ban_duration: Duration::from_secs(60 * 60 * 12), + backoff_durations: Default::default(), + trusted_nodes: Default::default(), + trusted_nodes_only: false, + basic_nodes: Default::default(), + max_backoff_count: 5, + } + } +} + +impl PeersConfig { + /// A set of `peer_ids` and ip addr that we want to never connect to + pub fn with_ban_list(mut self, ban_list: BanList) -> Self { + self.ban_list = ban_list; + self + } + + /// Configure how long to ban bad peers + pub const fn with_ban_duration(mut self, ban_duration: Duration) -> Self { + self.ban_duration = ban_duration; + self + } + + /// Maximum allowed outbound connections. + pub const fn with_max_outbound(mut self, max_outbound: usize) -> Self { + self.connection_info.max_outbound = max_outbound; + self + } + + /// Maximum allowed inbound connections with optional update. + pub const fn with_max_inbound_opt(mut self, max_inbound: Option) -> Self { + if let Some(max_inbound) = max_inbound { + self.connection_info.max_inbound = max_inbound; + } + self + } + + /// Maximum allowed outbound connections with optional update. + pub const fn with_max_outbound_opt(mut self, max_outbound: Option) -> Self { + if let Some(max_outbound) = max_outbound { + self.connection_info.max_outbound = max_outbound; + } + self + } + + /// Maximum allowed inbound connections. + pub const fn with_max_inbound(mut self, max_inbound: usize) -> Self { + self.connection_info.max_inbound = max_inbound; + self + } + + /// Maximum allowed concurrent outbound dials. + pub const fn with_max_concurrent_dials(mut self, max_concurrent_outbound_dials: usize) -> Self { + self.connection_info.max_concurrent_outbound_dials = max_concurrent_outbound_dials; + self + } + + /// Nodes to always connect to. + pub fn with_trusted_nodes(mut self, nodes: HashSet) -> Self { + self.trusted_nodes = nodes; + self + } + + /// Connect only to trusted nodes. + pub const fn with_trusted_nodes_only(mut self, trusted_only: bool) -> Self { + self.trusted_nodes_only = trusted_only; + self + } + + /// Nodes available at launch. + pub fn with_basic_nodes(mut self, nodes: HashSet) -> Self { + self.basic_nodes = nodes; + self + } + + /// Configures the max allowed backoff count. + pub const fn with_max_backoff_count(mut self, max_backoff_count: u8) -> Self { + self.max_backoff_count = max_backoff_count; + self + } + + /// Configures how to weigh reputation changes. + pub const fn with_reputation_weights( + mut self, + reputation_weights: ReputationChangeWeights, + ) -> Self { + self.reputation_weights = reputation_weights; + self + } + + /// Configures how long to backoff peers that are we failed to connect to for non-fatal reasons + pub const fn with_backoff_durations(mut self, backoff_durations: PeerBackoffDurations) -> Self { + self.backoff_durations = backoff_durations; + self + } + + /// Returns the maximum number of peers, inbound and outbound. + pub const fn max_peers(&self) -> usize { + self.connection_info.max_outbound + self.connection_info.max_inbound + } + + /// Read from file nodes available at launch. Ignored if None. + pub fn with_basic_nodes_from_file( + self, + optional_file: Option>, + ) -> Result { + let Some(file_path) = optional_file else { return Ok(self) }; + let reader = match std::fs::File::open(file_path.as_ref()) { + Ok(file) => io::BufReader::new(file), + Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self), + Err(e) => Err(e)?, + }; + info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers"); + let nodes: HashSet = serde_json::from_reader(reader)?; + Ok(self.with_basic_nodes(nodes)) + } + + /// Returns settings for testing + #[cfg(any(test, feature = "test-utils"))] + pub fn test() -> Self { + Self { + refill_slots_interval: Duration::from_millis(100), + backoff_durations: PeerBackoffDurations::test(), + ..Default::default() + } + } +} diff --git a/crates/net/network-types/src/peers/mod.rs b/crates/net/network-types/src/peers/mod.rs new file mode 100644 index 00000000000..4b195750b51 --- /dev/null +++ b/crates/net/network-types/src/peers/mod.rs @@ -0,0 +1,5 @@ +pub mod reputation; +pub use reputation::ReputationChangeWeights; + +pub mod config; +pub use config::{ConnectionsConfig, PeersConfig}; diff --git a/crates/net/network/src/peers/reputation.rs b/crates/net/network-types/src/peers/reputation.rs similarity index 92% rename from crates/net/network/src/peers/reputation.rs rename to crates/net/network-types/src/peers/reputation.rs index 9d3ec256bea..13fac4c1ebc 100644 --- a/crates/net/network/src/peers/reputation.rs +++ b/crates/net/network-types/src/peers/reputation.rs @@ -3,13 +3,13 @@ use reth_network_api::{Reputation, ReputationChangeKind}; /// The default reputation of a peer -pub(crate) const DEFAULT_REPUTATION: Reputation = 0; +pub const DEFAULT_REPUTATION: Reputation = 0; /// The minimal unit we're measuring reputation const REPUTATION_UNIT: i32 = -1024; /// The reputation value below which new connection from/to peers are rejected. -pub(crate) const BANNED_REPUTATION: i32 = 50 * REPUTATION_UNIT; +pub const BANNED_REPUTATION: i32 = 50 * REPUTATION_UNIT; /// The reputation change to apply to a peer that dropped the connection. const REMOTE_DISCONNECT_REPUTATION_CHANGE: i32 = 4 * REPUTATION_UNIT; @@ -42,11 +42,11 @@ const BAD_ANNOUNCEMENT_REPUTATION_CHANGE: i32 = REPUTATION_UNIT; /// This gives a trusted peer more leeway when interacting with the node, which is useful for in /// custom setups. By not setting this to `0` we still allow trusted peer penalization but less than /// untrusted peers. -pub(crate) const MAX_TRUSTED_PEER_REPUTATION_CHANGE: Reputation = 2 * REPUTATION_UNIT; +pub const MAX_TRUSTED_PEER_REPUTATION_CHANGE: Reputation = 2 * REPUTATION_UNIT; /// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold #[inline] -pub(crate) const fn is_banned_reputation(reputation: i32) -> bool { +pub const fn is_banned_reputation(reputation: i32) -> bool { reputation < BANNED_REPUTATION } @@ -80,7 +80,7 @@ pub struct ReputationChangeWeights { impl ReputationChangeWeights { /// Returns the quantifiable [`ReputationChange`] for the given [`ReputationChangeKind`] using /// the configured weights - pub(crate) fn change(&self, kind: ReputationChangeKind) -> ReputationChange { + pub fn change(&self, kind: ReputationChangeKind) -> ReputationChange { match kind { ReputationChangeKind::BadMessage => self.bad_message.into(), ReputationChangeKind::BadBlock => self.bad_block.into(), @@ -115,14 +115,14 @@ impl Default for ReputationChangeWeights { /// Represents a change in a peer's reputation. #[derive(Debug, Copy, Clone, Default)] -pub(crate) struct ReputationChange(Reputation); +pub struct ReputationChange(Reputation); // === impl ReputationChange === impl ReputationChange { /// Helper type for easier conversion #[inline] - pub(crate) const fn as_i32(self) -> Reputation { + pub const fn as_i32(self) -> Reputation { self.0 } } diff --git a/crates/net/network/src/session/config.rs b/crates/net/network-types/src/session/config.rs similarity index 65% rename from crates/net/network/src/session/config.rs rename to crates/net/network-types/src/session/config.rs index 6c7fc282d0a..941448effd6 100644 --- a/crates/net/network/src/session/config.rs +++ b/crates/net/network-types/src/session/config.rs @@ -1,9 +1,6 @@ -//! Configuration types for [`SessionManager`](crate::session::SessionManager). +//! Configuration types for peer sessions manager. -use crate::{ - peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND}, - session::{Direction, ExceedsSessionLimit}, -}; +use crate::peers::config::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND}; use std::time::Duration; /// Default request timeout for a single request. @@ -29,7 +26,7 @@ const DEFAULT_MAX_PEERS: usize = /// With maxed out peers, this will allow for 3 messages per session (average) const DEFAULT_SESSION_EVENT_BUFFER_SIZE: usize = DEFAULT_MAX_PEERS * 2; -/// Configuration options when creating a [`SessionManager`](crate::session::SessionManager). +/// Configuration options for peer session management. #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "serde", serde(default))] @@ -111,10 +108,14 @@ impl SessionsConfig { #[derive(Debug, Clone, Default, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct SessionLimits { - max_pending_inbound: Option, - max_pending_outbound: Option, - max_established_inbound: Option, - max_established_outbound: Option, + /// Maximum allowed inbound connections. + pub max_pending_inbound: Option, + /// Maximum allowed outbound connections. + pub max_pending_outbound: Option, + /// Maximum allowed established inbound connections. + pub max_established_inbound: Option, + /// Maximum allowed established outbound connections. + pub max_established_outbound: Option, } impl SessionLimits { @@ -143,107 +144,10 @@ impl SessionLimits { } } -/// Keeps track of all sessions. -#[derive(Debug, Clone)] -pub struct SessionCounter { - /// Limits to enforce. - limits: SessionLimits, - /// Number of pending incoming sessions. - pending_inbound: u32, - /// Number of pending outgoing sessions. - pending_outbound: u32, - /// Number of active inbound sessions. - active_inbound: u32, - /// Number of active outbound sessions. - active_outbound: u32, -} - -// === impl SessionCounter === - -impl SessionCounter { - pub(crate) const fn new(limits: SessionLimits) -> Self { - Self { - limits, - pending_inbound: 0, - pending_outbound: 0, - active_inbound: 0, - active_outbound: 0, - } - } - - pub(crate) fn inc_pending_inbound(&mut self) { - self.pending_inbound += 1; - } - - pub(crate) fn inc_pending_outbound(&mut self) { - self.pending_outbound += 1; - } - - pub(crate) fn dec_pending(&mut self, direction: &Direction) { - match direction { - Direction::Outgoing(_) => { - self.pending_outbound -= 1; - } - Direction::Incoming => { - self.pending_inbound -= 1; - } - } - } - - pub(crate) fn inc_active(&mut self, direction: &Direction) { - match direction { - Direction::Outgoing(_) => { - self.active_outbound += 1; - } - Direction::Incoming => { - self.active_inbound += 1; - } - } - } - - pub(crate) fn dec_active(&mut self, direction: &Direction) { - match direction { - Direction::Outgoing(_) => { - self.active_outbound -= 1; - } - Direction::Incoming => { - self.active_inbound -= 1; - } - } - } - - pub(crate) const fn ensure_pending_outbound(&self) -> Result<(), ExceedsSessionLimit> { - Self::ensure(self.pending_outbound, self.limits.max_pending_outbound) - } - - pub(crate) const fn ensure_pending_inbound(&self) -> Result<(), ExceedsSessionLimit> { - Self::ensure(self.pending_inbound, self.limits.max_pending_inbound) - } - - const fn ensure(current: u32, limit: Option) -> Result<(), ExceedsSessionLimit> { - if let Some(limit) = limit { - if current >= limit { - return Err(ExceedsSessionLimit(limit)) - } - } - Ok(()) - } -} - #[cfg(test)] mod tests { use super::*; - #[test] - fn test_limits() { - let mut limits = SessionCounter::new(SessionLimits::default().with_max_pending_inbound(2)); - assert!(limits.ensure_pending_outbound().is_ok()); - limits.inc_pending_inbound(); - assert!(limits.ensure_pending_inbound().is_ok()); - limits.inc_pending_inbound(); - assert!(limits.ensure_pending_inbound().is_err()); - } - #[test] fn scale_session_event_buffer() { let config = SessionsConfig::default().with_upscaled_event_buffer(10); diff --git a/crates/net/network-types/src/session/mod.rs b/crates/net/network-types/src/session/mod.rs new file mode 100644 index 00000000000..a5b613189c0 --- /dev/null +++ b/crates/net/network-types/src/session/mod.rs @@ -0,0 +1,4 @@ +//! Peer sessions configuration. + +pub mod config; +pub use config::{SessionLimits, SessionsConfig}; diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 1dde28cfe2c..d61caa7bec9 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -29,6 +29,7 @@ reth-provider.workspace = true reth-tokio-util.workspace = true reth-consensus.workspace = true reth-network-peers.workspace = true +reth-network-types.workspace = true # ethereum enr = { workspace = true, features = ["serde", "rust-secp256k1"] } @@ -75,6 +76,7 @@ reth-primitives = { workspace = true, features = ["test-utils"] } # integration tests reth-network = { workspace = true, features = ["test-utils"] } reth-network-p2p = { workspace = true, features = ["test-utils"] } +reth-network-types = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } reth-tracing.workspace = true @@ -95,8 +97,8 @@ criterion = { workspace = true, features = ["async_tokio", "html_reports"] } [features] default = ["serde"] geth-tests = [] -serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr/serde", "dep:serde_json"] -test-utils = ["reth-provider/test-utils", "dep:tempfile", "reth-transaction-pool/test-utils"] +serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr/serde", "dep:serde_json", "reth-network-types/serde"] +test-utils = ["reth-provider/test-utils", "dep:tempfile", "reth-transaction-pool/test-utils", "reth-network-types/test-utils"] [[bench]] name = "bench" diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 151421b4a5d..c42d204f5e2 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -3,8 +3,6 @@ use crate::{ error::NetworkError, import::{BlockImport, ProofOfStakeBlockImport}, - peers::PeersConfig, - session::SessionsConfig, transactions::TransactionsManagerConfig, NetworkHandle, NetworkManager, }; @@ -17,6 +15,7 @@ use reth_discv5::NetworkStackId; use reth_dns_discovery::DnsDiscoveryConfig; use reth_eth_wire::{HelloMessage, HelloMessageWithProtocols, Status}; use reth_network_peers::{pk2id, PeerId}; +use reth_network_types::{PeersConfig, SessionsConfig}; use reth_primitives::{ForkFilter, Head}; use reth_provider::{BlockReader, HeaderProvider}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index 9019a79f237..d5e0f453721 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -6,6 +6,7 @@ use reth_eth_wire::{ errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError}, DisconnectReason, }; +use reth_network_types::BackoffKind; use std::{fmt, io, io::ErrorKind, net::SocketAddr}; /// Service kind. @@ -104,34 +105,6 @@ pub(crate) trait SessionError: fmt::Debug + fmt::Display { fn should_backoff(&self) -> Option; } -/// Describes the type of backoff should be applied. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum BackoffKind { - /// Use the lowest configured backoff duration. - /// - /// This applies to connection problems where there is a chance that they will be resolved - /// after the short duration. - Low, - /// Use a slightly higher duration to put a peer in timeout - /// - /// This applies to more severe connection problems where there is a lower chance that they - /// will be resolved. - Medium, - /// Use the max configured backoff duration. - /// - /// This is intended for spammers, or bad peers in general. - High, -} - -// === impl BackoffKind === - -impl BackoffKind { - /// Returns true if the backoff is considered severe. - pub(crate) const fn is_severe(&self) -> bool { - matches!(self, Self::Medium | Self::High) - } -} - impl SessionError for EthStreamError { fn merits_discovery_ban(&self) -> bool { match self { diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index b8f3f8be169..f03889f9841 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -143,12 +143,12 @@ pub use fetch::FetchClient; pub use manager::{NetworkEvent, NetworkManager}; pub use message::PeerRequest; pub use network::{NetworkEvents, NetworkHandle, NetworkProtocols}; -pub use peers::PeersConfig; pub use session::{ ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent, PendingSessionHandle, PendingSessionHandshakeError, SessionCommand, SessionEvent, SessionId, - SessionLimits, SessionManager, SessionsConfig, + SessionManager, }; pub use transactions::{FilterAnnouncement, MessageFilter, ValidateTx68}; pub use reth_eth_wire::{DisconnectReason, HelloMessageWithProtocols}; +pub use reth_network_types::{PeersConfig, SessionsConfig}; diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers.rs similarity index 88% rename from crates/net/network/src/peers/manager.rs rename to crates/net/network/src/peers.rs index bde0bd066f0..c7e6a05a57d 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers.rs @@ -1,12 +1,7 @@ +//! Peer related implementations + use crate::{ - error::{BackoffKind, SessionError}, - peers::{ - reputation::{ - is_banned_reputation, DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE, - }, - ReputationChangeWeights, DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS, - DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND, - }, + error::SessionError, session::{Direction, PendingSessionHandshakeError}, swarm::NetworkConnectionState, }; @@ -15,13 +10,21 @@ use reth_eth_wire::{errors::EthStreamError, DisconnectReason}; use reth_net_banlist::BanList; use reth_network_api::{PeerKind, ReputationChangeKind}; use reth_network_peers::{NodeRecord, PeerId}; +use reth_network_types::{ + peers::{ + config::PeerBackoffDurations, + reputation::{ + is_banned_reputation, DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE, + }, + }, + ConnectionsConfig, PeersConfig, ReputationChangeWeights, +}; use reth_primitives::ForkId; use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, fmt::Display, - io::{self, ErrorKind}, + io::{self}, net::{IpAddr, SocketAddr}, - path::Path, task::{Context, Poll}, time::Duration, }; @@ -31,7 +34,7 @@ use tokio::{ time::{Instant, Interval}, }; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{info, trace}; +use tracing::trace; /// A communication channel to the [`PeersManager`] to apply manual changes to the peer set. #[derive(Clone, Debug)] @@ -170,7 +173,7 @@ impl PeersManager { reputation_weights, refill_slots_interval: tokio::time::interval(refill_slots_interval), release_interval: tokio::time::interval_at(now + unban_interval, unban_interval), - connection_info, + connection_info: ConnectionInfo::new(connection_info), ban_list, backed_off_peers: Default::default(), ban_duration, @@ -238,9 +241,7 @@ impl PeersManager { return Err(InboundConnectionError::IpBanned) } - if (!self.connection_info.has_in_capacity() || self.connection_info.max_inbound == 0) && - self.trusted_peer_ids.is_empty() - { + if !self.connection_info.has_in_capacity() && self.trusted_peer_ids.is_empty() { // if we don't have any inbound slots and no trusted peers, we don't accept any new // connections return Err(InboundConnectionError::ExceedsCapacity) @@ -918,42 +919,37 @@ impl Default for PeersManager { } /// Tracks stats about connected nodes -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize), serde(default))] +#[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct ConnectionInfo { /// Counter for currently occupied slots for active outbound connections. - #[cfg_attr(feature = "serde", serde(skip))] num_outbound: usize, /// Counter for pending outbound connections. - #[cfg_attr(feature = "serde", serde(skip))] num_pending_out: usize, /// Counter for currently occupied slots for active inbound connections. - #[cfg_attr(feature = "serde", serde(skip))] num_inbound: usize, /// Counter for pending inbound connections. - #[cfg_attr(feature = "serde", serde(skip))] num_pending_in: usize, - /// Maximum allowed outbound connections. - max_outbound: usize, - /// Maximum allowed inbound connections. - max_inbound: usize, - /// Maximum allowed concurrent outbound dials. - #[cfg_attr(feature = "serde", serde(default))] - max_concurrent_outbound_dials: usize, + /// Restrictions on number of connections. + config: ConnectionsConfig, } // === impl ConnectionInfo === impl ConnectionInfo { + /// Returns a new [`ConnectionInfo`] with the given config. + const fn new(config: ConnectionsConfig) -> Self { + Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 } + } + /// Returns `true` if there's still capacity for a new outgoing connection. const fn has_out_capacity(&self) -> bool { - self.num_pending_out < self.max_concurrent_outbound_dials && - self.num_outbound < self.max_outbound + self.num_pending_out < self.config.max_concurrent_outbound_dials && + self.num_outbound < self.config.max_outbound } /// Returns `true` if there's still capacity for a new incoming connection. const fn has_in_capacity(&self) -> bool { - self.num_inbound < self.max_inbound + self.num_inbound < self.config.max_inbound } fn decr_state(&mut self, state: PeerConnectionState) { @@ -998,20 +994,6 @@ impl ConnectionInfo { } } -impl Default for ConnectionInfo { - fn default() -> Self { - Self { - num_outbound: 0, - num_inbound: 0, - max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize, - max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize, - max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS, - num_pending_out: 0, - num_pending_in: 0, - } - } -} - /// Tracks info about a single peer. #[derive(Debug, Clone)] pub struct Peer { @@ -1029,7 +1011,8 @@ pub struct Peer { kind: PeerKind, /// Whether the peer is currently backed off. backed_off: bool, - /// Counts number of times the peer was backed off due to a severe [`BackoffKind`]. + /// Counts number of times the peer was backed off due to a severe + /// [`reth_network_types::BackoffKind`]. severe_backoff_counter: u8, } @@ -1263,265 +1246,6 @@ pub enum PeerAction { PeerRemoved(PeerId), } -/// Config type for initiating a [`PeersManager`] instance. -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -#[cfg_attr(feature = "serde", serde(default))] -pub struct PeersConfig { - /// How often to recheck free slots for outbound connections. - #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] - pub refill_slots_interval: Duration, - /// Trusted nodes to connect to or accept from - pub trusted_nodes: HashSet, - /// Connect to or accept from trusted nodes only? - #[cfg_attr(feature = "serde", serde(alias = "connect_trusted_nodes_only"))] - pub trusted_nodes_only: bool, - /// Maximum number of backoff attempts before we give up on a peer and dropping. - /// - /// The max time spent of a peer before it's removed from the set is determined by the - /// configured backoff duration and the max backoff count. - /// - /// With a backoff counter of 5 and a backoff duration of 1h, the minimum time spent of the - /// peer in the table is the sum of all backoffs (1h + 2h + 3h + 4h + 5h = 15h). - /// - /// Note: this does not apply to trusted peers. - pub max_backoff_count: u8, - /// Basic nodes to connect to. - #[cfg_attr(feature = "serde", serde(skip))] - pub basic_nodes: HashSet, - /// How long to ban bad peers. - #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] - pub ban_duration: Duration, - /// Restrictions on `PeerIds` and Ips. - #[cfg_attr(feature = "serde", serde(skip))] - pub ban_list: BanList, - /// Restrictions on connections. - pub connection_info: ConnectionInfo, - /// How to weigh reputation changes. - pub reputation_weights: ReputationChangeWeights, - /// How long to backoff peers that are we failed to connect to for non-fatal reasons, such as - /// [`DisconnectReason::TooManyPeers`]. - /// - /// The backoff duration increases with number of backoff attempts. - pub backoff_durations: PeerBackoffDurations, -} - -impl Default for PeersConfig { - fn default() -> Self { - Self { - refill_slots_interval: Duration::from_millis(5_000), - connection_info: Default::default(), - reputation_weights: Default::default(), - ban_list: Default::default(), - // Ban peers for 12h - ban_duration: Duration::from_secs(60 * 60 * 12), - backoff_durations: Default::default(), - trusted_nodes: Default::default(), - trusted_nodes_only: false, - basic_nodes: Default::default(), - max_backoff_count: 5, - } - } -} - -impl PeersConfig { - /// A set of `peer_ids` and ip addr that we want to never connect to - pub fn with_ban_list(mut self, ban_list: BanList) -> Self { - self.ban_list = ban_list; - self - } - - /// Configure how long to ban bad peers - pub const fn with_ban_duration(mut self, ban_duration: Duration) -> Self { - self.ban_duration = ban_duration; - self - } - - /// Maximum occupied slots for outbound connections. - pub const fn with_max_pending_outbound(mut self, num_outbound: usize) -> Self { - self.connection_info.num_outbound = num_outbound; - self - } - - /// Maximum occupied slots for inbound connections. - pub const fn with_max_pending_inbound(mut self, num_inbound: usize) -> Self { - self.connection_info.num_inbound = num_inbound; - self - } - - /// Maximum allowed outbound connections. - pub const fn with_max_outbound(mut self, max_outbound: usize) -> Self { - self.connection_info.max_outbound = max_outbound; - self - } - - /// Maximum allowed inbound connections with optional update. - pub const fn with_max_inbound_opt(mut self, max_inbound: Option) -> Self { - if let Some(max_inbound) = max_inbound { - self.connection_info.max_inbound = max_inbound; - } - self - } - - /// Maximum allowed outbound connections with optional update. - pub const fn with_max_outbound_opt(mut self, max_outbound: Option) -> Self { - if let Some(max_outbound) = max_outbound { - self.connection_info.max_outbound = max_outbound; - } - self - } - - /// Maximum allowed inbound connections. - pub const fn with_max_inbound(mut self, max_inbound: usize) -> Self { - self.connection_info.max_inbound = max_inbound; - self - } - - /// Maximum allowed concurrent outbound dials. - pub const fn with_max_concurrent_dials(mut self, max_concurrent_outbound_dials: usize) -> Self { - self.connection_info.max_concurrent_outbound_dials = max_concurrent_outbound_dials; - self - } - - /// Nodes to always connect to. - pub fn with_trusted_nodes(mut self, nodes: HashSet) -> Self { - self.trusted_nodes = nodes; - self - } - - /// Connect only to trusted nodes. - pub const fn with_trusted_nodes_only(mut self, trusted_only: bool) -> Self { - self.trusted_nodes_only = trusted_only; - self - } - - /// Nodes available at launch. - pub fn with_basic_nodes(mut self, nodes: HashSet) -> Self { - self.basic_nodes = nodes; - self - } - - /// Configures the max allowed backoff count. - pub const fn with_max_backoff_count(mut self, max_backoff_count: u8) -> Self { - self.max_backoff_count = max_backoff_count; - self - } - - /// Configures how to weigh reputation changes. - pub const fn with_reputation_weights( - mut self, - reputation_weights: ReputationChangeWeights, - ) -> Self { - self.reputation_weights = reputation_weights; - self - } - - /// Configures how long to backoff peers that are we failed to connect to for non-fatal reasons - pub const fn with_backoff_durations(mut self, backoff_durations: PeerBackoffDurations) -> Self { - self.backoff_durations = backoff_durations; - self - } - - /// Returns the maximum number of peers, inbound and outbound. - pub const fn max_peers(&self) -> usize { - self.connection_info.max_outbound + self.connection_info.max_inbound - } - - /// Read from file nodes available at launch. Ignored if None. - pub fn with_basic_nodes_from_file( - self, - optional_file: Option>, - ) -> Result { - let Some(file_path) = optional_file else { return Ok(self) }; - let reader = match std::fs::File::open(file_path.as_ref()) { - Ok(file) => io::BufReader::new(file), - Err(e) if e.kind() == ErrorKind::NotFound => return Ok(self), - Err(e) => Err(e)?, - }; - info!(target: "net::peers", file = %file_path.as_ref().display(), "Loading saved peers"); - let nodes: HashSet = serde_json::from_reader(reader)?; - Ok(self.with_basic_nodes(nodes)) - } - - /// Returns settings for testing - #[cfg(test)] - fn test() -> Self { - Self { - refill_slots_interval: Duration::from_millis(100), - backoff_durations: PeerBackoffDurations::test(), - ..Default::default() - } - } -} - -/// The durations to use when a backoff should be applied to a peer. -/// -/// See also [`BackoffKind`]. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct PeerBackoffDurations { - /// Applies to connection problems where there is a chance that they will be resolved after the - /// short duration. - #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] - pub low: Duration, - /// Applies to more severe connection problems where there is a lower chance that they will be - /// resolved. - #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] - pub medium: Duration, - /// Intended for spammers, or bad peers in general. - #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] - pub high: Duration, - /// Maximum total backoff duration. - #[cfg_attr(feature = "serde", serde(with = "humantime_serde"))] - pub max: Duration, -} - -impl PeerBackoffDurations { - /// Returns the corresponding [`Duration`] - pub const fn backoff(&self, kind: BackoffKind) -> Duration { - match kind { - BackoffKind::Low => self.low, - BackoffKind::Medium => self.medium, - BackoffKind::High => self.high, - } - } - - /// Returns the timestamp until which we should backoff. - /// - /// The Backoff duration is capped by the configured maximum backoff duration. - pub fn backoff_until(&self, kind: BackoffKind, backoff_counter: u8) -> std::time::Instant { - let backoff_time = self.backoff(kind); - let backoff_time = backoff_time + backoff_time * backoff_counter as u32; - let now = std::time::Instant::now(); - now + backoff_time.min(self.max) - } - - /// Returns durations for testing. - #[cfg(test)] - const fn test() -> Self { - Self { - low: Duration::from_millis(200), - medium: Duration::from_millis(200), - high: Duration::from_millis(200), - max: Duration::from_millis(200), - } - } -} - -impl Default for PeerBackoffDurations { - fn default() -> Self { - Self { - low: Duration::from_secs(30), - // 3min - medium: Duration::from_secs(60 * 3), - // 15min - high: Duration::from_secs(60 * 15), - // 1h - max: Duration::from_secs(60 * 60), - } - } -} - /// Error thrown when a incoming connection is rejected right away #[derive(Debug, Error, PartialEq, Eq)] pub enum InboundConnectionError { @@ -1541,11 +1265,9 @@ impl Display for InboundConnectionError { mod tests { use super::PeersManager; use crate::{ - error::BackoffKind, peers::{ - manager::{ConnectionInfo, PeerBackoffDurations, PeerConnectionState}, - reputation::DEFAULT_REPUTATION, - InboundConnectionError, PeerAction, + ConnectionInfo, InboundConnectionError, PeerAction, PeerBackoffDurations, + PeerConnectionState, }, session::PendingSessionHandshakeError, PeersConfig, @@ -1558,6 +1280,7 @@ mod tests { use reth_net_banlist::BanList; use reth_network_api::{Direction, ReputationChangeKind}; use reth_network_peers::PeerId; + use reth_network_types::{peers::reputation::DEFAULT_REPUTATION, BackoffKind}; use reth_primitives::B512; use std::{ collections::HashSet, @@ -2106,7 +1829,7 @@ mod tests { peers.add_trusted_peer_id(trusted); // saturate the inbound slots - for i in 0..peers.connection_info.max_inbound { + for i in 0..peers.connection_info.config.max_inbound { let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008); assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); let peer_id = PeerId::random(); @@ -2404,7 +2127,7 @@ mod tests { match a { Ok(_) => panic!(), Err(err) => match err { - super::InboundConnectionError::IpBanned {} => { + InboundConnectionError::IpBanned {} => { assert_eq!(peer_manager.connection_info.num_pending_in, 0) } _ => unreachable!(), @@ -2769,7 +2492,7 @@ mod tests { let mut peer_manager = PeersManager::new(config); let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); let socket_addr = SocketAddr::new(ip, 8008); - for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 { + for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { peer_manager.add_peer(PeerId::random(), socket_addr, None); } @@ -2779,7 +2502,7 @@ mod tests { .iter() .filter(|ev| matches!(ev, PeerAction::Connect { .. })) .count(); - assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials); + assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials); } #[tokio::test] @@ -2790,18 +2513,18 @@ mod tests { let socket_addr = SocketAddr::new(ip, 8008); // add more peers than allowed - for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 { + for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { peer_manager.add_peer(PeerId::random(), socket_addr, None); } - for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 { + for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { match event!(peer_manager) { PeerAction::PeerAdded(_) => {} _ => unreachable!(), } } - for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials { + for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials { match event!(peer_manager) { PeerAction::Connect { .. } => {} _ => unreachable!(), @@ -2813,7 +2536,7 @@ mod tests { // all dialed connections should be in 'PendingOut' state let dials = peer_manager.connection_info.num_pending_out; - assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials); + assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials); let num_pendingout_states = peer_manager .peers @@ -2823,7 +2546,7 @@ mod tests { .collect::>(); assert_eq!( num_pendingout_states.len(), - peer_manager.connection_info.max_concurrent_outbound_dials + peer_manager.connection_info.config.max_concurrent_outbound_dials ); // establish dialed connections diff --git a/crates/net/network/src/peers/mod.rs b/crates/net/network/src/peers/mod.rs deleted file mode 100644 index fafb2d7622e..00000000000 --- a/crates/net/network/src/peers/mod.rs +++ /dev/null @@ -1,20 +0,0 @@ -//! Peer related implementations - -mod manager; -mod reputation; - -pub(crate) use manager::InboundConnectionError; -pub use manager::{ConnectionInfo, Peer, PeerAction, PeersConfig, PeersHandle, PeersManager}; -pub use reputation::ReputationChangeWeights; -pub use reth_network_api::PeerKind; - -/// Maximum number of available slots for outbound sessions. -pub const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100; - -/// Maximum number of available slots for inbound sessions. -pub const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30; - -/// Maximum number of available slots for concurrent outgoing dials. -/// -/// This restricts how many outbound dials can be performed concurrently. -pub const DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS: usize = 15; diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 7551721ea23..97ccaf2c6a7 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -3,7 +3,6 @@ use crate::{ message::{NewBlockMessage, PeerMessage, PeerRequest, PeerResponse, PeerResponseResult}, session::{ - config::INITIAL_REQUEST_TIMEOUT, conn::EthRlpxConnection, handle::{ActiveSessionMessage, SessionCommand}, SessionId, @@ -20,6 +19,7 @@ use reth_eth_wire::{ use reth_metrics::common::mpsc::MeteredPollSender; use reth_network_p2p::error::RequestError; use reth_network_peers::PeerId; +use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT; use rustc_hash::FxHashMap; use std::{ collections::VecDeque, @@ -759,10 +759,7 @@ fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) -> #[cfg(test)] mod tests { use super::*; - use crate::session::{ - config::PROTOCOL_BREACH_REQUEST_TIMEOUT, handle::PendingSessionEvent, - start_pending_incoming_session, - }; + use crate::session::{handle::PendingSessionEvent, start_pending_incoming_session}; use reth_chainspec::MAINNET; use reth_ecies::stream::ECIESStream; use reth_eth_wire::{ @@ -770,6 +767,7 @@ mod tests { UnauthedEthStream, UnauthedP2PStream, }; use reth_network_peers::pk2id; + use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT; use reth_primitives::{ForkFilter, Hardfork}; use secp256k1::{SecretKey, SECP256K1}; use tokio::{ diff --git a/crates/net/network/src/session/counter.rs b/crates/net/network/src/session/counter.rs new file mode 100644 index 00000000000..0d8f764f206 --- /dev/null +++ b/crates/net/network/src/session/counter.rs @@ -0,0 +1,106 @@ +use reth_network_api::Direction; +use reth_network_types::SessionLimits; + +use super::ExceedsSessionLimit; + +/// Keeps track of all sessions. +#[derive(Debug, Clone)] +pub struct SessionCounter { + /// Limits to enforce. + limits: SessionLimits, + /// Number of pending incoming sessions. + pending_inbound: u32, + /// Number of pending outgoing sessions. + pending_outbound: u32, + /// Number of active inbound sessions. + active_inbound: u32, + /// Number of active outbound sessions. + active_outbound: u32, +} + +// === impl SessionCounter === + +impl SessionCounter { + pub(crate) const fn new(limits: SessionLimits) -> Self { + Self { + limits, + pending_inbound: 0, + pending_outbound: 0, + active_inbound: 0, + active_outbound: 0, + } + } + + pub(crate) fn inc_pending_inbound(&mut self) { + self.pending_inbound += 1; + } + + pub(crate) fn inc_pending_outbound(&mut self) { + self.pending_outbound += 1; + } + + pub(crate) fn dec_pending(&mut self, direction: &Direction) { + match direction { + Direction::Outgoing(_) => { + self.pending_outbound -= 1; + } + Direction::Incoming => { + self.pending_inbound -= 1; + } + } + } + + pub(crate) fn inc_active(&mut self, direction: &Direction) { + match direction { + Direction::Outgoing(_) => { + self.active_outbound += 1; + } + Direction::Incoming => { + self.active_inbound += 1; + } + } + } + + pub(crate) fn dec_active(&mut self, direction: &Direction) { + match direction { + Direction::Outgoing(_) => { + self.active_outbound -= 1; + } + Direction::Incoming => { + self.active_inbound -= 1; + } + } + } + + pub(crate) const fn ensure_pending_outbound(&self) -> Result<(), ExceedsSessionLimit> { + Self::ensure(self.pending_outbound, self.limits.max_pending_outbound) + } + + pub(crate) const fn ensure_pending_inbound(&self) -> Result<(), ExceedsSessionLimit> { + Self::ensure(self.pending_inbound, self.limits.max_pending_inbound) + } + + const fn ensure(current: u32, limit: Option) -> Result<(), ExceedsSessionLimit> { + if let Some(limit) = limit { + if current >= limit { + return Err(ExceedsSessionLimit(limit)) + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_limits() { + let mut limits = SessionCounter::new(SessionLimits::default().with_max_pending_inbound(2)); + assert!(limits.ensure_pending_outbound().is_ok()); + limits.inc_pending_inbound(); + assert!(limits.ensure_pending_inbound().is_ok()); + limits.inc_pending_inbound(); + assert!(limits.ensure_pending_inbound().is_err()); + } +} diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index e8a80bcff19..715ed59cf63 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -1,10 +1,7 @@ //! Support for handling peer sessions. -use crate::{ - message::PeerMessage, - metrics::SessionManagerMetrics, - session::{active::ActiveSession, config::SessionCounter}, -}; +use crate::{message::PeerMessage, metrics::SessionManagerMetrics, session::active::ActiveSession}; +use counter::SessionCounter; use futures::{future::Either, io, FutureExt, StreamExt}; use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_eth_wire::{ @@ -15,6 +12,7 @@ use reth_eth_wire::{ }; use reth_metrics::common::mpsc::MeteredPollSender; use reth_network_peers::PeerId; +use reth_network_types::SessionsConfig; use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head}; use reth_tasks::TaskSpawner; use rustc_hash::FxHashMap; @@ -37,12 +35,11 @@ use tokio_util::sync::PollSender; use tracing::{debug, instrument, trace}; mod active; -mod config; mod conn; +mod counter; mod handle; pub use crate::message::PeerRequestSender; use crate::protocol::{IntoRlpxSubProtocol, RlpxSubProtocolHandlers, RlpxSubProtocols}; -pub use config::{SessionLimits, SessionsConfig}; pub use handle::{ ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle, SessionCommand, diff --git a/crates/net/network/src/transactions/constants.rs b/crates/net/network/src/transactions/constants.rs index 59ec103cdac..48fb8857cc3 100644 --- a/crates/net/network/src/transactions/constants.rs +++ b/crates/net/network/src/transactions/constants.rs @@ -57,9 +57,9 @@ pub mod tx_manager { /// Constants used by [`TransactionFetcher`](super::TransactionFetcher). pub mod tx_fetcher { - use crate::{ - peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND}, - transactions::fetcher::TransactionFetcherInfo, + use crate::transactions::fetcher::TransactionFetcherInfo; + use reth_network_types::peers::config::{ + DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND, }; use super::{