Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions prdoc/pr_8650.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
title: 'litep2p/peerset: Reject non-reserved peers in the reserved-only mode'
doc:
- audience: Node Operator
description: |-
This PR rejects non-reserved peers in the reserved-only mode of the litep2p notification peerset.

  Previously, litep2p completely ignored the reserved-only state when accepting inbound connections. However, it handled the state properly during the slot-allocation phase.
  - the main changes are in the `report_inbound_substream` function, which now propagates a `Rejected` response to litep2p when in the reserved-only state
  - in response, litep2p should never open an inbound substream after receiving the rejected response
- the state of peers is not advanced while in `Disconnected` or `Backoff` states
- the opening state is moved to `Cancelled`
  - for consistency (and fuzzing) purposes, `report_substream_opened` now handles the `Disconnected` state more robustly
  - while at it, replaced a panic with a `debug_assert` and an instant reject

## Testing Done
- started 2 nodes in Kusama and Polkadot with litep2p
- added the `reserved_only_rejects_non_reserved_peers` test to ensure litep2p handles peers properly from different states


This PR has been extracted from https://github.com/paritytech/polkadot-sdk/pull/8461 to ease the review process

cc @paritytech/networking
crates:
- name: sc-network
bump: patch
94 changes: 87 additions & 7 deletions substrate/client/network/src/litep2p/shim/notification/peerset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl From<Direction> for traits::Direction {
}

