From b4e75b24e63ea521977d33a7eee187d4928b4846 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 10 Feb 2026 12:21:48 -0300 Subject: [PATCH 1/4] fix: make observed_peer_subnets fork-aware to prevent incorrect unsubscription During fork transitions, a peer may be subscribed to the same subnet via topics from different forks (e.g., ssv.v2.42 and /ssv/mainnet/boole/42). Previously, unsubscribing from one fork's topic would clear the subnet bit entirely, even though the peer was still subscribed via the other fork's topic. Track subscriptions per fork so each fork maintains its own bitmap. Query methods aggregate across forks using bitwise union when checking subnet coverage. Closes #818 Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 1 + anchor/network/Cargo.toml | 1 + anchor/network/src/network.rs | 20 +- anchor/network/src/peer_manager/connection.rs | 374 +++++++++++++++++- anchor/network/src/peer_manager/mod.rs | 12 +- 5 files changed, 379 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58ae869a2..f8b33b194 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5283,6 +5283,7 @@ dependencies = [ "async-trait", "discv5", "ethereum_ssz", + "fork", "futures", "global_config", "hex", diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index 4fb3ad524..6e0e4c19f 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -9,6 +9,7 @@ async-broadcast = { workspace = true } async-trait = "0.1.85" discv5 = { workspace = true } ethereum_ssz = { workspace = true } +fork = { workspace = true } futures = { workspace = true } global_config = { workspace = true } gossipsub = { workspace = true } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 569c6fac2..c1f332722 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -276,15 +276,23 @@ impl Network { self.handle_gossipsub_message(propagation_source, message_id, message); } gossipsub::Event::Subscribed { peer_id, topic } => { - if let Some(subnet) = topic::parse_subnet_id(&topic) { - self.peer_manager() - .set_peer_subscription(peer_id, subnet, true); + if let Some(parsed) = topic::parse_topic(&topic) { + self.peer_manager().set_peer_subscription( + peer_id, + parsed.fork, + parsed.subnet_id, + true, + ); } } gossipsub::Event::Unsubscribed { peer_id, topic } => { - if let Some(subnet) = topic::parse_subnet_id(&topic) { - self.peer_manager() - .set_peer_subscription(peer_id, subnet, false); + if let Some(parsed) = topic::parse_topic(&topic) { + self.peer_manager().set_peer_subscription( + peer_id, + parsed.fork, + parsed.subnet_id, + false, + ); } } _ => { diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 55b6102fb..086d53f70 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -4,6 +4,7 @@ use std::{ }; use discv5::libp2p_identity::PeerId; +use fork::Fork; use libp2p::{ Multiaddr, connection_limits::{self, ConnectionLimits}, @@ -46,8 +47,11 @@ pub struct ConnectionManager { pub connected: HashSet, pub target_peers: usize, pub max_with_priority_peers: usize, - // Map of observed gossipsub subscriptions per peer. Prefer this over ENR claims. - observed_peer_subnets: HashMap>>, + // Per-fork observed gossipsub subscriptions per peer. Prefer this over ENR claims. + // Tracked per fork so that unsubscribing from one fork's topic doesn't clear the + // bit for the same subnet on another fork's topic. + // See: https://github.com/sigp/anchor/issues/818 + observed_peer_subnets: HashMap>>>, // Track inbound vs outbound connection counts inbound_count: usize, outbound_count: usize, @@ -116,20 +120,34 @@ impl ConnectionManager { self.connection_limits = Self::create_connection_limits(new_target); } - /// External update from gossipsub events about peer subscription state - pub fn set_peer_subscribed(&mut self, peer: PeerId, subnet: SubnetId, subscribed: bool) { - let entry = self.observed_peer_subnets.entry(peer).or_default(); - + /// External update from gossipsub events about peer subscription state. + /// + /// Subscriptions are tracked per fork so that unsubscribing from one fork's topic + /// (e.g., `ssv.v2.42`) does not clear the subnet bit if the peer is still + /// subscribed via another fork's topic (e.g., `/ssv/mainnet/boole/42`). + pub fn set_peer_subscribed( + &mut self, + peer: PeerId, + fork: Fork, + subnet: SubnetId, + subscribed: bool, + ) { let idx = *subnet.deref() as usize; - if idx < entry.len() { - // Safe to ignore the result of `set` because we have already checked that `idx < - // entry.len()` - let _ = entry.set(idx, subscribed); + let fork_map = self.observed_peer_subnets.entry(peer).or_default(); + let bitfield = fork_map.entry(fork).or_default(); + + if idx < bitfield.len() { + let _ = bitfield.set(idx, subscribed); } - // If peer is now unsubscribed from all observed subnets, drop the entry to keep map small - if !subscribed && !entry.iter().any(|b| b) { - self.observed_peer_subnets.remove(&peer); + // Clean up empty entries to keep maps small + if !subscribed { + if bitfield.is_zero() { + fork_map.remove(&fork); + } + if fork_map.is_empty() { + self.observed_peer_subnets.remove(&peer); + } } } @@ -217,12 +235,12 @@ impl ConnectionManager { return true; } - // Only use observed subscriptions, no ENR fallback - let Some(observed) = self.observed_peer_subnets.get(peer) else { + // Only use observed subscriptions (aggregated across forks), no ENR fallback + let Some(observed) = self.get_peer_subnets_observed_only(peer) else { return false; }; - self.bitfield_offers_any_subnet(observed, needed) + self.bitfield_offers_any_subnet(&observed, needed) } /// Check if a peer offers any needed subnets, using ENR as fallback. @@ -263,9 +281,22 @@ impl ConnectionManager { false } - /// Get subnets a peer claims to support from observed gossipsub only. + /// Get subnets a peer claims to support from observed gossipsub only, + /// aggregated across all forks (union of per-fork bitmaps). fn get_peer_subnets_observed_only(&self, peer: &PeerId) -> Option>> { - self.observed_peer_subnets.get(peer).cloned() + let fork_map = self.observed_peer_subnets.get(peer)?; + Some(Self::aggregate_fork_bitmaps(fork_map)) + } + + /// OR all per-fork bitmaps together into a single aggregate bitfield. + fn aggregate_fork_bitmaps( + fork_map: &HashMap>>, + ) -> Bitfield> { + let mut result = Bitfield::default(); + for bitfield in fork_map.values() { + result = result.union(bitfield); + } + result } /// Get subnets a peer claims to support, with ENR fallback. @@ -286,8 +317,8 @@ impl ConnectionManager { /// Handle connection established event pub fn on_connection_established(&mut self, peer_id: PeerId, is_outbound: bool) -> bool { - // Initialize with empty bitfield to indicate we're now observing this peer - // If they never subscribe to anything, we'll know they offer no subnets + // Initialize with empty fork map to indicate we're now observing this peer. + // If they never subscribe to anything, we'll know they offer no subnets. self.observed_peer_subnets.entry(peer_id).or_default(); // Track connection direction counter @@ -483,3 +514,306 @@ impl ConnectionManager { self.connection_limits.on_swarm_event(event); } } + +#[cfg(test)] +mod tests { + use super::*; + + // ==================== Test constants ==================== + + const TARGET_PEERS: usize = 50; + + // ==================== Helper functions ==================== + + /// Creates a `ConnectionManager` with default target peers for testing. + fn create_test_manager() -> ConnectionManager { + ConnectionManager::new(TARGET_PEERS) + } + + /// Connects a peer to the manager (adds to `connected` set and initializes + /// `observed_peer_subnets`), returning the generated `PeerId`. + fn connect_random_peer(mgr: &mut ConnectionManager) -> PeerId { + let peer = PeerId::random(); + mgr.on_connection_established(peer, /* is_outbound = */ true); + peer + } + + /// Returns the aggregated (union across forks) bitfield for a peer, or `None`. + fn aggregated_bitfield( + mgr: &ConnectionManager, + peer: &PeerId, + ) -> Option>> { + mgr.get_peer_subnets_observed_only(peer) + } + + /// Checks whether a specific subnet bit is set in the aggregated bitfield. + fn is_subnet_set(mgr: &ConnectionManager, peer: &PeerId, subnet: u64) -> bool { + aggregated_bitfield(mgr, peer) + .map(|bf| bf.get(subnet as usize).unwrap_or(false)) + .unwrap_or(false) + } + + // ==================== Single fork subscribe/unsubscribe ==================== + + #[test] + fn test_set_peer_subscribed_single_fork_subscribe() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + let subnet = SubnetId::new(5); + + // Act + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); + + // Assert + assert!( + is_subnet_set(&mgr, &peer, 5), + "Subnet 5 should be set after subscribing on Alan fork" + ); + } + + #[test] + fn test_set_peer_subscribed_single_fork_unsubscribe() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + let subnet = SubnetId::new(5); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); + + // Act + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, false); + + // Assert + assert!( + !is_subnet_set(&mgr, &peer, 5), + "Subnet 5 should be cleared after unsubscribing on Alan fork" + ); + } + + // ==================== Multi-fork bug scenario (issue #818) ==================== + + /// Regression test for https://github.com/sigp/anchor/issues/818 + /// + /// When a peer is subscribed to the same subnet on two different forks, + /// unsubscribing from one fork must NOT clear the subnet bit in the + /// aggregated view because the other fork still holds it. + #[test] + fn test_unsubscribe_one_fork_retains_subnet_from_other_fork() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + let subnet = SubnetId::new(42); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet, true); + + // Act: unsubscribe from Alan only + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, false); + + // Assert: subnet 42 should still be set because Boole holds it + assert!( + is_subnet_set(&mgr, &peer, 42), + "Subnet 42 must remain set when Boole fork still subscribes to it" + ); + } + + #[test] + fn test_unsubscribe_both_forks_clears_subnet() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + let subnet = SubnetId::new(42); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet, true); + + // Act: unsubscribe from both forks + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, false); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet, false); + + // Assert: subnet 42 should now be cleared + assert!( + !is_subnet_set(&mgr, &peer, 42), + "Subnet 42 must be cleared after unsubscribing from all forks" + ); + } + + // ==================== Aggregation across forks ==================== + + #[test] + fn test_aggregation_unions_subnets_across_forks() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + + // Act: subscribe to different subnets on different forks + mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(10), true); + mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(20), true); + + // Assert: aggregated view should contain both bits + let bf = aggregated_bitfield(&mgr, &peer).expect("peer should have an aggregated bitfield"); + assert!( + bf.get(10).unwrap_or(false), + "Subnet 10 (Alan) must be present in aggregated bitfield" + ); + assert!( + bf.get(20).unwrap_or(false), + "Subnet 20 (Boole) must be present in aggregated bitfield" + ); + assert!( + !bf.get(30).unwrap_or(false), + "Subnet 30 should NOT be present (never subscribed)" + ); + } + + // ==================== Cleanup on full unsubscribe ==================== + + #[test] + fn test_full_unsubscribe_removes_peer_entry() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(7), true); + mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(99), true); + + // Act: unsubscribe from everything + mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(7), false); + mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(99), false); + + // Assert: the peer should be completely removed from observed_peer_subnets + assert!( + !mgr.observed_peer_subnets.contains_key(&peer), + "Peer entry must be removed from observed_peer_subnets when all \ + per-fork bitmaps are empty" + ); + } + + #[test] + fn test_partial_unsubscribe_keeps_peer_entry() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(7), true); + mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(99), true); + + // Act: unsubscribe from Alan only + mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(7), false); + + // Assert: peer entry still present (Boole fork still has a subscription) + assert!( + mgr.observed_peer_subnets.contains_key(&peer), + "Peer entry must remain while at least one fork bitmap is non-empty" + ); + } + + // ==================== count_observed_peers_for_subnets with multi-fork ==================== + + #[test] + fn test_count_observed_peers_counts_across_forks() { + // Arrange + let mut mgr = create_test_manager(); + let peer_a = connect_random_peer(&mut mgr); + let peer_b = connect_random_peer(&mut mgr); + + // peer_a subscribes to subnet 10 on Alan + mgr.set_peer_subscribed(peer_a, Fork::Alan, SubnetId::new(10), true); + // peer_b subscribes to subnet 10 on Boole (different fork, same subnet) + mgr.set_peer_subscribed(peer_b, Fork::Boole, SubnetId::new(10), true); + // peer_a also subscribes to subnet 20 on Boole + mgr.set_peer_subscribed(peer_a, Fork::Boole, SubnetId::new(20), true); + + // Act + let counts = mgr.count_observed_peers_for_subnets(&[ + SubnetId::new(10), + SubnetId::new(20), + SubnetId::new(30), + ]); + + // Assert + assert_eq!( + counts, + vec![2, 1, 0], + "Subnet 10 should have 2 peers, subnet 20 should have 1, subnet 30 should have 0" + ); + } + + #[test] + fn test_count_observed_peers_requires_connected() { + // Arrange: subscribe a peer but do NOT connect it via on_connection_established + let mut mgr = create_test_manager(); + let disconnected_peer = PeerId::random(); + // Directly set a subscription without calling on_connection_established + mgr.set_peer_subscribed(disconnected_peer, Fork::Alan, SubnetId::new(5), true); + + // Also add a properly connected peer for the same subnet + let connected_peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(connected_peer, Fork::Alan, SubnetId::new(5), true); + + // Act + let counts = mgr.count_observed_peers_for_subnets(&[SubnetId::new(5)]); + + // Assert: only the connected peer should be counted + assert_eq!( + counts, + vec![1], + "Only connected peers should be counted; disconnected peer must be excluded" + ); + } + + // ==================== peer_offers_needed_subnets_observed_only with multi-fork + // ==================== + + #[test] + fn test_peer_offers_needed_subnets_across_forks() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(50), true); + + let needed = HashSet::from([SubnetId::new(50)]); + + // Act & Assert + assert!( + mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), + "Peer subscribed to subnet 50 on Boole should satisfy the needed set" + ); + } + + #[test] + fn test_peer_does_not_offer_unsubscribed_subnets() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(10), true); + + let needed = HashSet::from([SubnetId::new(99)]); + + // Act & Assert + assert!( + !mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), + "Peer subscribed only to subnet 10 should not satisfy need for subnet 99" + ); + } + + /// Verifies that after unsubscribing from one fork, the peer still offers + /// the subnet via the remaining fork. + #[test] + fn test_peer_offers_needed_subnets_after_partial_unsubscribe() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(42), true); + mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(42), true); + + // Act: unsubscribe from Alan + mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(42), false); + + let needed = HashSet::from([SubnetId::new(42)]); + + // Assert: Boole still provides subnet 42 + assert!( + mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), + "Peer should still offer subnet 42 after unsubscribing only from Alan" + ); + } +} diff --git a/anchor/network/src/peer_manager/mod.rs b/anchor/network/src/peer_manager/mod.rs index 560bed865..564a0c49c 100644 --- a/anchor/network/src/peer_manager/mod.rs +++ b/anchor/network/src/peer_manager/mod.rs @@ -205,10 +205,16 @@ impl PeerManager { &self.needed_subnets } - /// Update observed gossipsub subscription state for a peer - pub fn set_peer_subscription(&mut self, peer: PeerId, subnet: SubnetId, subscribed: bool) { + /// Update observed gossipsub subscription state for a peer on a specific fork. + pub fn set_peer_subscription( + &mut self, + peer: PeerId, + fork: fork::Fork, + subnet: SubnetId, + subscribed: bool, + ) { self.connection_manager - .set_peer_subscribed(peer, subnet, subscribed); + .set_peer_subscribed(peer, fork, subnet, subscribed); } /// Handle a completed handshake by updating peer client type and triggering metrics update. From 19f675cdf9884bbf13baf963bcd4e14679d53889 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 10 Feb 2026 13:32:40 -0300 Subject: [PATCH 2/4] fix: add warning for out-of-bounds subnet IDs and idempotency test Address review feedback: log a warning when a subnet ID exceeds the bitfield capacity instead of silently ignoring it, and add a test verifying that duplicate subscribes on the same fork+subnet don't break state. --- anchor/network/src/peer_manager/connection.rs | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 086d53f70..57fe638f5 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -138,6 +138,12 @@ impl ConnectionManager { if idx < bitfield.len() { let _ = bitfield.set(idx, subscribed); + } else { + tracing::warn!( + subnet = idx, + max = bitfield.len(), + "Subnet ID exceeds bitfield capacity" + ); } // Clean up empty entries to keep maps small @@ -816,4 +822,25 @@ mod tests { "Peer should still offer subnet 42 after unsubscribing only from Alan" ); } + + // ==================== Idempotency ==================== + + #[test] + fn test_duplicate_subscribe_then_single_unsubscribe_clears_bit() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + let subnet = SubnetId::new(5); + + // Act: subscribe twice on the same fork+subnet, then unsubscribe once + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet, false); + + // Assert: bit should be cleared — bitfield tracks presence, not a count + assert!( + !is_subnet_set(&mgr, &peer, 5), + "A single unsubscribe should clear the bit regardless of duplicate subscribes" + ); + } } From 481f223e9c1a8de0b4f9c2d96ce26f9bb877aa56 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 10 Feb 2026 14:30:07 -0300 Subject: [PATCH 3/4] ci: disable port publishing in local testnet to prevent port conflicts --- .github/custom/local-testnet/params.yaml | 2 +- .github/workflows/local-testnet.yml | 4 +- anchor/network/src/peer_manager/connection.rs | 146 +++++++++--------- 3 files changed, 80 insertions(+), 72 deletions(-) diff --git a/.github/custom/local-testnet/params.yaml b/.github/custom/local-testnet/params.yaml index 42fea5259..65e38737f 100644 --- a/.github/custom/local-testnet/params.yaml +++ b/.github/custom/local-testnet/params.yaml @@ -30,7 +30,7 @@ network: port_publisher: cl: - enabled: true + enabled: false # Assertoor can only load from URLs here (for some reason). The placeholder is replaced in the workflow. assertoor_params: diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index b0dbe995c..a58d16e22 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -66,7 +66,9 @@ jobs: - name: Start Local Testnet with Assertoor timeout-minutes: 30 - run: kurtosis run --verbosity DETAILED --enclave localnet ssv-mini --args-file anchor/.github/custom/local-testnet/params.yaml + run: | + kurtosis clean -a || true + kurtosis run --verbosity DETAILED --enclave localnet ssv-mini --args-file anchor/.github/custom/local-testnet/params.yaml - name: Await Assertoor Test Result id: assertoor_test_result diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 57fe638f5..8fd0d07b4 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -529,6 +529,18 @@ mod tests { const TARGET_PEERS: usize = 50; + // Subnet IDs used across tests. Each has a distinct role to aid readability. + const SUBNET_A: u64 = 5; + const SUBNET_B: u64 = 42; + const SUBNET_C: u64 = 10; + const SUBNET_D: u64 = 20; + const SUBNET_E: u64 = 99; + const SUBNET_UNSUBSCRIBED: u64 = 30; + + fn subnet(id: u64) -> SubnetId { + SubnetId::new(id) + } + // ==================== Helper functions ==================== /// Creates a `ConnectionManager` with default target peers for testing. @@ -566,15 +578,14 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - let subnet = SubnetId::new(5); // Act - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); // Assert assert!( - is_subnet_set(&mgr, &peer, 5), - "Subnet 5 should be set after subscribing on Alan fork" + is_subnet_set(&mgr, &peer, SUBNET_A), + "Subnet should be set after subscribing on Alan fork" ); } @@ -583,16 +594,15 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - let subnet = SubnetId::new(5); - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); // Act - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, false); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); // Assert assert!( - !is_subnet_set(&mgr, &peer, 5), - "Subnet 5 should be cleared after unsubscribing on Alan fork" + !is_subnet_set(&mgr, &peer, SUBNET_A), + "Subnet should be cleared after unsubscribing on Alan fork" ); } @@ -608,18 +618,17 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - let subnet = SubnetId::new(42); - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); - mgr.set_peer_subscribed(peer, Fork::Boole, subnet, true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); // Act: unsubscribe from Alan only - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, false); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), false); - // Assert: subnet 42 should still be set because Boole holds it + // Assert: subnet should still be set because Boole holds it assert!( - is_subnet_set(&mgr, &peer, 42), - "Subnet 42 must remain set when Boole fork still subscribes to it" + is_subnet_set(&mgr, &peer, SUBNET_B), + "Subnet must remain set when Boole fork still subscribes to it" ); } @@ -628,19 +637,18 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - let subnet = SubnetId::new(42); - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); - mgr.set_peer_subscribed(peer, Fork::Boole, subnet, true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); // Act: unsubscribe from both forks - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, false); - mgr.set_peer_subscribed(peer, Fork::Boole, subnet, false); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), false); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), false); - // Assert: subnet 42 should now be cleared + // Assert: subnet should now be cleared assert!( - !is_subnet_set(&mgr, &peer, 42), - "Subnet 42 must be cleared after unsubscribing from all forks" + !is_subnet_set(&mgr, &peer, SUBNET_B), + "Subnet must be cleared after unsubscribing from all forks" ); } @@ -653,22 +661,22 @@ mod tests { let peer = connect_random_peer(&mut mgr); // Act: subscribe to different subnets on different forks - mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(10), true); - mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(20), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_C), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_D), true); // Assert: aggregated view should contain both bits let bf = aggregated_bitfield(&mgr, &peer).expect("peer should have an aggregated bitfield"); assert!( - bf.get(10).unwrap_or(false), - "Subnet 10 (Alan) must be present in aggregated bitfield" + bf.get(SUBNET_C as usize).unwrap_or(false), + "Alan subnet must be present in aggregated bitfield" ); assert!( - bf.get(20).unwrap_or(false), - "Subnet 20 (Boole) must be present in aggregated bitfield" + bf.get(SUBNET_D as usize).unwrap_or(false), + "Boole subnet must be present in aggregated bitfield" ); assert!( - !bf.get(30).unwrap_or(false), - "Subnet 30 should NOT be present (never subscribed)" + !bf.get(SUBNET_UNSUBSCRIBED as usize).unwrap_or(false), + "Unsubscribed subnet should NOT be present" ); } @@ -679,12 +687,12 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(7), true); - mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(99), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_E), true); // Act: unsubscribe from everything - mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(7), false); - mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(99), false); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_E), false); // Assert: the peer should be completely removed from observed_peer_subnets assert!( @@ -699,11 +707,11 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(7), true); - mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(99), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_E), true); // Act: unsubscribe from Alan only - mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(7), false); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); // Assert: peer entry still present (Boole fork still has a subscription) assert!( @@ -721,25 +729,25 @@ mod tests { let peer_a = connect_random_peer(&mut mgr); let peer_b = connect_random_peer(&mut mgr); - // peer_a subscribes to subnet 10 on Alan - mgr.set_peer_subscribed(peer_a, Fork::Alan, SubnetId::new(10), true); - // peer_b subscribes to subnet 10 on Boole (different fork, same subnet) - mgr.set_peer_subscribed(peer_b, Fork::Boole, SubnetId::new(10), true); - // peer_a also subscribes to subnet 20 on Boole - mgr.set_peer_subscribed(peer_a, Fork::Boole, SubnetId::new(20), true); + // peer_a subscribes to SUBNET_C on Alan + mgr.set_peer_subscribed(peer_a, Fork::Alan, subnet(SUBNET_C), true); + // peer_b subscribes to SUBNET_C on Boole (different fork, same subnet) + mgr.set_peer_subscribed(peer_b, Fork::Boole, subnet(SUBNET_C), true); + // peer_a also subscribes to SUBNET_D on Boole + mgr.set_peer_subscribed(peer_a, Fork::Boole, subnet(SUBNET_D), true); // Act let counts = mgr.count_observed_peers_for_subnets(&[ - SubnetId::new(10), - SubnetId::new(20), - SubnetId::new(30), + subnet(SUBNET_C), + subnet(SUBNET_D), + subnet(SUBNET_UNSUBSCRIBED), ]); // Assert assert_eq!( counts, vec![2, 1, 0], - "Subnet 10 should have 2 peers, subnet 20 should have 1, subnet 30 should have 0" + "SUBNET_C should have 2 peers, SUBNET_D should have 1, SUBNET_UNSUBSCRIBED should have 0" ); } @@ -748,15 +756,14 @@ mod tests { // Arrange: subscribe a peer but do NOT connect it via on_connection_established let mut mgr = create_test_manager(); let disconnected_peer = PeerId::random(); - // Directly set a subscription without calling on_connection_established - mgr.set_peer_subscribed(disconnected_peer, Fork::Alan, SubnetId::new(5), true); + mgr.set_peer_subscribed(disconnected_peer, Fork::Alan, subnet(SUBNET_A), true); // Also add a properly connected peer for the same subnet let connected_peer = connect_random_peer(&mut mgr); - mgr.set_peer_subscribed(connected_peer, Fork::Alan, SubnetId::new(5), true); + mgr.set_peer_subscribed(connected_peer, Fork::Alan, subnet(SUBNET_A), true); // Act - let counts = mgr.count_observed_peers_for_subnets(&[SubnetId::new(5)]); + let counts = mgr.count_observed_peers_for_subnets(&[subnet(SUBNET_A)]); // Assert: only the connected peer should be counted assert_eq!( @@ -774,14 +781,14 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(50), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_D), true); - let needed = HashSet::from([SubnetId::new(50)]); + let needed = HashSet::from([subnet(SUBNET_D)]); // Act & Assert assert!( mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), - "Peer subscribed to subnet 50 on Boole should satisfy the needed set" + "Peer subscribed on Boole should satisfy the needed set" ); } @@ -790,14 +797,14 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(10), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_C), true); - let needed = HashSet::from([SubnetId::new(99)]); + let needed = HashSet::from([subnet(SUBNET_E)]); // Act & Assert assert!( !mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), - "Peer subscribed only to subnet 10 should not satisfy need for subnet 99" + "Peer subscribed to a different subnet should not satisfy the needed set" ); } @@ -808,18 +815,18 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(42), true); - mgr.set_peer_subscribed(peer, Fork::Boole, SubnetId::new(42), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); // Act: unsubscribe from Alan - mgr.set_peer_subscribed(peer, Fork::Alan, SubnetId::new(42), false); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), false); - let needed = HashSet::from([SubnetId::new(42)]); + let needed = HashSet::from([subnet(SUBNET_B)]); - // Assert: Boole still provides subnet 42 + // Assert: Boole still provides the subnet assert!( mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), - "Peer should still offer subnet 42 after unsubscribing only from Alan" + "Peer should still offer subnet after unsubscribing only from Alan" ); } @@ -830,16 +837,15 @@ mod tests { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); - let subnet = SubnetId::new(5); // Act: subscribe twice on the same fork+subnet, then unsubscribe once - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, true); - mgr.set_peer_subscribed(peer, Fork::Alan, subnet, false); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); // Assert: bit should be cleared — bitfield tracks presence, not a count assert!( - !is_subnet_set(&mgr, &peer, 5), + !is_subnet_set(&mgr, &peer, SUBNET_A), "A single unsubscribe should clear the bit regardless of duplicate subscribes" ); } From c79569c347b37506e5f81eccb039fde1b39dc698 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 11 Feb 2026 12:54:51 -0300 Subject: [PATCH 4/4] fix: address PR review feedback for fork-aware observed peer subnets Add peer ID to the out-of-bounds subnet warning log and replace the imperative loop in aggregate_fork_bitmaps with a functional fold. --- anchor/network/src/peer_manager/connection.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 8fd0d07b4..2b4a7ceb2 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -140,6 +140,7 @@ impl ConnectionManager { let _ = bitfield.set(idx, subscribed); } else { tracing::warn!( + %peer, subnet = idx, max = bitfield.len(), "Subnet ID exceeds bitfield capacity" @@ -298,11 +299,9 @@ impl ConnectionManager { fn aggregate_fork_bitmaps( fork_map: &HashMap>>, ) -> Bitfield> { - let mut result = Bitfield::default(); - for bitfield in fork_map.values() { - result = result.union(bitfield); - } - result + fork_map + .values() + .fold(Bitfield::default(), |acc, bf| acc.union(bf)) } /// Get subnets a peer claims to support, with ENR fallback.