diff --git a/Cargo.lock b/Cargo.lock index 8e9421f14..58ae869a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5293,6 +5293,7 @@ dependencies = [ "message_receiver", "metrics", "network_utils", + "parking_lot", "prometheus-client", "quick-protobuf", "rand 0.9.2", diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index b9d9998d6..4fb3ad524 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -31,6 +31,7 @@ libp2p = { workspace = true, default-features = false, features = [ message_receiver = { workspace = true } metrics = { workspace = true } network_utils = { workspace = true } +parking_lot = { workspace = true } peer-store = { package = "libp2p-peer-store", git = "https://github.com/libp2p/rust-libp2p.git", rev = "d63dab1" } prometheus-client = { workspace = true } quick-protobuf = "0.8.1" diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index cb35ccbb5..3c4deeb18 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -13,7 +13,7 @@ use types::{ChainSpec, EthSpec}; use version::version_with_platform; use crate::{ - Config, + Config, SharedDomainType, behaviour::BehaviourError::Gossipsub, discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS}, handshake, @@ -92,6 +92,7 @@ impl AnchorBehaviour { network_config: &Config, metrics_registry: &mut Registry, spec: &ChainSpec, + domain_type: SharedDomainType, ) -> Result { let identify = { let local_public_key = local_keypair.public(); @@ -155,7 +156,8 @@ impl AnchorBehaviour { let discovery = { // Build and start the discovery sub-behaviour - let mut discovery = Discovery::new(local_keypair.clone(), network_config).await?; + let mut discovery = + Discovery::new(local_keypair.clone(), network_config, domain_type.clone()).await?; // start searching for peers discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); discovery @@ -169,17 +171,13 @@ impl AnchorBehaviour { }; let handshake = { - let domain_type: String = network_config.domain_type.into(); - let node_info = handshake::node_info::NodeInfo::new( - domain_type, - Some(handshake::node_info::NodeMetadata { - node_version: version_with_platform(), - execution_node: "geth/v1.10.8".to_string(), - consensus_node: "lighthouse/v1.5.0".to_string(), - subnets: "00000000000000000000000000000000".to_string(), - }), - ); - handshake::Behaviour::new(local_keypair, node_info) + let metadata = handshake::node_info::NodeMetadata { + node_version: version_with_platform(), + execution_node: "geth/v1.10.8".to_string(), + consensus_node: "lighthouse/v1.5.0".to_string(), + subnets: "00000000000000000000000000000000".to_string(), + }; + handshake::Behaviour::new(local_keypair, domain_type, metadata) }; let upnp = Toggle::from( diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 9e90ad5f2..bfbaee99f 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -35,7 +35,7 @@ use tracing::{debug, error, info, trace, warn}; use typenum::U128; use crate::{ - Config, + Config, SharedDomainType, discovery::DiscoveryError::{Discv5Init, Discv5Start, EnrKey}, }; @@ -152,7 +152,7 @@ pub struct Discovery { /// been started update_ports: UpdatePorts, - domain_type: DomainType, + domain_type: SharedDomainType, enr_file_path: PathBuf, } @@ -161,6 +161,7 @@ impl Discovery { pub async fn new( local_keypair: Keypair, network_config: &Config, + domain_type: SharedDomainType, ) -> Result { let protocol_identity = ProtocolIdentity { protocol_id: *b"ssvdv5", @@ -306,7 +307,7 @@ impl Discovery { discv5, event_stream, started: !network_config.disable_discovery, - domain_type: network_config.domain_type, + domain_type, update_ports, enr_file_path, }) @@ -412,24 +413,11 @@ impl Discovery { Ok(true) } - /// Update the domain type in both the local state and ENR. + /// Update the ENR domain type so other nodes can discover us with the new fork's domain. /// - /// Called when a fork activates to ensure this node can be discovered by - /// nodes using the new fork's domain type filter. - /// - /// # Arguments - /// - /// * `new_domain_type` - The domain type for the newly activated fork - /// - /// # Returns - /// - /// * `Ok(())` - Domain type was updated successfully - /// * `Err(String)` - Update failed with the given error message - pub fn update_domain_type(&mut self, new_domain_type: DomainType) -> Result<(), String> { - // Update local state used for query filtering - self.domain_type = new_domain_type; - - // Update ENR so other nodes can discover us + /// Called when a fork activates. The shared domain type is updated separately; + /// this method only handles the ENR update and disk persistence. + pub fn update_enr_domain_type(&mut self, new_domain_type: DomainType) -> Result<(), String> { self.discv5 .enr_insert("domaintype", &new_domain_type.0) .map_err(|e| format!("Failed to update ENR domain type: {e:?}"))?; @@ -459,12 +447,12 @@ impl Discovery { // predicate for finding nodes with a valid tcp port let tcp_predicate = move |enr: &Enr| enr.tcp4().is_some() || enr.tcp6().is_some(); - // Capture a copy of the domain type so the closure no longer references `self`. - let local_domain_type = self.domain_type; + // Clone the shared domain type so the closure can read the current value at query time. + let shared_domain_type = self.domain_type.clone(); let domain_type_predicate = move |enr: &Enr| { if let Some(Ok(domain_type)) = enr.get_decodable::<[u8; 4]>("domaintype") { - local_domain_type.0 == domain_type + shared_domain_type.get().0 == domain_type } else { trace!(?enr, "Rejecting ENR with missing domaintype"); false diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index c0a23db55..86592c16d 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -18,7 +18,10 @@ use libp2p::{ }; use tracing::{debug, trace}; -use crate::handshake::{codec::Codec, node_info::NodeInfo}; +use crate::{ + SharedDomainType, + handshake::{codec::Codec, node_info::NodeInfo}, +}; /// Event emitted on handshake completion or failure. #[derive(Debug)] @@ -37,7 +40,8 @@ pub enum Event { /// Automatically initiates handshakes on outbound connections. pub struct Behaviour { inner: RequestResponseBehaviour, - node_info: NodeInfo, + domain_type: SharedDomainType, + metadata: node_info::NodeMetadata, events: VecDeque, } @@ -56,7 +60,11 @@ pub enum Error { impl Behaviour { /// Create a new handshake Behaviour. /// The behaviour automatically initiates handshakes on outbound connections. - pub fn new(keypair: Keypair, node_info: NodeInfo) -> Self { + pub fn new( + keypair: Keypair, + domain_type: SharedDomainType, + metadata: node_info::NodeMetadata, + ) -> Self { let protocol = StreamProtocol::new("/ssv/info/0.0.1"); let inner = RequestResponseBehaviour::with_codec( Codec::new(keypair), @@ -65,34 +73,43 @@ impl Behaviour { ); Self { inner, - node_info, + domain_type, + metadata, events: VecDeque::new(), } } + /// Construct our [`NodeInfo`] from the current shared domain type and metadata. + fn our_node_info(&self) -> NodeInfo { + NodeInfo { + domain_type: self.domain_type.get().into(), + metadata: Some(self.metadata.clone()), + } + } + fn verify_and_emit_event(&mut self, peer_id: PeerId, their_info: NodeInfo) { - match verify_node_info(&self.node_info, &their_info) { + let our_info = self.our_node_info(); + match verify_node_info(&our_info, &their_info) { Ok(()) => { // Log handshake completion and record metrics if let Some(metadata) = &their_info.metadata { - if let Some(our_metadata) = self.node_metadata() { - let matching_count = - count_matching_subnets(&our_metadata.subnets, &metadata.subnets); - debug!( - %peer_id, - our_subnets = %our_metadata.subnets, - their_subnets = %metadata.subnets, - node_version = %metadata.node_version, - matching_subnets = matching_count, - "Handshake completed" - ); - - // Record subnet match count metric - if let Ok(gauge_vec) = crate::metrics::HANDSHAKE_SUBNET_MATCHES.as_ref() { - let label = &matching_count.to_string(); - if let Ok(gauge) = gauge_vec.get_metric_with_label_values(&[label]) { - gauge.inc(); - } + let our_metadata = self.node_metadata(); + let matching_count = + count_matching_subnets(&our_metadata.subnets, &metadata.subnets); + debug!( + %peer_id, + our_subnets = %our_metadata.subnets, + their_subnets = %metadata.subnets, + node_version = %metadata.node_version, + matching_subnets = matching_count, + "Handshake completed" + ); + + // Record subnet match count metric + if let Ok(gauge_vec) = crate::metrics::HANDSHAKE_SUBNET_MATCHES.as_ref() { + let label = &matching_count.to_string(); + if let Ok(gauge) = gauge_vec.get_metric_with_label_values(&[label]) { + gauge.inc(); } } } else { @@ -123,7 +140,7 @@ impl Behaviour { // Send our info back to the peer if self .inner - .send_response(channel, self.node_info.clone()) + .send_response(channel, self.our_node_info()) .is_err() { trace!( @@ -162,12 +179,12 @@ impl Behaviour { } } - pub fn node_metadata(&self) -> &Option { - &self.node_info.metadata + pub fn node_metadata(&self) -> &node_info::NodeMetadata { + &self.metadata } - pub fn node_metadata_mut(&mut self) -> &mut Option { - &mut self.node_info.metadata + pub fn node_metadata_mut(&mut self) -> &mut node_info::NodeMetadata { + &mut self.metadata } } @@ -192,10 +209,10 @@ fn count_matching_subnets(our_subnets: &str, their_subnets: &str) -> usize { } fn verify_node_info(ours: &NodeInfo, theirs: &NodeInfo) -> Result<(), Error> { - if ours.network_id != theirs.network_id { + if ours.domain_type != theirs.domain_type { return Err(Error::NetworkMismatch { - ours: ours.network_id.clone(), - theirs: theirs.network_id.clone(), + ours: ours.domain_type.clone(), + theirs: theirs.domain_type.clone(), }); } Ok(()) @@ -245,7 +262,7 @@ impl NetworkBehaviour for Behaviour { ?peer_id, "Auto-initiating handshake on first outbound connection" ); - self.inner.send_request(peer_id, self.node_info.clone()); + self.inner.send_request(peer_id, self.our_node_info()); } self.inner.on_swarm_event(event); } @@ -335,24 +352,31 @@ mod tests { use discv5::libp2p_identity::Keypair; use libp2p::swarm::Swarm; use libp2p_swarm_test::{SwarmExt, drive}; + use ssv_types::domain_type::DomainType; use super::*; - use crate::handshake::node_info::NodeMetadata; + use crate::{SharedDomainType, handshake::node_info::NodeMetadata}; - fn node_info(network: &str, version: &str) -> NodeInfo { - NodeInfo { - network_id: network.to_string(), - metadata: Some(NodeMetadata { - node_version: version.to_string(), - execution_node: "".to_string(), - consensus_node: "".to_string(), - subnets: "".to_string(), - }), + const DOMAIN_A: DomainType = DomainType([0, 0, 0, 1]); + const DOMAIN_B: DomainType = DomainType([0, 0, 0, 2]); + + fn test_metadata(version: &str) -> NodeMetadata { + NodeMetadata { + node_version: version.to_string(), + execution_node: "".to_string(), + consensus_node: "".to_string(), + subnets: "".to_string(), } } - fn create_test_swarm(keypair: Keypair, node_info: NodeInfo) -> Swarm { - Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, node_info)) + fn create_test_swarm( + keypair: Keypair, + domain_type: DomainType, + version: &str, + ) -> Swarm { + let shared = SharedDomainType::new(domain_type); + let metadata = test_metadata(version); + Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, shared, metadata)) } fn assert_completed(event: Event, expected_peer: PeerId, expected_version: &str) { @@ -393,10 +417,8 @@ mod tests { async fn handshake_success() { *DEBUG; - let mut local_swarm = - create_test_swarm(Keypair::generate_ed25519(), node_info("test", "local")); - let mut remote_swarm = - create_test_swarm(Keypair::generate_ed25519(), node_info("test", "remote")); + let mut local_swarm = create_test_swarm(Keypair::generate_ed25519(), DOMAIN_A, "local"); + let mut remote_swarm = create_test_swarm(Keypair::generate_ed25519(), DOMAIN_A, "remote"); tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; @@ -421,10 +443,8 @@ mod tests { async fn concurrent_dials_only_one_handshake() { *DEBUG; - let mut local_swarm = - create_test_swarm(Keypair::generate_ed25519(), node_info("test", "local")); - let mut remote_swarm = - create_test_swarm(Keypair::generate_ed25519(), node_info("test", "remote")); + let mut local_swarm = create_test_swarm(Keypair::generate_ed25519(), DOMAIN_A, "local"); + let mut remote_swarm = create_test_swarm(Keypair::generate_ed25519(), DOMAIN_A, "remote"); tokio::spawn(async move { local_swarm.listen().with_memory_addr_external().await; @@ -465,20 +485,184 @@ mod tests { async fn mismatched_networks_handshake_failed() { *DEBUG; + let mut local_swarm = create_test_swarm(Keypair::generate_ed25519(), DOMAIN_A, "local"); + let mut remote_swarm = create_test_swarm(Keypair::generate_ed25519(), DOMAIN_B, "remote"); + + let domain_a_hex: String = DOMAIN_A.into(); + let domain_b_hex: String = DOMAIN_B.into(); + + tokio::spawn(async move { + local_swarm.listen().with_memory_addr_external().await; + remote_swarm.connect(&mut local_swarm).await; + + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; + + assert_network_mismatch( + local_event, + *remote_swarm.local_peer_id(), + &domain_a_hex, + &domain_b_hex, + ); + assert_network_mismatch( + remote_event, + *local_swarm.local_peer_id(), + &domain_b_hex, + &domain_a_hex, + ); + }) + .await + .expect("test completed"); + } + + fn create_test_swarm_with_shared_domain( + keypair: Keypair, + shared: SharedDomainType, + version: &str, + ) -> Swarm { + let metadata = test_metadata(version); + Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, shared, metadata)) + } + + /// Tests that updates to `SharedDomainType` propagate to subsequent handshakes. + /// + /// This validates the core fix from PR #814: when a fork activates and updates the + /// shared domain type, all future handshakes use the new value. The test runs through + /// three phases: + /// 1. Both peers on DOMAIN_A - handshake succeeds + /// 2. Only local updates to DOMAIN_B - handshake fails with NetworkMismatch + /// 3. Both peers update to DOMAIN_B - handshake succeeds again + #[tokio::test] + async fn shared_domain_type_updates_propagate_to_handshakes() { + use futures::future::Either; + use libp2p::swarm::SwarmEvent; + + *DEBUG; + + let domain_a_hex: String = DOMAIN_A.into(); + let domain_b_hex: String = DOMAIN_B.into(); + + // Arrange: Create SharedDomainType instances externally so we can update them + let local_shared = SharedDomainType::new(DOMAIN_A); + let remote_shared = SharedDomainType::new(DOMAIN_A); + + let local_keypair = Keypair::generate_ed25519(); + let remote_keypair = Keypair::generate_ed25519(); + let mut local_swarm = - create_test_swarm(Keypair::generate_ed25519(), node_info("test1", "local")); + create_test_swarm_with_shared_domain(local_keypair, local_shared.clone(), "local"); let mut remote_swarm = - create_test_swarm(Keypair::generate_ed25519(), node_info("test2", "remote")); + create_test_swarm_with_shared_domain(remote_keypair, remote_shared.clone(), "remote"); tokio::spawn(async move { + // ==================== Phase 1: Both on DOMAIN_A - handshake succeeds + // ==================== + local_swarm.listen().with_memory_addr_external().await; + remote_swarm.listen().with_memory_addr_external().await; + remote_swarm.connect(&mut local_swarm).await; + + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; + + assert_completed(local_event, *remote_swarm.local_peer_id(), "remote"); + assert_completed(remote_event, *local_swarm.local_peer_id(), "local"); + + // ==================== Phase 2: Only local updates to DOMAIN_B - mismatch + // ==================== + + // Act: Update only the local shared domain type + local_shared.set(DOMAIN_B); + + // Disconnect both peers + let remote_peer = *remote_swarm.local_peer_id(); + let local_peer = *local_swarm.local_peer_id(); + local_swarm + .disconnect_peer_id(remote_peer) + .expect("disconnect remote from local"); + remote_swarm + .disconnect_peer_id(local_peer) + .expect("disconnect local from remote"); + + // Wait for ConnectionClosed on both sides + let mut local_closed = false; + let mut remote_closed = false; + loop { + match futures::future::select( + local_swarm.next_swarm_event(), + remote_swarm.next_swarm_event(), + ) + .await + { + Either::Left((SwarmEvent::ConnectionClosed { .. }, _)) => { + local_closed = true; + } + Either::Right((SwarmEvent::ConnectionClosed { .. }, _)) => { + remote_closed = true; + } + _ => {} // keep polling + } + if local_closed && remote_closed { + break; + } + } + + // Reconnect: remote connects to local to trigger outbound handshake + remote_swarm.connect(&mut local_swarm).await; + + let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = + drive(&mut local_swarm, &mut remote_swarm).await; + + // Assert: Both sides detect network mismatch + assert_network_mismatch(local_event, remote_peer, &domain_b_hex, &domain_a_hex); + assert_network_mismatch(remote_event, local_peer, &domain_a_hex, &domain_b_hex); + + // ==================== Phase 3: Both update to DOMAIN_B - handshake succeeds + // ==================== + + // Act: Update remote's shared domain type to match + remote_shared.set(DOMAIN_B); + + // Disconnect both peers again + local_swarm + .disconnect_peer_id(remote_peer) + .expect("disconnect remote from local"); + remote_swarm + .disconnect_peer_id(local_peer) + .expect("disconnect local from remote"); + + // Wait for ConnectionClosed on both sides + let mut local_closed = false; + let mut remote_closed = false; + loop { + match futures::future::select( + local_swarm.next_swarm_event(), + remote_swarm.next_swarm_event(), + ) + .await + { + Either::Left((SwarmEvent::ConnectionClosed { .. }, _)) => { + local_closed = true; + } + Either::Right((SwarmEvent::ConnectionClosed { .. }, _)) => { + remote_closed = true; + } + _ => {} // keep polling + } + if local_closed && remote_closed { + break; + } + } + + // Reconnect remote_swarm.connect(&mut local_swarm).await; let ([local_event], [remote_event]): ([Event; 1], [Event; 1]) = drive(&mut local_swarm, &mut remote_swarm).await; - assert_network_mismatch(local_event, *remote_swarm.local_peer_id(), "test1", "test2"); - assert_network_mismatch(remote_event, *local_swarm.local_peer_id(), "test2", "test1"); + // Assert: Both sides complete successfully on the new domain + assert_completed(local_event, remote_peer, "remote"); + assert_completed(remote_event, local_peer, "local"); }) .await .expect("test completed"); diff --git a/anchor/network/src/handshake/node_info.rs b/anchor/network/src/handshake/node_info.rs index 9f4ce7fe4..90c3a43c1 100644 --- a/anchor/network/src/handshake/node_info.rs +++ b/anchor/network/src/handshake/node_info.rs @@ -35,7 +35,7 @@ pub struct NodeMetadata { #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] pub struct NodeInfo { - pub network_id: String, + pub domain_type: String, pub metadata: Option, } @@ -47,9 +47,9 @@ struct Serializable { } impl NodeInfo { - pub fn new(network_id: String, metadata: Option) -> Self { + pub fn new(domain_type: String, metadata: Option) -> Self { NodeInfo { - network_id, + domain_type, metadata, } } @@ -61,8 +61,8 @@ impl NodeInfo { /// Serialize `NodeInfo` to JSON bytes. fn marshal(&self) -> Result, Error> { let mut entries = vec![ - "".to_string(), // formerly forkVersion, now deprecated - format!("0x{}", self.network_id.clone()), // network id + "".to_string(), // formerly forkVersion, now deprecated + format!("0x{}", self.domain_type.clone()), // domain type ]; if let Some(meta) = &self.metadata { @@ -82,10 +82,10 @@ impl NodeInfo { return Err(Validation("node info must have at least 2 entries".into())); } // skip ser.entries[0]: old forkVersion - let network_id = ser.entries[1] + let domain_type = ser.entries[1] .clone() .strip_prefix("0x") - .ok_or_else(|| Validation("network id must be prefixed with 0x".into()))? + .ok_or_else(|| Validation("domain type must be prefixed with 0x".into()))? .to_string(); let metadata = if ser.entries.len() >= 3 { @@ -94,7 +94,7 @@ impl NodeInfo { } else { None }; - Ok(NodeInfo::new(network_id, metadata)) + Ok(NodeInfo::new(domain_type, metadata)) } /// Seals a `Record` into an Envelope by: @@ -197,7 +197,7 @@ mod tests { // The "current" NodeInfo data let current_data = NodeInfo { - network_id: HOLESKY.to_string(), + domain_type: HOLESKY.to_string(), metadata: Some(NodeMetadata { node_version: "v0.1.12".into(), execution_node: "geth/x".into(), diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 507bf7f18..1893478dc 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -11,8 +11,47 @@ mod peer_manager; mod scoring; mod transport; +use std::sync::Arc; + pub use config::{Config, DEFAULT_DISC_PORT, DEFAULT_QUIC_PORT, DEFAULT_TCP_PORT}; pub use network::Network; pub use network_utils::listen_addr::{ListenAddr, ListenAddress}; +use parking_lot::RwLock; +use ssv_types::domain_type::DomainType; pub type Enr = discv5::enr::Enr; pub use peer_manager::types::{ClientType, PeerInfo}; + +/// A shared, thread-safe domain type that serves as a single source of truth. +/// +/// # Lifecycle +/// +/// Created by [`Network::try_new`] with the initial domain type from config. Clones are +/// passed to `Discovery` and `handshake::Behaviour` so all three components share the +/// same underlying value. +/// +/// # Updates +/// +/// When a fork activates, [`Network::on_fork_phase`] calls [`SharedDomainType::set`] once. +/// All components reading via [`SharedDomainType::get`] see the new domain type immediately, +/// eliminating the need for per-component update methods. +/// +/// # Concurrency +/// +/// Uses [`parking_lot::RwLock`] for interior mutability. Writes are brief (single `Copy` +/// assignment) and rare (only on fork activation), so contention is negligible. +#[derive(Clone, Debug)] +pub(crate) struct SharedDomainType(Arc>); + +impl SharedDomainType { + pub fn new(domain_type: DomainType) -> Self { + Self(Arc::new(RwLock::new(domain_type))) + } + + pub fn get(&self) -> DomainType { + *self.0.read() + } + + pub fn set(&self, domain_type: DomainType) { + *self.0.write() = domain_type; + } +} diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 638585f90..569c6fac2 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -24,7 +24,6 @@ use libp2p::{ use message_receiver::{MessageReceiver, Outcome, TopicContext}; use prometheus_client::registry::Registry; use ssv_network_config::ForkPhase; -use ssv_types::domain_type::DomainType; use subnet_service::{SUBNET_COUNT, SubnetId, TopicEvent, topic}; use task_executor::TaskExecutor; use thiserror::Error; @@ -33,7 +32,7 @@ use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EthSpec}; use crate::{ - Config, Enr, + Config, Enr, SharedDomainType, behaviour::{AnchorBehaviour, AnchorBehaviourEvent, BehaviourError}, discovery::{DiscoveredPeers, Discovery, DiscoveryError}, handshake, @@ -77,7 +76,7 @@ pub struct Network { peer_id: PeerId, message_receiver: Arc, outcome_rx: mpsc::Receiver, - domain_type: DomainType, + domain_type: SharedDomainType, metrics_registry: Option, spec: Arc, is_dynamic_target_peers: bool, @@ -111,10 +110,17 @@ impl Network { let mut metrics_registry = Registry::default(); - let behaviour = - AnchorBehaviour::new::(local_keypair.clone(), config, &mut metrics_registry, &spec) - .await - .map_err(|e| Box::new(NetworkError::Behaviour(e)))?; + let domain_type = SharedDomainType::new(config.domain_type); + + let behaviour = AnchorBehaviour::new::( + local_keypair.clone(), + config, + &mut metrics_registry, + &spec, + domain_type.clone(), + ) + .await + .map_err(|e| Box::new(NetworkError::Behaviour(e)))?; let peer_id = local_keypair.public().to_peer_id(); @@ -131,7 +137,7 @@ impl Network { peer_id, message_receiver, outcome_rx, - domain_type: config.domain_type, + domain_type, metrics_registry: Some(metrics_registry), spec, is_dynamic_target_peers, @@ -413,20 +419,21 @@ impl Network { /// Handle fork phase transition events. /// - /// - `Activated`: Update ENR domain type. + /// - `Activated`: Update shared domain type and ENR. fn on_fork_phase(&mut self, phase: ForkPhase) { if let ForkPhase::Activated { current, previous } = phase { info!( current_fork = %current.fork, previous_fork = %previous.fork, - "Fork activated, updating ENR domain type" + "Fork activated, updating domain type" ); - // Update local domain type for any future use - self.domain_type = current.domain_type; + // Update the shared domain type — all components (discovery, handshake) see the + // new value immediately. + self.domain_type.set(current.domain_type); - // Update ENR domain type so other nodes can discover us with the new fork's domain - if let Err(e) = self.discovery().update_domain_type(current.domain_type) { + // Update ENR so other nodes can discover us with the new fork's domain + if let Err(e) = self.discovery().update_enr_domain_type(current.domain_type) { error!(?e, "Failed to update ENR domain type after fork activation"); } } @@ -637,19 +644,18 @@ impl Network { fn update_subnet_membership(&mut self, subnet: SubnetId, subscribed: bool) { self.discovery().set_subscribed(subnet, subscribed); - if let Some(metadata) = self.handshake().node_metadata_mut() { - match metadata.set_subscribed(subnet, subscribed) { - Ok(()) => { - info!( - subnet = *subnet, - subscribed = subscribed, - subnets_bitfield = %metadata.subnets, - "Updated node_info metadata subnet bitfield" - ); - } - Err(err) => { - error!(?err, "unable to update node info"); - } + let metadata = self.handshake().node_metadata_mut(); + match metadata.set_subscribed(subnet, subscribed) { + Ok(()) => { + info!( + subnet = *subnet, + subscribed = subscribed, + subnets_bitfield = %metadata.subnets, + "Updated node_info metadata subnet bitfield" + ); + } + Err(err) => { + error!(?err, "unable to update node info"); } } }