/// Open result for a fully-opened connection.
#[derive(PartialEq, Eq)]
#[derive(PartialEq, Eq, Debug)]
pub enum OpenResult {
/// Accept the connection.
Accept {
Expand Down Expand Up @@ -416,6 +416,15 @@ impl Peerset {
// if some connected peer gets banned.
peerstore_handle.register_protocol(Arc::new(PeersetHandle { tx: cmd_tx.clone() }));

log::debug!(
target: LOG_TARGET,
"{}: creating new peerset with max_outbound {} and max_inbound {} and reserved_only {}",
protocol,
max_out,
max_in,
reserved_only,
);

(
Self {
protocol,
Expand Down Expand Up @@ -485,8 +494,25 @@ impl Peerset {

return OpenResult::Reject
},
// The peer was already rejected by the `report_inbound_substream` call and this
// should never happen. However, this code path is exercised by our fuzzer.
PeerState::Disconnected => {
log::debug!(
target: LOG_TARGET,
"{}: substream opened for a peer that was previously rejected {peer:?}",
self.protocol,
);
return OpenResult::Reject
},
state => {
panic!("{}: invalid state for open substream {peer:?} {state:?}", self.protocol);
log::error!(
target: LOG_TARGET,
"{}: substream opened for a peer in invalid state {peer:?}: {state:?}",
self.protocol,
);

debug_assert!(false);
return OpenResult::Reject;
},
}
}
Expand Down Expand Up @@ -545,14 +571,27 @@ impl Peerset {
PeerState::Closing { .. } | PeerState::Connected { .. } => {
log::debug!(target: LOG_TARGET, "{}: reserved peer {peer:?} disconnected", self.protocol);
},
// The peer was already rejected by the `report_inbound_substream` call and this
// should never happen. However, this code path is exercised by our fuzzer.
PeerState::Disconnected => {
log::debug!(
target: LOG_TARGET,
"{}: substream closed for a peer that was previously rejected {peer:?}",
self.protocol,
);
},
state => {
log::warn!(target: LOG_TARGET, "{}: invalid state for disconnected peer {peer:?}: {state:?}", self.protocol);
debug_assert!(false);
},
}
*state = PeerState::Backoff;

self.connected_peers.fetch_sub(1usize, Ordering::Relaxed);
// Rejected peers do not count towards slot allocation.
if !matches!(state, PeerState::Disconnected) {
self.connected_peers.fetch_sub(1usize, Ordering::Relaxed);
}

*state = PeerState::Backoff;
self.pending_backoffs.push(Box::pin(async move {
Delay::new(DEFAULT_BACKOFF).await;
(peer, DISCONNECT_ADJUSTMENT)
Expand All @@ -576,12 +615,25 @@ impl Peerset {
let state = self.peers.entry(peer).or_insert(PeerState::Disconnected);
let is_reserved_peer = self.reserved_peers.contains(&peer);

// Check if this is a non-reserved peer and if the protocol is in reserved-only mode.
let should_reject = self.reserved_only && !is_reserved_peer;

match state {
// disconnected peers that are reserved-only peers are rejected
PeerState::Disconnected if should_reject => {
log::trace!(
target: LOG_TARGET,
"{}: rejecting non-reserved peer {peer:?} in reserved-only mode (prev state: {state:?})",
self.protocol,
);

return ValidationResult::Reject
},
// disconnected peers proceed directly to inbound slot allocation
PeerState::Disconnected => {},
// peer is backed off but if it can be accepted (either a reserved peer or inbound slot
// available), accept the peer and then just ignore the back-off timer when it expires
PeerState::Backoff =>
PeerState::Backoff => {
if !is_reserved_peer && self.num_in == self.max_in {
log::trace!(
target: LOG_TARGET,
Expand All @@ -590,7 +642,16 @@ impl Peerset {
);

return ValidationResult::Reject
},
}

// The peer remains in the `PeerState::Backoff` state until the current timer
// expires. Then, the peer will be in the disconnected state, subject to further
// rejection if the peer is not reserved by then.
if should_reject {
return ValidationResult::Reject
}
},

// `Peerset` had initiated an outbound substream but litep2p had received an inbound
// substream before the command to open the substream was received, meaning local and
// remote desired to open a connection at the same time. Since outbound substreams
Expand All @@ -605,6 +666,17 @@ impl Peerset {
// inbound substreams, that system has to be kept working for the time being. Once that
// issue is fixed, this approach can be re-evaluated if need be.
PeerState::Opening { direction: Direction::Outbound(reserved) } => {
if should_reject {
log::trace!(
target: LOG_TARGET,
"{}: rejecting inbound substream from {peer:?} ({reserved:?}) in reserved-only mode that was marked outbound",
self.protocol,
);

*state = PeerState::Canceled { direction: Direction::Outbound(*reserved) };
return ValidationResult::Reject
}

log::trace!(
target: LOG_TARGET,
"{}: inbound substream received for {peer:?} ({reserved:?}) that was marked outbound",
Expand All @@ -616,7 +688,7 @@ impl Peerset {
PeerState::Canceled { direction } => {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} is canceled, rejecting substream",
"{}: {peer:?} is canceled, rejecting substream should_reject={should_reject}",
self.protocol,
);

Expand Down Expand Up @@ -870,6 +942,12 @@ impl Peerset {
&self.peers
}

/// Get mutable reference to known peers.
///
/// Test-only: allows tests to manipulate peer states directly, e.g. to move
/// peers out of `PeerState::Backoff` without waiting for the backoff timer.
#[cfg(test)]
pub fn peers_mut(&mut self) -> &mut HashMap<PeerId, PeerState> {
	&mut self.peers
}

/// Get reference to reserved peers.
#[cfg(test)]
pub fn reserved_peers(&self) -> &HashSet<PeerId> {
Expand All @@ -893,6 +971,8 @@ impl Stream for Peerset {
}

if let Poll::Ready(Some(action)) = Pin::new(&mut self.cmd_rx).poll_next(cx) {
log::trace!(target: LOG_TARGET, "{}: received command {action:?}", self.protocol);

match action {
PeersetCommand::DisconnectPeer { peer } if !self.reserved_peers.contains(&peer) =>
match self.peers.remove(&peer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use sc_network_types::PeerId;

use std::{
collections::HashSet,
sync::{atomic::Ordering, Arc},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::Poll,
};

Expand Down Expand Up @@ -1123,3 +1126,174 @@ async fn set_reserved_peers_cannot_move_previously_reserved() {
assert_eq!(peerset.num_out(), 0usize);
assert_eq!(peerset.reserved_peers().len(), 3usize);
}

// Verify that a peerset created in reserved-only mode rejects inbound substreams
// from non-reserved peers at every stage (validation, open, close), keeps slot
// counters untouched for rejected peers, and later accepts the same peers once
// reserved-only mode is disabled.
#[tokio::test]
async fn reserved_only_rejects_non_reserved_peers() {
	sp_tracing::try_init_simple();

	let peerstore_handle = Arc::new(peerstore_handle_test());
	// Three reserved peers known to the peerset from the start.
	let reserved_peers = HashSet::from_iter([PeerId::random(), PeerId::random(), PeerId::random()]);

	// Shared counter of fully-connected peers, updated by the peerset.
	let connected_peers = Arc::new(AtomicUsize::new(0));
	let (mut peerset, to_peerset) = Peerset::new(
		ProtocolName::from("/notif/1"),
		3,    // max outbound slots
		3,    // max inbound slots
		true, // reserved-only mode: non-reserved peers must be rejected
		reserved_peers.clone(),
		connected_peers.clone(),
		peerstore_handle,
	);
	assert_eq!(peerset.num_in(), 0usize);
	assert_eq!(peerset.num_out(), 0usize);

	// Step 1. Connect reserved peers.
	{
		// The peerset should immediately ask to open outbound substreams to the
		// reserved peers; reserved peers do not consume regular slots.
		match peerset.next().await {
			Some(PeersetNotificationCommand::OpenSubstream { peers: out_peers }) => {
				assert_eq!(peerset.num_in(), 0usize);
				assert_eq!(peerset.num_out(), 0usize);

				for outbound_peer in &out_peers {
					assert!(reserved_peers.contains(outbound_peer));
					assert_eq!(
						peerset.peers().get(&outbound_peer),
						Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::Yes) })
					);
				}
			},
			event => panic!("invalid event: {event:?}"),
		}
		// Report the reserved peers as connected.
		for peer in &reserved_peers {
			assert!(std::matches!(
				peerset.report_substream_opened(*peer, traits::Direction::Outbound),
				OpenResult::Accept { .. }
			));
			assert_eq!(
				peerset.peers().get(peer),
				Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) })
			);
		}
		// All three reserved peers are now counted as connected.
		assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize);
	}

	// Step 2. Ensure non-reserved peers are rejected.
	let normal_peers: Vec<PeerId> = vec![PeerId::random(), PeerId::random(), PeerId::random()];
	{
		// Report the peers as inbound for validation purposes.
		for peer in &normal_peers {
			// We are running in reserved only mode, so validation must reject.
			let result = peerset.report_inbound_substream(*peer);
			assert_eq!(result, ValidationResult::Reject);

			// The peer must be kept in the disconnected state.
			assert_eq!(peerset.peers().get(peer), Some(&PeerState::Disconnected));
		}
		// Ensure slots are not used.
		assert_eq!(peerset.num_in(), 0usize);
		assert_eq!(peerset.num_out(), 0usize);

		// Report that all substreams were opened (litep2p should not normally do
		// this after a rejection, but the path must still be handled defensively).
		for peer in &normal_peers {
			// We must reject them because the peers were rejected prior by
			// `report_inbound_substream` and therefore set into the disconnected state.
			let result = peerset.report_substream_opened(*peer, traits::Direction::Inbound);
			assert_eq!(result, OpenResult::Reject);

			// Peer remains disconnected.
			assert_eq!(peerset.peers().get(&peer), Some(&PeerState::Disconnected));
		}
		// Connected-peer count still reflects only the reserved peers.
		assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize);

		// Because we have returned `Reject` from `report_substream_opened`
		// the substreams will later be closed.
		for peer in &normal_peers {
			peerset.report_substream_closed(*peer);

			// Peer moves into the backoff state.
			assert_eq!(peerset.peers().get(peer), Some(&PeerState::Backoff));
		}
		// The slots are not used / altered: rejected peers never counted towards
		// `connected_peers`, so closing them must not decrement the counter.
		assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize);
	}

	// Move peers out of the backoff state (ie simulate 5s elapsed time).
	// Uses the test-only `peers_mut` accessor instead of waiting for the timer.
	for (peer, state) in peerset.peers_mut() {
		if normal_peers.contains(peer) {
			match state {
				PeerState::Backoff => *state = PeerState::Disconnected,
				state => panic!("invalid state peer={peer:?} state={state:?}"),
			}
		} else if reserved_peers.contains(peer) {
			match state {
				PeerState::Connected { direction: Direction::Outbound(Reserved::Yes) } => {},
				state => panic!("invalid state peer={peer:?} state={state:?}"),
			}
		} else {
			panic!("invalid peer={peer:?} not present");
		}
	}

	// Step 3. Allow connections from non-reserved peers.
	{
		to_peerset
			.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only: false })
			.unwrap();
		// This will activate the non-reserved peers and give us the best outgoing
		// candidates to connect to.
		match peerset.next().await {
			Some(PeersetNotificationCommand::OpenSubstream { peers }) => {
				// These are the non-reserved peers we informed the peerset above.
				assert_eq!(peers.len(), 3);
				for peer in &peers {
					assert!(!reserved_peers.contains(peer));
					assert_eq!(
						peerset.peers().get(peer),
						Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) })
					);
					assert!(normal_peers.contains(peer));
				}
			},
			event => panic!("invalid event : {event:?}"),
		}
		// Ensure slots are used.
		assert_eq!(peerset.num_in(), 0usize);
		assert_eq!(peerset.num_out(), 3usize);

		// Simultaneous inbound substreams: the peers are already `Opening` outbound,
		// so validation accepts without changing the recorded direction.
		for peer in &normal_peers {
			let result = peerset.report_inbound_substream(*peer);
			assert_eq!(result, ValidationResult::Accept);
			// Direction is kept from the outbound slot allocation.
			assert_eq!(
				peerset.peers().get(peer),
				Some(&PeerState::Opening { direction: Direction::Outbound(Reserved::No) })
			);
		}
		// Ensure slots are used.
		assert_eq!(peerset.num_in(), 0usize);
		assert_eq!(peerset.num_out(), 3usize);
		// Peers are only reported as connected once the substream is opened.
		// 3 represents the reserved peers that are already connected.
		assert_eq!(connected_peers.load(Ordering::Relaxed), 3usize);

		// Two peers connect successfully, one fails to open its substream.
		let (success, failure) = normal_peers.split_at(2);
		for peer in success {
			assert!(std::matches!(
				peerset.report_substream_opened(*peer, traits::Direction::Outbound),
				OpenResult::Accept { .. }
			));
			assert_eq!(
				peerset.peers().get(peer),
				Some(&PeerState::Connected { direction: Direction::Outbound(Reserved::No) })
			);
		}
		// Simulate one failure: the peer is backed off and its outbound slot freed.
		let failure = failure[0];
		peerset.report_substream_open_failure(failure, NotificationError::ChannelClogged);
		assert_eq!(peerset.peers().get(&failure), Some(&PeerState::Backoff));
		assert_eq!(peerset.num_in(), 0usize);
		assert_eq!(peerset.num_out(), 2usize);
		// 3 reserved + 2 successful non-reserved peers.
		assert_eq!(connected_peers.load(Ordering::Relaxed), 5usize);
	}
}
Loading