diff --git a/prdoc/pr_8650.prdoc b/prdoc/pr_8650.prdoc new file mode 100644 index 0000000000000..a2a1a8d06f074 --- /dev/null +++ b/prdoc/pr_8650.prdoc @@ -0,0 +1,25 @@ +title: 'litep2p/peerset: Reject non-reserved peers in the reserved-only mode' +doc: +- audience: Node Operator + description: |- + This PR rejects non-reserved peers in the reserved-only mode of the litep2p notification peerset. + + Previously, litep2p completely ignored the reserved-only state while accepting inbound connections. However, it handled it properly during the slot allocation phase. + - the main changes are in the `report_inbound_substream` function, which now propagates a `Rejected` response to litep2p on the reserved-only state + - in response, litep2p should never open an inbound substream after receiving the rejected response + - the state of peers is not advanced while in `Disconnected` or `Backoff` states + - the opening state is moved to `Cancelled` + - for consistency purposes (and fuzzing purposes), `report_substream_opened` now handles the `Disconnected` state more robustly + - while at it, replaced a panic with `debug_assert` and an instant reject + + ## Testing Done + - started 2 nodes in Kusama and Polkadot with litep2p + - added the `reserved_only_rejects_non_reserved_peers` test to ensure litep2p handles peers properly from different states + + + This PR has been extracted from https://github.com/paritytech/polkadot-sdk/pull/8461 to ease the review process + + cc @paritytech/networking +crates: +- name: sc-network + bump: patch diff --git a/substrate/client/network/src/litep2p/shim/notification/peerset.rs b/substrate/client/network/src/litep2p/shim/notification/peerset.rs index fb822794ccf0a..153987e5956d3 100644 --- a/substrate/client/network/src/litep2p/shim/notification/peerset.rs +++ b/substrate/client/network/src/litep2p/shim/notification/peerset.rs @@ -139,7 +139,7 @@ impl From for traits::Direction { } /// Open result for a fully-opened connection. 
-#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Debug)] pub enum OpenResult { /// Accept the connection. Accept { @@ -416,6 +416,15 @@ impl Peerset { // if some connected peer gets banned. peerstore_handle.register_protocol(Arc::new(PeersetHandle { tx: cmd_tx.clone() })); + log::debug!( + target: LOG_TARGET, + "{}: creating new peerset with max_outbound {} and max_inbound {} and reserved_only {}", + protocol, + max_out, + max_in, + reserved_only, + ); + ( Self { protocol, @@ -485,8 +494,25 @@ impl Peerset { return OpenResult::Reject }, + // The peer was already rejected by the `report_inbound_substream` call and this + // should never happen. However, this code path is exercised by our fuzzer. + PeerState::Disconnected => { + log::debug!( + target: LOG_TARGET, + "{}: substream opened for a peer that was previously rejected {peer:?}", + self.protocol, + ); + return OpenResult::Reject + }, state => { - panic!("{}: invalid state for open substream {peer:?} {state:?}", self.protocol); + log::error!( + target: LOG_TARGET, + "{}: substream opened for a peer in invalid state {peer:?}: {state:?}", + self.protocol, + ); + + debug_assert!(false); + return OpenResult::Reject; }, } } @@ -545,14 +571,27 @@ impl Peerset { PeerState::Closing { .. } | PeerState::Connected { .. } => { log::debug!(target: LOG_TARGET, "{}: reserved peer {peer:?} disconnected", self.protocol); }, + // The peer was already rejected by the `report_inbound_substream` call and this + // should never happen. However, this code path is exercised by our fuzzer. 
+ PeerState::Disconnected => { + log::debug!( + target: LOG_TARGET, + "{}: substream closed for a peer that was previously rejected {peer:?}", + self.protocol, + ); + }, state => { log::warn!(target: LOG_TARGET, "{}: invalid state for disconnected peer {peer:?}: {state:?}", self.protocol); debug_assert!(false); }, } - *state = PeerState::Backoff; - self.connected_peers.fetch_sub(1usize, Ordering::Relaxed); + // Rejected peers do not count towards slot allocation. + if !matches!(state, PeerState::Disconnected) { + self.connected_peers.fetch_sub(1usize, Ordering::Relaxed); + } + + *state = PeerState::Backoff; self.pending_backoffs.push(Box::pin(async move { Delay::new(DEFAULT_BACKOFF).await; (peer, DISCONNECT_ADJUSTMENT) @@ -576,12 +615,25 @@ impl Peerset { let state = self.peers.entry(peer).or_insert(PeerState::Disconnected); let is_reserved_peer = self.reserved_peers.contains(&peer); + // Check if this is a non-reserved peer and if the protocol is in reserved-only mode. + let should_reject = self.reserved_only && !is_reserved_peer; + match state { + // disconnected peers that are reserved-only peers are rejected + PeerState::Disconnected if should_reject => { + log::trace!( + target: LOG_TARGET, + "{}: rejecting non-reserved peer {peer:?} in reserved-only mode (prev state: {state:?})", + self.protocol, + ); + + return ValidationResult::Reject + }, // disconnected peers proceed directly to inbound slot allocation PeerState::Disconnected => {}, // peer is backed off but if it can be accepted (either a reserved peer or inbound slot // available), accept the peer and then just ignore the back-off timer when it expires - PeerState::Backoff => + PeerState::Backoff => { if !is_reserved_peer && self.num_in == self.max_in { log::trace!( target: LOG_TARGET, @@ -590,7 +642,16 @@ impl Peerset { ); return ValidationResult::Reject - }, + } + + // The peer remains in the `PeerState::Backoff` state until the current timer + // expires. 
Then, the peer will be in the disconnected state, subject to further + rejection if the peer is not reserved by then. + if should_reject { + return ValidationResult::Reject + } + }, + + // `Peerset` had initiated an outbound substream but litep2p had received an inbound // substream before the command to open the substream was received, meaning local and // remote desired to open a connection at the same time. Since outbound substreams @@ -605,6 +666,17 @@ impl Peerset { // inbound substreams, that system has to be kept working for the time being. Once that // issue is fixed, this approach can be re-evaluated if need be. PeerState::Opening { direction: Direction::Outbound(reserved) } => { + if should_reject { + log::trace!( + target: LOG_TARGET, + "{}: rejecting inbound substream from {peer:?} ({reserved:?}) in reserved-only mode that was marked outbound", + self.protocol, + ); + + *state = PeerState::Canceled { direction: Direction::Outbound(*reserved) }; + return ValidationResult::Reject + } + log::trace!( target: LOG_TARGET, "{}: inbound substream received for {peer:?} ({reserved:?}) that was marked outbound", @@ -616,7 +688,7 @@ impl Peerset { PeerState::Canceled { direction } => { log::trace!( target: LOG_TARGET, - "{}: {peer:?} is canceled, rejecting substream", + "{}: {peer:?} is canceled, rejecting substream should_reject={should_reject}", self.protocol, ); @@ -870,6 +942,12 @@ impl Peerset { &self.peers } + /// Get a mutable reference to known peers. + #[cfg(test)] + pub fn peers_mut(&mut self) -> &mut HashMap { + &mut self.peers + } + + /// Get reference to reserved peers. 
#[cfg(test)] pub fn reserved_peers(&self) -> &HashSet { @@ -893,6 +971,8 @@ impl Stream for Peerset { } if let Poll::Ready(Some(action)) = Pin::new(&mut self.cmd_rx).poll_next(cx) { + log::trace!(target: LOG_TARGET, "{}: received command {action:?}", self.protocol); + match action { PeersetCommand::DisconnectPeer { peer } if !self.reserved_peers.contains(&peer) => match self.peers.remove(&peer) { diff --git a/substrate/client/network/src/litep2p/shim/notification/tests/peerset.rs b/substrate/client/network/src/litep2p/shim/notification/tests/peerset.rs index 295a5b441b3ea..9ec332681336f 100644 --- a/substrate/client/network/src/litep2p/shim/notification/tests/peerset.rs +++ b/substrate/client/network/src/litep2p/shim/notification/tests/peerset.rs @@ -35,7 +35,10 @@ use sc_network_types::PeerId; use std::{ collections::HashSet, - sync::{atomic::Ordering, Arc}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, task::Poll, }; @@ -1123,3 +1126,174 @@ async fn set_reserved_peers_cannot_move_previously_reserved() { assert_eq!(peerset.num_out(), 0usize); assert_eq!(peerset.reserved_peers().len(), 3usize); } + +#[tokio::test] +async fn reserved_only_rejects_non_reserved_peers() { + sp_tracing::try_init_simple(); + + let peerstore_handle = Arc::new(peerstore_handle_test()); + let reserved_peers = HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]); + + let connected_peers = Arc::new(AtomicUsize::new(0)); + let (mut peerset, to_peerset) = Peerset::new( + ProtocolName::from("/notif/1"), + 3, + 3, + true, + reserved_peers.clone(), + connected_peers.clone(), + peerstore_handle, + ); + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 0usize); + + // Step 1. Connect reserved peers. 
+ { + match peerset.next().await { + Some(PeersetNotificationCommand::OpenSubstream { peers: out_peers }) => { + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 0usize); + + for outbound_peer in &out_peers { + assert!(reserved_peers.contains(outbound_peer)); + assert_eq!( + peerset.peers().get(&outbound_peer), + Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) }) + ); + } + }, + event => panic!("invalid event: {event:?}"), + } + // Report the reserved peers as connected. + for peer in &reserved_peers { + assert!(std::matches!( + peerset.report_substream_opened(*peer, traits::Direction::Outbound), + OpenResult::Accept { .. } + )); + assert_eq!( + peerset.peers().get(peer), + Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) }) + ); + } + assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize); + } + + // Step 2. Ensure non-reserved peers are rejected. + let normal_peers: Vec = vec![PeerId::random(), PeerId::random(), PeerId::random()]; + { + // Report the peers as inbound for validation purposes. + for peer in &normal_peers { + // We are running in reserved only mode. + let result = peerset.report_inbound_substream(*peer); + assert_eq!(result, ValidationResult::Reject); + + // The peer must be kept in the disconnected state. + assert_eq!(peerset.peers().get(peer), Some(&PeerState::Disconnected)); + } + // Ensure slots are not used. + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 0usize); + + // Report that all substreams were opened. + for peer in &normal_peers { + // We must reject them because the peers were rejected prior by + // `report_inbound_substream` and therefore set into the disconnected state. + let result = peerset.report_substream_opened(*peer, traits::Direction::Inbound); + assert_eq!(result, OpenResult::Reject); + + // Peer remains disconnected. 
+ assert_eq!(peerset.peers().get(&peer), Some(&PeerState::Disconnected)); + } + assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize); + + // Because we have returned `Reject` from `report_substream_opened` + // the substreams will later be closed. + for peer in &normal_peers { + peerset.report_substream_closed(*peer); + + // Peer moves into the backoff state. + assert_eq!(peerset.peers().get(peer), Some(&PeerState::Backoff)); + } + // The slots are not used / altered. + assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize); + } + + // Move peers out of the backoff state (ie simulate 5s elapsed time). + for (peer, state) in peerset.peers_mut() { + if normal_peers.contains(peer) { + match state { + PeerState::Backoff => *state = PeerState::Disconnected, + state => panic!("invalid state peer={peer:?} state={state:?}"), + } + } else if reserved_peers.contains(peer) { + match state { + PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) } => {}, + state => panic!("invalid state peer={peer:?} state={state:?}"), + } + } else { + panic!("invalid peer={peer:?} not present"); + } + } + + // Step 3. Allow connections from non-reserved peers. + { + to_peerset + .unbounded_send(PeersetCommand::SetReservedOnly { reserved_only: false }) + .unwrap(); + // This will activate the non-reserved peers and give us the best outgoing + // candidates to connect to. + match peerset.next().await { + Some(PeersetNotificationCommand::OpenSubstream { peers }) => { + // These are the non-reserved peers we informed the peerset above. + assert_eq!(peers.len(), 3); + for peer in &peers { + assert!(!reserved_peers.contains(peer)); + assert_eq!( + peerset.peers().get(peer), + Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) }) + ); + assert!(normal_peers.contains(peer)); + } + }, + event => panic!("invalid event : {event:?}"), + } + // Ensure slots are used. 
+ assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 3usize); + + for peer in &normal_peers { + let result = peerset.report_inbound_substream(*peer); + assert_eq!(result, ValidationResult::Accept); + // Direction is kept from the outbound slot allocation. + assert_eq!( + peerset.peers().get(peer), + Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) }) + ); + } + // Ensure slots are used. + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 3usize); + // Peers are only reported as connected once the substream is opened. + // 3 represents the reserved peers that are already connected. + assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize); + + let (success, failure) = normal_peers.split_at(2); + for peer in success { + assert!(std::matches!( + peerset.report_substream_opened(*peer, traits::Direction::Outbound), + OpenResult::Accept { .. } + )); + assert_eq!( + peerset.peers().get(peer), + Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::No) }) + ); + } + // Simulate one failure. + let failure = failure[0]; + peerset.report_substream_open_failure(failure, NotificationError::ChannelClogged); + assert_eq!(peerset.peers().get(&failure), Some(&PeerState::Backoff)); + assert_eq!(peerset.num_in(), 0usize); + assert_eq!(peerset.num_out(), 2usize); + assert_eq!(connected_peers.load(Ordering::Relaxed), 5usize); + } +}