diff --git a/.github/custom/local-testnet/params.yaml b/.github/custom/local-testnet/params.yaml index 42fea5259..65e38737f 100644 --- a/.github/custom/local-testnet/params.yaml +++ b/.github/custom/local-testnet/params.yaml @@ -30,7 +30,7 @@ network: port_publisher: cl: - enabled: true + enabled: false # Assertoor can only load from URLs here (for some reason). The placeholder is replaced in the workflow. assertoor_params: diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index b0dbe995c..a58d16e22 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -66,7 +66,9 @@ jobs: - name: Start Local Testnet with Assertoor timeout-minutes: 30 - run: kurtosis run --verbosity DETAILED --enclave localnet ssv-mini --args-file anchor/.github/custom/local-testnet/params.yaml + run: | + kurtosis clean -a || true + kurtosis run --verbosity DETAILED --enclave localnet ssv-mini --args-file anchor/.github/custom/local-testnet/params.yaml - name: Await Assertoor Test Result id: assertoor_test_result diff --git a/Cargo.lock b/Cargo.lock index 58ae869a2..f8b33b194 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5283,6 +5283,7 @@ dependencies = [ "async-trait", "discv5", "ethereum_ssz", + "fork", "futures", "global_config", "hex", diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index 4fb3ad524..6e0e4c19f 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -9,6 +9,7 @@ async-broadcast = { workspace = true } async-trait = "0.1.85" discv5 = { workspace = true } ethereum_ssz = { workspace = true } +fork = { workspace = true } futures = { workspace = true } global_config = { workspace = true } gossipsub = { workspace = true } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 569c6fac2..c1f332722 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -276,15 +276,23 @@ impl Network { self.handle_gossipsub_message(propagation_source, message_id, message); } gossipsub::Event::Subscribed { peer_id, topic } => { - if let Some(subnet) = topic::parse_subnet_id(&topic) { - self.peer_manager() - .set_peer_subscription(peer_id, subnet, true); + if let Some(parsed) = topic::parse_topic(&topic) { + self.peer_manager().set_peer_subscription( + peer_id, + parsed.fork, + parsed.subnet_id, + true, + ); } } gossipsub::Event::Unsubscribed { peer_id, topic } => { - if let Some(subnet) = topic::parse_subnet_id(&topic) { - self.peer_manager() - .set_peer_subscription(peer_id, subnet, false); + if let Some(parsed) = topic::parse_topic(&topic) { + self.peer_manager().set_peer_subscription( + peer_id, + parsed.fork, + parsed.subnet_id, + false, + ); } } _ => { diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 55b6102fb..2b4a7ceb2 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -4,6 +4,7 @@ use std::{ }; use discv5::libp2p_identity::PeerId; +use fork::Fork; use libp2p::{ Multiaddr, connection_limits::{self, ConnectionLimits}, @@ -46,8 +47,11 @@ pub struct ConnectionManager { pub connected: HashSet, pub target_peers: usize, pub max_with_priority_peers: usize, - // Map of observed gossipsub subscriptions per peer. Prefer this over ENR claims. - observed_peer_subnets: HashMap>>, + // Per-fork observed gossipsub subscriptions per peer. Prefer this over ENR claims. + // Tracked per fork so that unsubscribing from one fork's topic doesn't clear the + // bit for the same subnet on another fork's topic. + // See: https://github.com/sigp/anchor/issues/818 + observed_peer_subnets: HashMap>>>, // Track inbound vs outbound connection counts inbound_count: usize, outbound_count: usize, @@ -116,20 +120,41 @@ impl ConnectionManager { self.connection_limits = Self::create_connection_limits(new_target); } - /// External update from gossipsub events about peer subscription state - pub fn set_peer_subscribed(&mut self, peer: PeerId, subnet: SubnetId, subscribed: bool) { - let entry = self.observed_peer_subnets.entry(peer).or_default(); - + /// External update from gossipsub events about peer subscription state. + /// + /// Subscriptions are tracked per fork so that unsubscribing from one fork's topic + /// (e.g., `ssv.v2.42`) does not clear the subnet bit if the peer is still + /// subscribed via another fork's topic (e.g., `/ssv/mainnet/boole/42`). + pub fn set_peer_subscribed( + &mut self, + peer: PeerId, + fork: Fork, + subnet: SubnetId, + subscribed: bool, + ) { let idx = *subnet.deref() as usize; - if idx < entry.len() { - // Safe to ignore the result of `set` because we have already checked that `idx < - // entry.len()` - let _ = entry.set(idx, subscribed); + let fork_map = self.observed_peer_subnets.entry(peer).or_default(); + let bitfield = fork_map.entry(fork).or_default(); + + if idx < bitfield.len() { + let _ = bitfield.set(idx, subscribed); + } else { + tracing::warn!( + %peer, + subnet = idx, + max = bitfield.len(), + "Subnet ID exceeds bitfield capacity" + ); } - // If peer is now unsubscribed from all observed subnets, drop the entry to keep map small - if !subscribed && !entry.iter().any(|b| b) { - self.observed_peer_subnets.remove(&peer); + // Clean up empty entries to keep maps small + if !subscribed { + if bitfield.is_zero() { + fork_map.remove(&fork); + } + if fork_map.is_empty() { + self.observed_peer_subnets.remove(&peer); + } } } @@ -217,12 +242,12 @@ impl ConnectionManager { return true; } - // Only use observed subscriptions, no ENR fallback - let Some(observed) = self.observed_peer_subnets.get(peer) else { + // Only use observed subscriptions (aggregated across forks), no ENR fallback + let Some(observed) = self.get_peer_subnets_observed_only(peer) else { return false; }; - self.bitfield_offers_any_subnet(observed, needed) + self.bitfield_offers_any_subnet(&observed, needed) } /// Check if a peer offers any needed subnets, using ENR as fallback. @@ -263,9 +288,20 @@ impl ConnectionManager { false } - /// Get subnets a peer claims to support from observed gossipsub only. + /// Get subnets a peer claims to support from observed gossipsub only, + /// aggregated across all forks (union of per-fork bitmaps). fn get_peer_subnets_observed_only(&self, peer: &PeerId) -> Option>> { - self.observed_peer_subnets.get(peer).cloned() + let fork_map = self.observed_peer_subnets.get(peer)?; + Some(Self::aggregate_fork_bitmaps(fork_map)) + } + + /// OR all per-fork bitmaps together into a single aggregate bitfield. + fn aggregate_fork_bitmaps( + fork_map: &HashMap>>, + ) -> Bitfield> { + fork_map + .values() + .fold(Bitfield::default(), |acc, bf| acc.union(bf)) } /// Get subnets a peer claims to support, with ENR fallback. @@ -286,8 +322,8 @@ impl ConnectionManager { /// Handle connection established event pub fn on_connection_established(&mut self, peer_id: PeerId, is_outbound: bool) -> bool { - // Initialize with empty bitfield to indicate we're now observing this peer - // If they never subscribe to anything, we'll know they offer no subnets + // Initialize with empty fork map to indicate we're now observing this peer. + // If they never subscribe to anything, we'll know they offer no subnets. self.observed_peer_subnets.entry(peer_id).or_default(); // Track connection direction counter @@ -483,3 +519,333 @@ impl ConnectionManager { self.connection_limits.on_swarm_event(event); } } + +#[cfg(test)] +mod tests { + use super::*; + + // ==================== Test constants ==================== + + const TARGET_PEERS: usize = 50; + + // Subnet IDs used across tests. Each has a distinct role to aid readability. + const SUBNET_A: u64 = 5; + const SUBNET_B: u64 = 42; + const SUBNET_C: u64 = 10; + const SUBNET_D: u64 = 20; + const SUBNET_E: u64 = 99; + const SUBNET_UNSUBSCRIBED: u64 = 30; + + fn subnet(id: u64) -> SubnetId { + SubnetId::new(id) + } + + // ==================== Helper functions ==================== + + /// Creates a `ConnectionManager` with default target peers for testing. + fn create_test_manager() -> ConnectionManager { + ConnectionManager::new(TARGET_PEERS) + } + + /// Connects a peer to the manager (adds to `connected` set and initializes + /// `observed_peer_subnets`), returning the generated `PeerId`. + fn connect_random_peer(mgr: &mut ConnectionManager) -> PeerId { + let peer = PeerId::random(); + mgr.on_connection_established(peer, /* is_outbound = */ true); + peer + } + + /// Returns the aggregated (union across forks) bitfield for a peer, or `None`. + fn aggregated_bitfield( + mgr: &ConnectionManager, + peer: &PeerId, + ) -> Option>> { + mgr.get_peer_subnets_observed_only(peer) + } + + /// Checks whether a specific subnet bit is set in the aggregated bitfield. + fn is_subnet_set(mgr: &ConnectionManager, peer: &PeerId, subnet: u64) -> bool { + aggregated_bitfield(mgr, peer) + .map(|bf| bf.get(subnet as usize).unwrap_or(false)) + .unwrap_or(false) + } + + // ==================== Single fork subscribe/unsubscribe ==================== + + #[test] + fn test_set_peer_subscribed_single_fork_subscribe() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + + // Act + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + + // Assert + assert!( + is_subnet_set(&mgr, &peer, SUBNET_A), + "Subnet should be set after subscribing on Alan fork" + ); + } + + #[test] + fn test_set_peer_subscribed_single_fork_unsubscribe() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + + // Act + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); + + // Assert + assert!( + !is_subnet_set(&mgr, &peer, SUBNET_A), + "Subnet should be cleared after unsubscribing on Alan fork" + ); + } + + // ==================== Multi-fork bug scenario (issue #818) ==================== + + /// Regression test for https://github.com/sigp/anchor/issues/818 + /// + /// When a peer is subscribed to the same subnet on two different forks, + /// unsubscribing from one fork must NOT clear the subnet bit in the + /// aggregated view because the other fork still holds it. + #[test] + fn test_unsubscribe_one_fork_retains_subnet_from_other_fork() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Act: unsubscribe from Alan only + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), false); + + // Assert: subnet should still be set because Boole holds it + assert!( + is_subnet_set(&mgr, &peer, SUBNET_B), + "Subnet must remain set when Boole fork still subscribes to it" + ); + } + + #[test] + fn test_unsubscribe_both_forks_clears_subnet() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Act: unsubscribe from both forks + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), false); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), false); + + // Assert: subnet should now be cleared + assert!( + !is_subnet_set(&mgr, &peer, SUBNET_B), + "Subnet must be cleared after unsubscribing from all forks" + ); + } + + // ==================== Aggregation across forks ==================== + + #[test] + fn test_aggregation_unions_subnets_across_forks() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + + // Act: subscribe to different subnets on different forks + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_C), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_D), true); + + // Assert: aggregated view should contain both bits + let bf = aggregated_bitfield(&mgr, &peer).expect("peer should have an aggregated bitfield"); + assert!( + bf.get(SUBNET_C as usize).unwrap_or(false), + "Alan subnet must be present in aggregated bitfield" + ); + assert!( + bf.get(SUBNET_D as usize).unwrap_or(false), + "Boole subnet must be present in aggregated bitfield" + ); + assert!( + !bf.get(SUBNET_UNSUBSCRIBED as usize).unwrap_or(false), + "Unsubscribed subnet should NOT be present" + ); + } + + // ==================== Cleanup on full unsubscribe ==================== + + #[test] + fn test_full_unsubscribe_removes_peer_entry() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_E), true); + + // Act: unsubscribe from everything + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_E), false); + + // Assert: the peer should be completely removed from observed_peer_subnets + assert!( + !mgr.observed_peer_subnets.contains_key(&peer), + "Peer entry must be removed from observed_peer_subnets when all \ + per-fork bitmaps are empty" + ); + } + + #[test] + fn test_partial_unsubscribe_keeps_peer_entry() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_E), true); + + // Act: unsubscribe from Alan only + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); + + // Assert: peer entry still present (Boole fork still has a subscription) + assert!( + mgr.observed_peer_subnets.contains_key(&peer), + "Peer entry must remain while at least one fork bitmap is non-empty" + ); + } + + // ==================== count_observed_peers_for_subnets with multi-fork ==================== + + #[test] + fn test_count_observed_peers_counts_across_forks() { + // Arrange + let mut mgr = create_test_manager(); + let peer_a = connect_random_peer(&mut mgr); + let peer_b = connect_random_peer(&mut mgr); + + // peer_a subscribes to SUBNET_C on Alan + mgr.set_peer_subscribed(peer_a, Fork::Alan, subnet(SUBNET_C), true); + // peer_b subscribes to SUBNET_C on Boole (different fork, same subnet) + mgr.set_peer_subscribed(peer_b, Fork::Boole, subnet(SUBNET_C), true); + // peer_a also subscribes to SUBNET_D on Boole + mgr.set_peer_subscribed(peer_a, Fork::Boole, subnet(SUBNET_D), true); + + // Act + let counts = mgr.count_observed_peers_for_subnets(&[ + subnet(SUBNET_C), + subnet(SUBNET_D), + subnet(SUBNET_UNSUBSCRIBED), + ]); + + // Assert + assert_eq!( + counts, + vec![2, 1, 0], + "SUBNET_C should have 2 peers, SUBNET_D should have 1, SUBNET_UNSUBSCRIBED should have 0" + ); + } + + #[test] + fn test_count_observed_peers_requires_connected() { + // Arrange: subscribe a peer but do NOT connect it via on_connection_established + let mut mgr = create_test_manager(); + let disconnected_peer = PeerId::random(); + mgr.set_peer_subscribed(disconnected_peer, Fork::Alan, subnet(SUBNET_A), true); + + // Also add a properly connected peer for the same subnet + let connected_peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(connected_peer, Fork::Alan, subnet(SUBNET_A), true); + + // Act + let counts = mgr.count_observed_peers_for_subnets(&[subnet(SUBNET_A)]); + + // Assert: only the connected peer should be counted + assert_eq!( + counts, + vec![1], + "Only connected peers should be counted; disconnected peer must be excluded" + ); + } + + // ==================== peer_offers_needed_subnets_observed_only with multi-fork + // ==================== + + #[test] + fn test_peer_offers_needed_subnets_across_forks() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_D), true); + + let needed = HashSet::from([subnet(SUBNET_D)]); + + // Act & Assert + assert!( + mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), + "Peer subscribed on Boole should satisfy the needed set" + ); + } + + #[test] + fn test_peer_does_not_offer_unsubscribed_subnets() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_C), true); + + let needed = HashSet::from([subnet(SUBNET_E)]); + + // Act & Assert + assert!( + !mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), + "Peer subscribed to a different subnet should not satisfy the needed set" + ); + } + + /// Verifies that after unsubscribing from one fork, the peer still offers + /// the subnet via the remaining fork. + #[test] + fn test_peer_offers_needed_subnets_after_partial_unsubscribe() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Act: unsubscribe from Alan + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), false); + + let needed = HashSet::from([subnet(SUBNET_B)]); + + // Assert: Boole still provides the subnet + assert!( + mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), + "Peer should still offer subnet after unsubscribing only from Alan" + ); + } + + // ==================== Idempotency ==================== + + #[test] + fn test_duplicate_subscribe_then_single_unsubscribe_clears_bit() { + // Arrange + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + + // Act: subscribe twice on the same fork+subnet, then unsubscribe once + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); + + // Assert: bit should be cleared — bitfield tracks presence, not a count + assert!( + !is_subnet_set(&mgr, &peer, SUBNET_A), + "A single unsubscribe should clear the bit regardless of duplicate subscribes" + ); + } +} diff --git a/anchor/network/src/peer_manager/mod.rs b/anchor/network/src/peer_manager/mod.rs index 560bed865..564a0c49c 100644 --- a/anchor/network/src/peer_manager/mod.rs +++ b/anchor/network/src/peer_manager/mod.rs @@ -205,10 +205,16 @@ impl PeerManager { &self.needed_subnets } - /// Update observed gossipsub subscription state for a peer - pub fn set_peer_subscription(&mut self, peer: PeerId, subnet: SubnetId, subscribed: bool) { + /// Update observed gossipsub subscription state for a peer on a specific fork. + pub fn set_peer_subscription( + &mut self, + peer: PeerId, + fork: fork::Fork, + subnet: SubnetId, + subscribed: bool, + ) { self.connection_manager - .set_peer_subscribed(peer, subnet, subscribed); + .set_peer_subscribed(peer, fork, subnet, subscribed); } /// Handle a completed handshake by updating peer client type and triggering metrics update.