From 97555c751e3bd32eaec7e5f43242f3bc99047c03 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Fri, 11 Jul 2025 17:24:41 +0300 Subject: [PATCH 01/11] collator-protocol: cleanup connecting to backing group Signed-off-by: Alexandru Gheorghe --- .../src/collator_side/mod.rs | 45 ++++++++++++++----- .../src/collator_side/validators_buffer.rs | 30 +------------ 2 files changed, 36 insertions(+), 39 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index a9d4116e132ac..29cb4023c5dc4 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -120,7 +120,7 @@ enum ShouldAdvertiseTo { /// Info about validators we are currently connected to. /// /// It keeps track to which validators we advertised our collation. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] struct ValidatorGroup { /// Validators discovery ids. Lazily initialized when first /// distributing a collation. @@ -386,7 +386,10 @@ async fn distribute_collation( ) -> Result<()> { let candidate_relay_parent = receipt.descriptor.relay_parent(); let candidate_hash = receipt.hash(); - let cores_assigned = has_assigned_cores(&state.implicit_view, &state.per_relay_parent); + + // We should already be connected to the validators, but if we aren't, we will connect to them + // now. + connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent).await; let per_relay_parent = match state.per_relay_parent.get_mut(&candidate_relay_parent) { Some(per_relay_parent) => per_relay_parent, @@ -498,9 +501,6 @@ async fn distribute_collation( group }); - // Update a set of connected validators if necessary. 
- connect_to_validators(ctx, cores_assigned, &state.validator_groups_buf).await; - if let Some(result_sender) = result_sender { state.collation_result_senders.insert(candidate_hash, result_sender); } @@ -647,19 +647,41 @@ fn has_assigned_cores( false } +fn list_of_backing_validators_in_view( + implicit_view: &Option, + per_relay_parent: &HashMap, +) -> Vec { + let mut backing_validators = HashSet::new(); + let Some(implicit_view) = implicit_view else { return vec![] }; + + for leaf in implicit_view.leaves() { + if let Some(relay_parent) = per_relay_parent.get(leaf) { + for group in relay_parent.validator_group.values() { + backing_validators.extend(group.validators.iter().cloned()); + } + } + } + + backing_validators.into_iter().collect() +} + /// Updates a set of connected validators based on their advertisement-bits /// in a validators buffer. #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] async fn connect_to_validators( ctx: &mut Context, - cores_assigned: bool, - validator_groups_buf: &ValidatorGroupsBuffer, + implicit_view: &Option, + per_relay_parent: &HashMap, ) { + let cores_assigned = has_assigned_cores(implicit_view, per_relay_parent); // If no cores are assigned to the para, we still need to send a ConnectToValidators request to // the network bridge passing an empty list of validator ids. Otherwise, it will keep connecting // to the last requested validators until a new request is issued. 
- let validator_ids = - if cores_assigned { validator_groups_buf.validators_to_connect() } else { Vec::new() }; + let validator_ids = if cores_assigned { + list_of_backing_validators_in_view(implicit_view, per_relay_parent) + } else { + Vec::new() + }; gum::trace!( target: LOG_TARGET, @@ -1243,6 +1265,7 @@ async fn handle_network_msg( OurViewChange(view) => { gum::trace!(target: LOG_TARGET, ?view, "Own view change"); handle_our_view_change(ctx, state, view).await?; + connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent).await; }, PeerMessage(remote, msg) => { handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; @@ -1668,8 +1691,8 @@ async fn run_inner( } } _ = reconnect_timeout => { - let cores_assigned = has_assigned_cores(&state.implicit_view, &state.per_relay_parent); - connect_to_validators(&mut ctx, cores_assigned, &state.validator_groups_buf).await; + + connect_to_validators(&mut ctx, &state.implicit_view, &state.per_relay_parent).await; gum::trace!( target: LOG_TARGET, diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 35202fc96299b..8ed4a3c266c0f 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -63,7 +63,7 @@ pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = }; /// Unique identifier of a validators group. -#[derive(Debug)] +#[derive(Debug, Clone)] struct ValidatorsGroupInfo { /// Number of validators in the group. len: usize, @@ -74,7 +74,7 @@ struct ValidatorsGroupInfo { /// Ring buffer of validator groups. /// /// Tracks which peers we want to be connected to with respect to advertised collations. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ValidatorGroupsBuffer { /// Validator groups identifiers we **had** advertisements for. 
group_infos: VecDeque, @@ -98,32 +98,6 @@ impl ValidatorGroupsBuffer { } } - /// Returns discovery ids of validators we are assigned to in this backing group window. - pub fn validators_to_connect(&self) -> Vec { - let validators_num = self.validators.len(); - let bits = self - .should_be_connected - .values() - .fold(bitvec![0; validators_num], |acc, next| acc | next); - - let mut should_be_connected: Vec = self - .validators - .iter() - .enumerate() - .filter_map(|(idx, authority_id)| bits[idx].then(|| authority_id.clone())) - .collect(); - - if let Some(last_group) = self.group_infos.iter().last() { - for validator in self.validators.iter().rev().take(last_group.len) { - if !should_be_connected.contains(validator) { - should_be_connected.push(validator.clone()); - } - } - } - - should_be_connected - } - /// Note a new advertisement, marking that we want to be connected to validators /// from this group. /// From 2fc08e1aaffb492a2c5aaa7f25f305aa2fc898f0 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 15 Jul 2025 17:55:00 +0300 Subject: [PATCH 02/11] fixup connection to validators Signed-off-by: Alexandru Gheorghe --- .../src/collator_side/mod.rs | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 29cb4023c5dc4..30d7fb11d2a3e 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -244,11 +244,15 @@ struct PerRelayParent { } impl PerRelayParent { - fn new( + #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] + async fn new( + ctx: &mut Context, + runtime: &mut RuntimeInfo, para_id: ParaId, claim_queue: ClaimQueueSnapshot, block_number: Option, - ) -> Self { + block_hash: Hash, + ) -> Result { let assignments = claim_queue.iter_all_claims().fold(HashMap::new(), |mut 
acc, (core, claims)| { let n_claims = claims.iter().filter(|para| para == &¶_id).count(); @@ -258,12 +262,22 @@ impl PerRelayParent { acc }); - Self { - validator_group: HashMap::default(), + let mut validator_groups = HashMap::default(); + + for (core, _) in &assignments { + let GroupValidators { validators, session_index: _, group_index: _ } = + determine_our_validators(ctx, runtime, *core, block_hash).await?; + let mut group = ValidatorGroup::default(); + group.validators = validators; + validator_groups.insert(*core, group); + } + + Ok(Self { + validator_group: validator_groups, collations: HashMap::new(), assignments, block_number, - } + }) } } @@ -387,8 +401,8 @@ async fn distribute_collation( let candidate_relay_parent = receipt.descriptor.relay_parent(); let candidate_hash = receipt.hash(); - // We should already be connected to the validators, but if we aren't, we will connect to them - // now. + // We should already be connected to the validators, but if we aren't, we will try to connect to + // them now. 
connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent).await; let per_relay_parent = match state.per_relay_parent.get_mut(&candidate_relay_parent) { @@ -1264,7 +1278,7 @@ async fn handle_network_msg( }, OurViewChange(view) => { gum::trace!(target: LOG_TARGET, ?view, "Own view change"); - handle_our_view_change(ctx, state, view).await?; + handle_our_view_change(ctx, runtime, state, view).await?; connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent).await; }, PeerMessage(remote, msg) => { @@ -1344,6 +1358,7 @@ async fn process_block_events( #[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)] async fn handle_our_view_change( ctx: &mut Context, + runtime: &mut RuntimeInfo, state: &mut State, view: OurView, ) -> Result<()> { @@ -1363,9 +1378,10 @@ async fn handle_our_view_change( .map_err(Error::ImplicitViewFetchError)?; let block_number = implicit_view.block_number(leaf); - state - .per_relay_parent - .insert(*leaf, PerRelayParent::new(para_id, claim_queue, block_number)); + state.per_relay_parent.insert( + *leaf, + PerRelayParent::new(ctx, runtime, para_id, claim_queue, block_number, *leaf).await?, + ); process_block_events( ctx, @@ -1393,9 +1409,18 @@ async fn handle_our_view_change( if state.per_relay_parent.get(block_hash).is_none() { let claim_queue = fetch_claim_queue(ctx.sender(), *block_hash).await?; - state - .per_relay_parent - .insert(*block_hash, PerRelayParent::new(para_id, claim_queue, block_number)); + state.per_relay_parent.insert( + *block_hash, + PerRelayParent::new( + ctx, + runtime, + para_id, + claim_queue, + block_number, + *block_hash, + ) + .await?, + ); } let per_relay_parent = From 48ab6d0363cf5d2791a77cb560a097e1d97843ad Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Wed, 16 Jul 2025 14:50:40 +0300 Subject: [PATCH 03/11] fix tests and cleanup unneeded code Signed-off-by: Alexandru Gheorghe --- .../src/collator_side/mod.rs | 58 +-- .../src/collator_side/tests/mod.rs | 334 
+++++++++++------- .../tests/prospective_parachains.rs | 159 +++++++-- .../src/collator_side/validators_buffer.rs | 283 +-------------- 4 files changed, 359 insertions(+), 475 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 30d7fb11d2a3e..a684cb5e29602 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -53,8 +53,8 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::{ vstaging::{CandidateEvent, CandidateReceiptV2 as CandidateReceipt}, - AuthorityDiscoveryId, BlockNumber, CandidateHash, CollatorPair, CoreIndex, GroupIndex, Hash, - HeadData, Id as ParaId, SessionIndex, + AuthorityDiscoveryId, BlockNumber, CandidateHash, CollatorPair, CoreIndex, Hash, HeadData, + Id as ParaId, }; use crate::{modify_reputation, LOG_TARGET, LOG_TARGET_STATS}; @@ -71,9 +71,7 @@ use collation::{ VersionedCollationRequest, WaitingCollationFetches, }; use error::{log_error, Error, FatalError, Result}; -use validators_buffer::{ - ResetInterestTimeout, ValidatorGroupsBuffer, RESET_INTEREST_TIMEOUT, VALIDATORS_BUFFER_CAPACITY, -}; +use validators_buffer::{ResetInterestTimeout, RESET_INTEREST_TIMEOUT}; pub use metrics::Metrics; @@ -265,7 +263,7 @@ impl PerRelayParent { let mut validator_groups = HashMap::default(); for (core, _) in &assignments { - let GroupValidators { validators, session_index: _, group_index: _ } = + let GroupValidators { validators } = determine_our_validators(ctx, runtime, *core, block_hash).await?; let mut group = ValidatorGroup::default(); group.validators = validators; @@ -312,9 +310,6 @@ struct State { /// as we learn the [`PeerId`]'s by `PeerConnected` events. peer_ids: HashMap>, - /// Tracks which validators we want to stay connected to. 
- validator_groups_buf: ValidatorGroupsBuffer, - /// Timeout-future which is reset after every leaf to [`RECONNECT_AFTER_LEAF_TIMEOUT`] seconds. /// When it fires, we update our reserved peers. reconnect_timeout: ReconnectTimeout, @@ -367,7 +362,6 @@ impl State { per_relay_parent: Default::default(), collation_result_senders: Default::default(), peer_ids: Default::default(), - validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY), reconnect_timeout: Fuse::terminated(), waiting_collation_fetches: Default::default(), active_collation_fetches: Default::default(), @@ -472,7 +466,7 @@ async fn distribute_collation( let our_core = core_index; // Determine the group on that core. - let GroupValidators { validators, session_index, group_index } = + let GroupValidators { validators } = determine_our_validators(ctx, runtime, our_core, candidate_relay_parent).await?; if validators.is_empty() { @@ -485,18 +479,6 @@ async fn distribute_collation( return Ok(()) } - // It's important to insert new collation interests **before** - // issuing a connection request. - // - // If a validator managed to fetch all the relevant collations - // but still assigned to our core, we keep the connection alive. - state.validator_groups_buf.note_collation_advertised( - candidate_hash, - session_index, - group_index, - &validators, - ); - gum::debug!( target: LOG_TARGET, para_id = %id, @@ -577,9 +559,6 @@ async fn distribute_collation( struct GroupValidators { /// The validators of above group (their discovery keys). validators: Vec, - - session_index: SessionIndex, - group_index: GroupIndex, } /// Figure out current group of validators assigned to the para being collated on. 
@@ -611,11 +590,7 @@ async fn determine_our_validators( let current_validators = current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect(); - let current_validators = GroupValidators { - validators: current_validators, - session_index, - group_index: current_group_index, - }; + let current_validators = GroupValidators { validators: current_validators }; Ok(current_validators) } @@ -1409,6 +1384,7 @@ async fn handle_our_view_change( if state.per_relay_parent.get(block_hash).is_none() { let claim_queue = fetch_claim_queue(ctx.sender(), *block_hash).await?; + state.per_relay_parent.insert( *block_hash, PerRelayParent::new( @@ -1469,7 +1445,6 @@ async fn handle_our_view_change( let candidate_hash: CandidateHash = collation.receipt.hash(); state.collation_result_senders.remove(&candidate_hash); - state.validator_groups_buf.remove_candidate(&candidate_hash); process_out_of_view_collation(&mut state.collation_tracker, collation_with_core); } @@ -1637,10 +1612,6 @@ async fn run_inner( // timeout, we simply start processing next request. // The request it still alive, it should be kept in a waiting queue. } else { - for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { - // This peer has received the candidate. Not interested anymore. - state.validator_groups_buf.reset_validator_interest(candidate_hash, authority_id); - } waiting.waiting_peers.remove(&(peer_id, candidate_hash)); // Update collation status to fetched. @@ -1705,15 +1676,12 @@ async fn run_inner( } }, (candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => { - // NOTE: it doesn't necessarily mean that a validator gets disconnected, - // it only will if there're no other advertisements we want to send. - // - // No-op if the collation was already fetched or went out of view. 
- for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { - state - .validator_groups_buf - .reset_validator_interest(candidate_hash, &authority_id); - } + gum::debug!( + target: LOG_TARGET, + ?candidate_hash, + ?peer_id, + "Advertising to peer timed out" + ); } _ = reconnect_timeout => { diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 0123ccb39a195..1627e93c75dec 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -99,7 +99,7 @@ impl Default for TestState { let validator_peer_id = std::iter::repeat_with(|| PeerId::random()).take(discovery_keys.len()).collect(); - let validator_groups = vec![vec![2, 0, 4], vec![1, 3]] + let validator_groups = vec![vec![2, 0, 4], vec![1, 3], vec![], vec![]] .into_iter() .map(|g| g.into_iter().map(ValidatorIndex).collect()) .collect(); @@ -162,6 +162,12 @@ impl TestState { &self.session_info.validator_groups.get(GroupIndex::from(group_idx)).unwrap() } + fn validator_indices_for_core(&self, core: CoreIndex) -> &[ValidatorIndex] { + let core_num = self.claim_queue.len(); + let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(core, core_num); + &self.session_info.validator_groups.get(GroupIndex::from(group_idx)).unwrap() + } + fn current_session_index(&self) -> SessionIndex { self.session_index } @@ -179,6 +185,13 @@ impl TestState { .map(|i| self.session_info.discovery_keys[i.0 as usize].clone()) .collect() } + + fn validator_authority_ids_for_core(&self, core: CoreIndex) -> Vec { + self.validator_indices_for_core(core) + .iter() + .map(|i| self.session_info.discovery_keys[i.0 as usize].clone()) + .collect() + } } type VirtualOverseer = @@ -293,28 +306,31 @@ struct DistributeCollation { pov_block: PoV, } -async fn distribute_collation_with_receipt( +// Check that the next received 
message is a connection request to validators. +async fn check_connected_to_validators( virtual_overseer: &mut VirtualOverseer, - test_state: &TestState, - relay_parent: Hash, - should_connect: bool, - candidate: CandidateReceipt, - pov: PoV, - parent_head_data_hash: Hash, -) -> DistributeCollation { - overseer_send( - virtual_overseer, - CollatorProtocolMessage::DistributeCollation { - candidate_receipt: candidate.clone(), - parent_head_data_hash, - pov: pov.clone(), - parent_head_data: HeadData(vec![1, 2, 3]), - result_sender: None, - core_index: candidate.descriptor.core_index().unwrap(), - }, - ) - .await; + expected_connected: Vec, +) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ConnectToValidators { + validator_ids, peer_set: _, failed: _, + } + ) => { + assert_eq!(validator_ids.len(), expected_connected.len()); + for validator in expected_connected.iter() { + assert!(validator_ids.contains(validator)); + } + } + ); +} +// Expect that the next received messages are the ones necessary to determine the validator group. +async fn expect_determine_validator_group( + virtual_overseer: &mut VirtualOverseer, + test_state: &TestState, +) { // We don't know precisely what is going to come as session info might be cached: loop { match overseer_recv(virtual_overseer).await { @@ -355,7 +371,6 @@ async fn distribute_collation_with_receipt( _relay_parent, RuntimeApiRequest::ValidatorGroups(tx), )) => { - assert_eq!(_relay_parent, relay_parent); tx.send(Ok(( test_state.session_info.validator_groups.to_vec(), test_state.group_rotation_info.clone(), @@ -367,28 +382,40 @@ async fn distribute_collation_with_receipt( other => panic!("Unexpected message received: {:?}", other), } } +} - if should_connect { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { - .. 
- } - ) => {} - ); - } +async fn distribute_collation_with_receipt( + virtual_overseer: &mut VirtualOverseer, + expected_connected: Vec, + test_state: &TestState, + candidate: CandidateReceipt, + pov: PoV, + parent_head_data_hash: Hash, +) -> DistributeCollation { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::DistributeCollation { + candidate_receipt: candidate.clone(), + parent_head_data_hash, + pov: pov.clone(), + parent_head_data: HeadData(vec![1, 2, 3]), + result_sender: None, + core_index: candidate.descriptor.core_index().unwrap(), + }, + ) + .await; + check_connected_to_validators(virtual_overseer, expected_connected).await; + expect_determine_validator_group(virtual_overseer, test_state).await; DistributeCollation { candidate, pov_block: pov } } /// Create some PoV and distribute it. async fn distribute_collation( virtual_overseer: &mut VirtualOverseer, + expected_connected: Vec, test_state: &TestState, relay_parent: Hash, - // whether or not we expect a connection request or not. 
- should_connect: bool, ) -> DistributeCollation { // Now we want to distribute a `PoVBlock` let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) }; @@ -406,9 +433,8 @@ async fn distribute_collation( distribute_collation_with_receipt( virtual_overseer, + expected_connected, test_state, - relay_parent, - should_connect, candidate, pov_block, parent_head_data_hash, @@ -545,11 +571,22 @@ fn v1_protocol_rejected() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; for (val, peer) in test_state .current_group_validator_authority_ids() @@ -591,15 +628,20 @@ fn advertise_and_send_collation() { CollatorProtocolMessage::CollateOn(test_state.para_id), ) .await; - - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; let DistributeCollation { candidate, pov_block } = distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - true, ) .await; @@ -702,8 +744,14 @@ fn advertise_and_send_collation() { test_state.relay_parent.randomize(); // Update our view, making the old relay parent go out of the implicit view. 
- update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 20)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 20)], + 1, + ) + .await; let peer = test_state.validator_peer_id[2]; @@ -733,9 +781,9 @@ fn advertise_and_send_collation() { let DistributeCollation { candidate, .. } = distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - true, ) .await; @@ -781,14 +829,20 @@ fn delay_reputation_change() { ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; let DistributeCollation { candidate, .. } = distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - true, ) .await; @@ -929,6 +983,7 @@ fn collators_declare_to_connected_peers() { .await; update_view( + Some(test_state.current_group_validator_authority_ids()), &test_state, &mut test_harness.virtual_overseer, vec![(test_state.relay_parent, 10)], @@ -972,8 +1027,14 @@ fn collations_are_only_advertised_to_validators_with_correct_view() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; // A validator connected to us connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(validator_id)).await; @@ -987,9 +1048,13 @@ fn collations_are_only_advertised_to_validators_with_correct_view() { 
// And let it tell us that it is has the same view. send_peer_view_change(virtual_overseer, &peer2, vec![test_state.relay_parent]).await; - let DistributeCollation { candidate, .. } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate, .. } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; expect_advertise_collation_msg( virtual_overseer, @@ -1037,8 +1102,14 @@ fn collate_on_two_different_relay_chain_blocks() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; // A validator connected to us connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(validator_id)).await; @@ -1049,9 +1120,13 @@ fn collate_on_two_different_relay_chain_blocks() { expect_declare_msg(virtual_overseer, &test_state, &peer).await; expect_declare_msg(virtual_overseer, &test_state, &peer2).await; - let DistributeCollation { candidate: old_candidate, .. } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate: old_candidate, .. } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; let old_relay_parent = test_state.relay_parent; @@ -1059,6 +1134,7 @@ fn collate_on_two_different_relay_chain_blocks() { // parent are active. 
test_state.relay_parent.randomize(); update_view( + Some(test_state.current_group_validator_authority_ids()), &test_state, virtual_overseer, vec![(old_relay_parent, 10), (test_state.relay_parent, 10)], @@ -1066,9 +1142,13 @@ fn collate_on_two_different_relay_chain_blocks() { ) .await; - let DistributeCollation { candidate: new_candidate, .. } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate: new_candidate, .. } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; send_peer_view_change(virtual_overseer, &peer, vec![old_relay_parent]).await; expect_advertise_collation_msg( @@ -1112,17 +1192,27 @@ fn validator_reconnect_does_not_advertise_a_second_time() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; // A validator connected to us connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(validator_id.clone())) .await; expect_declare_msg(virtual_overseer, &test_state, &peer).await; - let DistributeCollation { candidate, .. } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate, .. 
} = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await; expect_advertise_collation_msg( @@ -1166,8 +1256,14 @@ fn collators_reject_declare_messages() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; // A validator connected to us connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(validator_id)).await; @@ -1227,12 +1323,22 @@ where overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; - let DistributeCollation { candidate, pov_block } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate, pov_block } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; for (val, peer) in test_state .current_group_validator_authority_ids() @@ -1352,7 +1458,7 @@ where } #[test] -fn connect_to_buffered_groups() { +fn connect_to_group_in_view() { let mut test_state = TestState::default(); let local_peer_id = test_state.local_peer_id; let collator_pair = test_state.collator_pair.clone(); @@ -1371,8 +1477,14 @@ fn connect_to_buffered_groups() { ) .await; - update_view(&test_state, &mut virtual_overseer, 
vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; let group_a = test_state.current_group_validator_authority_ids(); let peers_a = test_state.current_group_validator_peer_ids(); @@ -1380,21 +1492,12 @@ fn connect_to_buffered_groups() { let DistributeCollation { candidate, .. } = distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - false, ) .await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } - ) => { - assert_eq!(group_a, validator_ids); - } - ); - let head_a = test_state.relay_parent; for (val, peer) in group_a.iter().zip(&peers_a) { @@ -1453,8 +1556,13 @@ fn connect_to_buffered_groups() { let old_relay_parent = test_state.relay_parent; test_state.relay_parent.randomize(); + test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation(); + // Update our view. + let mut expected_group = group_a.clone(); + expected_group.extend(test_state.current_group_validator_authority_ids()); update_view( + Some(expected_group.clone()), &test_state, &mut virtual_overseer, vec![(old_relay_parent, 10), (test_state.relay_parent, 20)], @@ -1462,8 +1570,6 @@ fn connect_to_buffered_groups() { ) .await; - test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation(); - let head_b = test_state.relay_parent; let group_b = test_state.current_group_validator_authority_ids(); assert_ne!(head_a, head_b); @@ -1471,27 +1577,12 @@ fn connect_to_buffered_groups() { distribute_collation( &mut virtual_overseer, + expected_group, &test_state, test_state.relay_parent, - false, ) .await; - // Should be connected to both groups except for the validator that fetched advertised - // collation. 
- assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } - ) => { - assert!(!validator_ids.contains(&group_a[0])); - - for validator in group_a[1..].iter().chain(&group_b) { - assert!(validator_ids.contains(validator)); - } - } - ); - TestHarness { virtual_overseer, req_v2_cfg: req_cfg } }, ); @@ -1517,35 +1608,38 @@ fn connect_with_no_cores_assigned() { ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; let group_a = test_state.current_group_validator_authority_ids(); assert!(group_a.len() > 1); distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - false, ) .await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } - ) => { - assert_eq!(group_a, validator_ids); - } - ); - // Create a new relay parent and remove the core assignments. test_state.relay_parent.randomize(); test_state.claim_queue.clear(); - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 20)], 1) - .await; + update_view( + Some(vec![]), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 20)], + 1, + ) + .await; // Send the ActiveLeaves signal to trigger the reconnect timeout. 
overseer_signal( diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index 8be5c4e876b5b..fbf514f39105e 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -28,6 +28,7 @@ fn get_parent_hash(hash: Hash) -> Hash { /// Handle a view update. pub(super) async fn update_view( + expected_connected: Option>, test_state: &TestState, virtual_overseer: &mut VirtualOverseer, new_view: Vec<(Hash, u32)>, // Hash and block number. @@ -63,7 +64,6 @@ pub(super) async fn update_view( .take(ancestry_len as usize); let ancestry_numbers = (min_number..=leaf_number).rev(); let mut ancestry_iter = ancestry_hashes.clone().zip(ancestry_numbers).peekable(); - if let Some((hash, number)) = ancestry_iter.next() { assert_matches!( overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(), @@ -180,6 +180,14 @@ pub(super) async fn update_view( ); } + for (_core, _paras) in test_state + .claim_queue + .iter() + .filter(|(_, paras)| paras.contains(&test_state.para_id)) + { + expect_determine_validator_group(virtual_overseer, &test_state).await; + } + for _ in ancestry_iter { while let Some(msg) = overseer_peek_with_timeout(virtual_overseer, Duration::from_millis(50)).await @@ -196,10 +204,33 @@ pub(super) async fn update_view( _, RuntimeApiRequest::CandidateEvents(_), )) + ) && !matches!( + &msg, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionIndexForChild(_), + )) ) { break } + if matches!( + &msg, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionIndexForChild(_), + )) + ) { + for (_core, _paras) in test_state + .claim_queue + .iter() + .filter(|(_, paras)| paras.contains(&test_state.para_id)) + { + 
expect_determine_validator_group(virtual_overseer, &test_state).await; + } + break; + } + match overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)) .await .unwrap() @@ -223,6 +254,9 @@ pub(super) async fn update_view( } } } + if let Some(expected_connected) = expected_connected { + check_connected_to_validators(virtual_overseer, expected_connected).await; + } } /// Check that the next received message is a `Declare` message. @@ -303,12 +337,25 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b if validator_sends_view_first { // Activate leaf `c` to accept at least the collation. - update_view(&test_state, virtual_overseer, vec![(head_c, head_c_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(head_c, head_c_num)], + 1, + ) + .await; } else { // Activated leaf is `b`, but the collation will be based on `c`. - update_view(&test_state, virtual_overseer, vec![(head_b, head_b_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(head_b, head_b_num)], + 1, + ) + .await; } - let validator_peer_ids = test_state.current_group_validator_peer_ids(); for (val, peer) in test_state .current_group_validator_authority_ids() @@ -332,27 +379,17 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b ..Default::default() } .build(); + let DistributeCollation { candidate, pov_block: _ } = distribute_collation_with_receipt( virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, - head_c, - false, // Check the group manually. candidate, pov, parent_head_data_hash, ) .await; - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. 
} - ) => { - let expected_validators = test_state.current_group_validator_authority_ids(); - - assert_eq!(expected_validators, validator_ids); - } - ); let candidate_hash = candidate.hash(); @@ -373,7 +410,8 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b if validator_sends_view_first { // Activated leaf is `b`, but the collation will be based on `c`. - update_view(&test_state, virtual_overseer, vec![(head_b, head_b_num)], 1).await; + update_view(None, &test_state, virtual_overseer, vec![(head_b, head_b_num)], 1) + .await; for _ in &validator_peer_ids { expect_advertise_collation_msg( @@ -384,11 +422,24 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b ) .await; } + + check_connected_to_validators( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + ) + .await; } // Head `c` goes out of view. // Build a different candidate for this relay parent and attempt to distribute it. - update_view(&test_state, virtual_overseer, vec![(head_a, head_a_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(head_a, head_a_num)], + 1, + ) + .await; let pov = PoV { block_data: BlockData(vec![4, 5, 6]) }; let parent_head_data_hash = Hash::repeat_byte(0xBB); @@ -412,6 +463,12 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b ) .await; + check_connected_to_validators( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + ) + .await; + // Parent out of view, nothing happens. assert!(overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(100)) .await @@ -453,7 +510,14 @@ fn distribute_collation_up_to_limit() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; // Activated leaf is `a`, but the collation will be based on `b`. 
- update_view(&test_state, virtual_overseer, vec![(head_a, head_a_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(head_a, head_a_num)], + 1, + ) + .await; for i in 0..expected_assignments { let pov = PoV { block_data: BlockData(vec![i as u8]) }; @@ -468,9 +532,8 @@ fn distribute_collation_up_to_limit() { .build(); distribute_collation_with_receipt( virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, - head_b, - true, candidate, pov, parent_head_data_hash, @@ -501,6 +564,11 @@ fn distribute_collation_up_to_limit() { ) .await; + check_connected_to_validators( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + ) + .await; // Limit has been reached. assert!(overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(100)) .await @@ -531,6 +599,12 @@ fn distribute_collation_up_to_limit() { ) .await; + check_connected_to_validators( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + ) + .await; + assert!(overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(100)) .await .is_none()); @@ -566,7 +640,24 @@ fn send_parent_head_data_for_elastic_scaling() { CollatorProtocolMessage::CollateOn(test_state.para_id), ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(head_b, head_b_num)], 1).await; + let expected_connected = [CoreIndex(0), CoreIndex(2), CoreIndex(3)] + .into_iter() + .map(|core| test_state.validator_authority_ids_for_core(core)) + .fold(HashSet::new(), |mut acc, res| { + acc.extend(res.into_iter()); + acc + }) + .into_iter() + .collect::>(); + + update_view( + Some(expected_connected.clone()), + &test_state, + &mut virtual_overseer, + vec![(head_b, head_b_num)], + 1, + ) + .await; let pov_data = PoV { block_data: BlockData(vec![1 as u8]) }; let candidate = TestCandidateBuilder { @@ -582,9 +673,8 @@ fn send_parent_head_data_for_elastic_scaling() { 
distribute_collation_with_receipt( &mut virtual_overseer, + expected_connected, &test_state, - head_b, - true, candidate.clone(), pov_data.clone(), phdh, @@ -679,8 +769,22 @@ fn advertise_and_send_collation_by_hash() { CollatorProtocolMessage::CollateOn(test_state.para_id), ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(head_b, head_b_num)], 1).await; - update_view(&test_state, &mut virtual_overseer, vec![(head_a, head_a_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(head_b, head_b_num)], + 1, + ) + .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(head_a, head_a_num)], + 1, + ) + .await; let candidates: Vec<_> = (0..2) .map(|i| { @@ -699,9 +803,8 @@ fn advertise_and_send_collation_by_hash() { for (candidate, pov) in &candidates { distribute_collation_with_receipt( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, - head_b, - true, candidate.clone(), pov.clone(), Hash::zero(), diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 8ed4a3c266c0f..ec467c0ab843e 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -30,185 +30,16 @@ //! The bitwise OR over known advertisements gives us validators indices for connection request. 
use std::{ - collections::{HashMap, VecDeque}, future::Future, - num::NonZeroUsize, - ops::Range, pin::Pin, task::{Context, Poll}, time::Duration, }; -use bitvec::{bitvec, vec::BitVec}; use futures::FutureExt; use polkadot_node_network_protocol::PeerId; -use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, GroupIndex, SessionIndex}; - -/// Elastic scaling: how many candidates per relay chain block the collator supports building. -pub const MAX_CHAINED_CANDIDATES_PER_RCB: NonZeroUsize = match NonZeroUsize::new(3) { - Some(cap) => cap, - None => panic!("max candidates per rcb cannot be zero"), -}; - -/// The ring buffer stores at most this many unique validator groups. -/// -/// This value should be chosen in way that all groups assigned to our para -/// in the view can fit into the buffer multiplied by amount of candidates we support per relay -/// chain block in the case of elastic scaling. -pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = - match NonZeroUsize::new(3 * MAX_CHAINED_CANDIDATES_PER_RCB.get()) { - Some(cap) => cap, - None => panic!("buffer capacity must be non-zero"), - }; - -/// Unique identifier of a validators group. -#[derive(Debug, Clone)] -struct ValidatorsGroupInfo { - /// Number of validators in the group. - len: usize, - session_index: SessionIndex, - group_index: GroupIndex, -} - -/// Ring buffer of validator groups. -/// -/// Tracks which peers we want to be connected to with respect to advertised collations. -#[derive(Debug, Clone)] -pub struct ValidatorGroupsBuffer { - /// Validator groups identifiers we **had** advertisements for. - group_infos: VecDeque, - /// Continuous buffer of validators discovery keys. - validators: VecDeque, - /// Mapping from candidate hashes to bit-vectors with bits for all `validators`. - /// Invariants kept: All bit-vectors are guaranteed to have the same size. - should_be_connected: HashMap, - /// Buffer capacity, limits the number of **groups** tracked. 
- cap: NonZeroUsize, -} - -impl ValidatorGroupsBuffer { - /// Creates a new buffer with a non-zero capacity. - pub fn with_capacity(cap: NonZeroUsize) -> Self { - Self { - group_infos: VecDeque::new(), - validators: VecDeque::new(), - should_be_connected: HashMap::new(), - cap, - } - } - - /// Note a new advertisement, marking that we want to be connected to validators - /// from this group. - /// - /// If max capacity is reached and the group is new, drops validators from the back - /// of the buffer. - pub fn note_collation_advertised( - &mut self, - candidate_hash: CandidateHash, - session_index: SessionIndex, - group_index: GroupIndex, - validators: &[AuthorityDiscoveryId], - ) { - if validators.is_empty() { - return - } - - match self.group_infos.iter().enumerate().find(|(_, group)| { - group.session_index == session_index && group.group_index == group_index - }) { - Some((idx, group)) => { - let group_start_idx = self.group_lengths_iter().take(idx).sum(); - self.set_bits(candidate_hash, group_start_idx..(group_start_idx + group.len)); - }, - None => self.push(candidate_hash, session_index, group_index, validators), - } - } - - /// Note that a validator is no longer interested in a given candidate. - pub fn reset_validator_interest( - &mut self, - candidate_hash: CandidateHash, - authority_id: &AuthorityDiscoveryId, - ) { - let bits = match self.should_be_connected.get_mut(&candidate_hash) { - Some(bits) => bits, - None => return, - }; - - for (idx, auth_id) in self.validators.iter().enumerate() { - if auth_id == authority_id { - bits.set(idx, false); - } - } - } - - /// Remove advertised candidate from the buffer. - /// - /// The buffer will no longer track which validators are interested in a corresponding - /// advertisement. - pub fn remove_candidate(&mut self, candidate_hash: &CandidateHash) { - self.should_be_connected.remove(candidate_hash); - } - - /// Pushes a new group to the buffer along with advertisement, setting all validators - /// bits to 1. 
- /// - /// If the buffer is full, drops group from the tail. - fn push( - &mut self, - candidate_hash: CandidateHash, - session_index: SessionIndex, - group_index: GroupIndex, - validators: &[AuthorityDiscoveryId], - ) { - let new_group_info = - ValidatorsGroupInfo { len: validators.len(), session_index, group_index }; - - let buf = &mut self.group_infos; - let cap = self.cap.get(); - - if buf.len() >= cap { - let pruned_group = buf.pop_front().expect("buf is not empty; qed"); - self.validators.drain(..pruned_group.len); - - self.should_be_connected.values_mut().for_each(|bits| { - bits.as_mut_bitslice().shift_left(pruned_group.len); - }); - } - - self.validators.extend(validators.iter().cloned()); - buf.push_back(new_group_info); - let buf_len = buf.len(); - let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum(); - - let new_len = self.validators.len(); - self.should_be_connected - .values_mut() - .for_each(|bits| bits.resize(new_len, false)); - self.set_bits(candidate_hash, group_start_idx..(group_start_idx + validators.len())); - } - - /// Sets advertisement bits to 1 in a given range (usually corresponding to some group). - /// If the relay parent is unknown, inserts 0-initialized bitvec first. - /// - /// The range must be ensured to be within bounds. - fn set_bits(&mut self, candidate_hash: CandidateHash, range: Range) { - let bits = self - .should_be_connected - .entry(candidate_hash) - .or_insert_with(|| bitvec![0; self.validators.len()]); - - bits[range].fill(true); - } - - /// Returns iterator over numbers of validators in groups. - /// - /// Useful for getting an index of the first validator in i-th group. - fn group_lengths_iter(&self) -> impl Iterator + '_ { - self.group_infos.iter().map(|group| group.len) - } -} +use polkadot_primitives::CandidateHash; /// A timeout for resetting validators' interests in collations. 
pub const RESET_INTEREST_TIMEOUT: Duration = Duration::from_secs(6); @@ -239,115 +70,3 @@ impl Future for ResetInterestTimeout { self.fut.poll_unpin(cx).map(|_| (self.candidate_hash, self.peer_id)) } } - -#[cfg(test)] -mod tests { - use super::*; - use polkadot_primitives::Hash; - use sp_keyring::Sr25519Keyring; - - #[test] - fn one_capacity_buffer() { - let cap = NonZeroUsize::new(1).unwrap(); - let mut buf = ValidatorGroupsBuffer::with_capacity(cap); - - let hash_a = CandidateHash(Hash::repeat_byte(0x1)); - let hash_b = CandidateHash(Hash::repeat_byte(0x2)); - - let validators: Vec<_> = [ - Sr25519Keyring::Alice, - Sr25519Keyring::Bob, - Sr25519Keyring::Charlie, - Sr25519Keyring::Dave, - Sr25519Keyring::Ferdie, - ] - .into_iter() - .map(|key| AuthorityDiscoveryId::from(key.public())) - .collect(); - - assert!(buf.validators_to_connect().is_empty()); - - buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]); - assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); - - buf.reset_validator_interest(hash_a, &validators[1]); - assert_eq!(buf.validators_to_connect(), validators[0..2].to_vec()); - - buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]); - assert_eq!(buf.validators_to_connect(), validators[2..].to_vec()); - - for validator in &validators[2..] 
{ - buf.reset_validator_interest(hash_b, validator); - } - let mut expected = validators[2..].to_vec(); - expected.sort(); - let mut result = buf.validators_to_connect(); - result.sort(); - assert_eq!(result, expected); - } - - #[test] - fn buffer_works() { - let cap = NonZeroUsize::new(3).unwrap(); - let mut buf = ValidatorGroupsBuffer::with_capacity(cap); - - let hashes: Vec<_> = (0..5).map(|i| CandidateHash(Hash::repeat_byte(i))).collect(); - - let validators: Vec<_> = [ - Sr25519Keyring::Alice, - Sr25519Keyring::Bob, - Sr25519Keyring::Charlie, - Sr25519Keyring::Dave, - Sr25519Keyring::Ferdie, - ] - .into_iter() - .map(|key| AuthorityDiscoveryId::from(key.public())) - .collect(); - - buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]); - buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]); - buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]); - buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]); - - assert_eq!(buf.validators_to_connect(), validators[..4].to_vec()); - - for validator in &validators[2..4] { - buf.reset_validator_interest(hashes[2], validator); - } - - buf.reset_validator_interest(hashes[1], &validators[0]); - let mut expected: Vec<_> = validators[..4].iter().cloned().collect(); - let mut result = buf.validators_to_connect(); - expected.sort(); - result.sort(); - assert_eq!(result, expected); - - buf.reset_validator_interest(hashes[0], &validators[0]); - let mut expected: Vec<_> = validators[1..4].iter().cloned().collect(); - expected.sort(); - let mut result = buf.validators_to_connect(); - result.sort(); - assert_eq!(result, expected); - - buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]); - buf.note_collation_advertised( - hashes[4], - 0, - GroupIndex(2), - std::slice::from_ref(&validators[4]), - ); - - buf.reset_validator_interest(hashes[3], &validators[2]); - buf.note_collation_advertised( - hashes[4], - 0, - 
GroupIndex(3), - std::slice::from_ref(&validators[0]), - ); - - assert_eq!( - buf.validators_to_connect(), - vec![validators[3].clone(), validators[4].clone(), validators[0].clone()] - ); - } -} From 76d7cdfa2de9ed072b43b40513763137905a238c Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 2 Oct 2025 14:27:28 +0300 Subject: [PATCH 04/11] ensure we consider all valid relay parents Signed-off-by: Andrei Sandu --- .../src/collator_side/mod.rs | 46 ++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 6ec1227abb984..9f22ea0c76a63 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -53,8 +53,8 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::{ AuthorityDiscoveryId, BlockNumber, CandidateEvent, CandidateHash, - CandidateReceiptV2 as CandidateReceipt, CollatorPair, CoreIndex, Hash, HeadData, - Id as ParaId, SessionIndex, + CandidateReceiptV2 as CandidateReceipt, CollatorPair, CoreIndex, Hash, HeadData, Id as ParaId, + SessionIndex, }; use crate::{modify_reputation, LOG_TARGET, LOG_TARGET_STATS}; @@ -402,7 +402,7 @@ async fn distribute_collation( // We should already be connected to the validators, but if we aren't, we will try to connect to // them now. 
- connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent).await; + connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent, id).await; let per_relay_parent = match state.per_relay_parent.get_mut(&candidate_relay_parent) { Some(per_relay_parent) => per_relay_parent, @@ -645,14 +645,21 @@ fn has_assigned_cores( fn list_of_backing_validators_in_view( implicit_view: &Option, per_relay_parent: &HashMap, + para_id: ParaId, ) -> Vec { let mut backing_validators = HashSet::new(); let Some(implicit_view) = implicit_view else { return vec![] }; for leaf in implicit_view.leaves() { - if let Some(relay_parent) = per_relay_parent.get(leaf) { - for group in relay_parent.validator_group.values() { - backing_validators.extend(group.validators.iter().cloned()); + let allowed_ancestry = implicit_view + .known_allowed_relay_parents_under(leaf, Some(para_id)) + .unwrap_or_default(); + + for allowed_relay_parent in allowed_ancestry { + if let Some(relay_parent) = per_relay_parent.get(allowed_relay_parent) { + for group in relay_parent.validator_group.values() { + backing_validators.extend(group.validators.iter().cloned()); + } } } } @@ -667,13 +674,14 @@ async fn connect_to_validators( ctx: &mut Context, implicit_view: &Option, per_relay_parent: &HashMap, + para_id: ParaId, ) { let cores_assigned = has_assigned_cores(implicit_view, per_relay_parent); // If no cores are assigned to the para, we still need to send a ConnectToValidators request to // the network bridge passing an empty list of validator ids. Otherwise, it will keep connecting // to the last requested validators until a new request is issued. 
let validator_ids = if cores_assigned { - list_of_backing_validators_in_view(implicit_view, per_relay_parent) + list_of_backing_validators_in_view(implicit_view, per_relay_parent, para_id) } else { Vec::new() }; @@ -1260,7 +1268,11 @@ async fn handle_network_msg( OurViewChange(view) => { gum::trace!(target: LOG_TARGET, ?view, "Own view change"); handle_our_view_change(ctx, runtime, state, view).await?; - connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent).await; + // Connect only if we are collating on a para. + if let Some(para_id) = state.collating_on { + connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent, para_id) + .await; + } }, PeerMessage(remote, msg) => { handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; @@ -1363,7 +1375,16 @@ async fn handle_our_view_change( state.per_relay_parent.insert( *leaf, - PerRelayParent::new(ctx, runtime, para_id, claim_queue, block_number, *leaf, session_index).await?, + PerRelayParent::new( + ctx, + runtime, + para_id, + claim_queue, + block_number, + *leaf, + session_index, + ) + .await?, ); process_block_events( @@ -1426,7 +1447,7 @@ async fn handle_our_view_change( claim_queue, block_number, *block_hash, - session_index + session_index, ) .await?, ) @@ -1740,7 +1761,10 @@ async fn run_inner( } _ = reconnect_timeout => { - connect_to_validators(&mut ctx, &state.implicit_view, &state.per_relay_parent).await; + // Connect only if we are collating on a para. 
+ if let Some(para_id) = state.collating_on { + connect_to_validators(&mut ctx, &state.implicit_view, &state.per_relay_parent, para_id).await; + } gum::trace!( target: LOG_TARGET, From 1ac73b5dd9858845246b3f1400acea4d10a576c8 Mon Sep 17 00:00:00 2001 From: "cmd[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 2 Oct 2025 12:09:44 +0000 Subject: [PATCH 05/11] Update from github-actions[bot] running command 'prdoc generate --bump patch' --- prdoc/pr_9178.prdoc | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 prdoc/pr_9178.prdoc diff --git a/prdoc/pr_9178.prdoc b/prdoc/pr_9178.prdoc new file mode 100644 index 0000000000000..fd4f0aa52f63f --- /dev/null +++ b/prdoc/pr_9178.prdoc @@ -0,0 +1,22 @@ +title: 'collator-protocol: cleanup connecting to backing group' +doc: +- audience: Todo + description: |- + There are a few things wrong with the way we are handling connecting the validators in the backing group: + 1. `validators_to_connect` returns only validators in groups we already have a block to advertise and the last backing groups we advertised something to, that means that if our backing group changes, but we don't have anything to advertise it will continue to try to connect to the previous backing group and validator will log this and disconnect it immediately. + On the validator you will see `Declared as collator for unneeded para` and on the collator you will see Connect/Disconnect requests. This will continue every reconnect_timeout (4s from each active signal) until the collator advertises something to the new backing group. This is harmless, but it pollutes both the collator and the validator logs. + + 2. A collator connects only when it has something to advertise to its backing group, this is a bit too late and we can improve it by connecting the collators to the backing group immediately after they notice their assigned backing group. + + 3. 
Staying connected to the last backingroup we advertised something does not work for elastic scaling because we have different backing groups and if the collator set is big enough that collators author just one block per group rotation, then we will always connect just when we have a candidate to advertise. + + ## Proposal to fix: + + Have collators always connect to the backing group they got assigned to and keep the connection open until backing group changes. Also, try to connect when have something to advertise or on timeout to have more chances of being correctly connected. + + ## Todo + - [x] Confirm that proposal does not have other undesired side effects. + - [x] Tests +crates: +- name: polkadot-collator-protocol + bump: patch From 7112e9f76b9ef3be2a335be35306ad355d5ba50b Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 18:49:35 +0300 Subject: [PATCH 06/11] fix prdoc Signed-off-by: Andrei Sandu --- prdoc/pr_9178.prdoc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/prdoc/pr_9178.prdoc b/prdoc/pr_9178.prdoc index fd4f0aa52f63f..5f5e11996d140 100644 --- a/prdoc/pr_9178.prdoc +++ b/prdoc/pr_9178.prdoc @@ -1,6 +1,6 @@ title: 'collator-protocol: cleanup connecting to backing group' doc: -- audience: Todo +- audience: Node Dev description: |- There are a few things wrong with the way we are handling connecting the validators in the backing group: 1. `validators_to_connect` returns only validators in groups we already have a block to advertise and the last backing groups we advertised something to, that means that if our backing group changes, but we don't have anything to advertise it will continue to try to connect to the previous backing group and validator will log this and disconnect it immediately. @@ -10,13 +10,10 @@ doc: 3. 
Staying connected to the last backingroup we advertised something does not work for elastic scaling because we have different backing groups and if the collator set is big enough that collators author just one block per group rotation, then we will always connect just when we have a candidate to advertise. - ## Proposal to fix: + ## Fix: Have collators always connect to the backing group they got assigned to and keep the connection open until backing group changes. Also, try to connect when have something to advertise or on timeout to have more chances of being correctly connected. - ## Todo - - [x] Confirm that proposal does not have other undesired side effects. - - [x] Tests crates: - name: polkadot-collator-protocol bump: patch From f106131debe045b1f0ff082b4e21548e9ea1cdf1 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 10 Oct 2025 18:17:29 +0300 Subject: [PATCH 07/11] remove validator buffer file Signed-off-by: Andrei Sandu --- .../src/collator_side/mod.rs | 35 ++++++++- .../src/collator_side/validators_buffer.rs | 72 ------------------- 2 files changed, 33 insertions(+), 74 deletions(-) delete mode 100644 polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index bcfb6d9fbca7c..a390f12edae2c 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -66,14 +66,12 @@ mod error; mod metrics; #[cfg(test)] mod tests; -mod validators_buffer; use collation::{ ActiveCollationFetches, Collation, CollationSendResult, CollationStatus, VersionedCollationRequest, WaitingCollationFetches, }; use error::{log_error, Error, FatalError, Result}; -use validators_buffer::{ResetInterestTimeout, RESET_INTEREST_TIMEOUT}; pub use metrics::Metrics; @@ -90,6 +88,9 @@ const COST_APPARENT_FLOOD: Rep = /// For 
considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386 const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150); +/// A timeout for resetting validators' interests in collations. +const RESET_INTEREST_TIMEOUT: Duration = Duration::from_secs(6); + /// Ensure that collator updates its connection requests to validators /// this long after the most recent leaf. /// @@ -113,6 +114,36 @@ const MAX_PARALLEL_CHAIN_API_REQUESTS: usize = 10; /// connected. type ReconnectTimeout = Fuse; +/// A future that returns a candidate hash along with validator discovery +/// keys once a timeout hit. +/// +/// If a validator doesn't manage to fetch a collation within this timeout +/// we should reset its interest in this advertisement in a buffer. For example, +/// when the PoV was already requested from another peer. +struct ResetInterestTimeout { + fut: futures_timer::Delay, + candidate_hash: CandidateHash, + peer_id: PeerId, +} + +impl ResetInterestTimeout { + /// Returns new `ResetInterestTimeout` that resolves after given timeout. + fn new(candidate_hash: CandidateHash, peer_id: PeerId, delay: Duration) -> Self { + Self { fut: futures_timer::Delay::new(delay), candidate_hash, peer_id } + } +} + +impl std::future::Future for ResetInterestTimeout { + type Output = (CandidateHash, PeerId); + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.fut.poll_unpin(cx).map(|_| (self.candidate_hash, self.peer_id)) + } +} + #[derive(Debug)] enum ShouldAdvertiseTo { Yes, diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs deleted file mode 100644 index ec467c0ab843e..0000000000000 --- a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. 
-// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Validator groups buffer for connection managements. -//! -//! Solves 2 problems: -//! 1. A collator may want to stay connected to multiple groups on rotation boundaries. -//! 2. It's important to disconnect from validator when there're no collations to be fetched. -//! -//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement, -//! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise. -//! -//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a -//! relay parent, one can reset a bit back to 0 for particular **validator**. For example, if a -//! collation was fetched or some timeout has been hit. -//! -//! The bitwise OR over known advertisements gives us validators indices for connection request. - -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -use futures::FutureExt; - -use polkadot_node_network_protocol::PeerId; -use polkadot_primitives::CandidateHash; - -/// A timeout for resetting validators' interests in collations. -pub const RESET_INTEREST_TIMEOUT: Duration = Duration::from_secs(6); - -/// A future that returns a candidate hash along with validator discovery -/// keys once a timeout hit. 
-/// -/// If a validator doesn't manage to fetch a collation within this timeout -/// we should reset its interest in this advertisement in a buffer. For example, -/// when the PoV was already requested from another peer. -pub struct ResetInterestTimeout { - fut: futures_timer::Delay, - candidate_hash: CandidateHash, - peer_id: PeerId, -} - -impl ResetInterestTimeout { - /// Returns new `ResetInterestTimeout` that resolves after given timeout. - pub fn new(candidate_hash: CandidateHash, peer_id: PeerId, delay: Duration) -> Self { - Self { fut: futures_timer::Delay::new(delay), candidate_hash, peer_id } - } -} - -impl Future for ResetInterestTimeout { - type Output = (CandidateHash, PeerId); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.fut.poll_unpin(cx).map(|_| (self.candidate_hash, self.peer_id)) - } -} From c66197358c90c38996d361f57c04653ce7263e0a Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 10 Oct 2025 19:02:56 +0300 Subject: [PATCH 08/11] review Signed-off-by: Andrei Sandu --- .../collator-protocol/src/collator_side/mod.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index a390f12edae2c..874c3a7b77017 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -424,7 +424,6 @@ impl State { #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] async fn distribute_collation( ctx: &mut Context, - runtime: &mut RuntimeInfo, state: &mut State, id: ParaId, receipt: CandidateReceipt, @@ -504,16 +503,16 @@ async fn distribute_collation( ); } - let our_core = core_index; - - // Determine the group on that core. 
- let GroupValidators { validators } = - determine_our_validators(ctx, runtime, our_core, candidate_relay_parent).await?; + let validators = per_relay_parent + .validator_group + .get(&core_index) + .map(|v| v.validators.clone()) + .unwrap_or_default(); if validators.is_empty() { gum::warn!( target: LOG_TARGET, - core = ?our_core, + core = ?core_index, "there are no validators assigned to core", ); @@ -526,7 +525,7 @@ async fn distribute_collation( candidate_relay_parent = %candidate_relay_parent, ?candidate_hash, pov_hash = ?pov.hash(), - core = ?our_core, + ?core_index, current_validators = ?validators, "Accepted collation, connecting to validators." ); @@ -867,7 +866,6 @@ async fn process_msg( let _ = state.metrics.time_collation_distribution("distribute"); distribute_collation( ctx, - runtime, state, id, candidate_receipt, From 386ca1506739d24bf81fb354efcac8d4a85a94dd Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 20 Oct 2025 14:12:49 +0300 Subject: [PATCH 09/11] fix tests Signed-off-by: Andrei Sandu --- .../network/collator-protocol/src/collator_side/tests/mod.rs | 3 --- .../src/collator_side/tests/prospective_parachains.rs | 4 ---- 2 files changed, 7 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 1627e93c75dec..de14b5bb0389e 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -387,7 +387,6 @@ async fn expect_determine_validator_group( async fn distribute_collation_with_receipt( virtual_overseer: &mut VirtualOverseer, expected_connected: Vec, - test_state: &TestState, candidate: CandidateReceipt, pov: PoV, parent_head_data_hash: Hash, @@ -406,7 +405,6 @@ async fn distribute_collation_with_receipt( .await; check_connected_to_validators(virtual_overseer, expected_connected).await; - 
expect_determine_validator_group(virtual_overseer, test_state).await; DistributeCollation { candidate, pov_block: pov } } @@ -434,7 +432,6 @@ async fn distribute_collation( distribute_collation_with_receipt( virtual_overseer, expected_connected, - test_state, candidate, pov_block, parent_head_data_hash, diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index 48f40fec6f05c..4f4c3526d7f2f 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -394,7 +394,6 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b distribute_collation_with_receipt( virtual_overseer, test_state.current_group_validator_authority_ids(), - &test_state, candidate, pov, parent_head_data_hash, @@ -543,7 +542,6 @@ fn distribute_collation_up_to_limit() { distribute_collation_with_receipt( virtual_overseer, test_state.current_group_validator_authority_ids(), - &test_state, candidate, pov, parent_head_data_hash, @@ -684,7 +682,6 @@ fn send_parent_head_data_for_elastic_scaling() { distribute_collation_with_receipt( &mut virtual_overseer, expected_connected, - &test_state, candidate.clone(), pov_data.clone(), phdh, @@ -814,7 +811,6 @@ fn advertise_and_send_collation_by_hash() { distribute_collation_with_receipt( &mut virtual_overseer, test_state.current_group_validator_authority_ids(), - &test_state, candidate.clone(), pov.clone(), Hash::zero(), From 02362a9f1f539a56f68aeed7b64e5bbb920fddc3 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 20 Oct 2025 14:16:02 +0300 Subject: [PATCH 10/11] remove clone derive Signed-off-by: Andrei Sandu --- .../node/network/collator-protocol/src/collator_side/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 874c3a7b77017..8bd0c3bf2da61 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -154,7 +154,7 @@ enum ShouldAdvertiseTo { /// Info about validators we are currently connected to. /// /// It keeps track to which validators we advertised our collation. -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default)] struct ValidatorGroup { /// Validators discovery ids. Lazily initialized when first /// distributing a collation. From 0169bd68d3fc4f27dc0420693b6bfbf808c0254e Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 20 Oct 2025 14:18:40 +0300 Subject: [PATCH 11/11] update prdoc Signed-off-by: Andrei Sandu --- prdoc/pr_9178.prdoc | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/prdoc/pr_9178.prdoc b/prdoc/pr_9178.prdoc index 5f5e11996d140..55d09d20e42f5 100644 --- a/prdoc/pr_9178.prdoc +++ b/prdoc/pr_9178.prdoc @@ -2,17 +2,10 @@ title: 'collator-protocol: cleanup connecting to backing group' doc: - audience: Node Dev description: |- - There are a few things wrong with the way we are handling connecting the validators in the backing group: - 1. `validators_to_connect` returns only validators in groups we already have a block to advertise and the last backing groups we advertised something to, that means that if our backing group changes, but we don't have anything to advertise it will continue to try to connect to the previous backing group and validator will log this and disconnect it immediately. - On the validator you will see`Declared as collator for unneeded para` and on the collator you will see Connect/Disconnect requests. This will continue every reconnect_timeout(4s from each active signal) until the collator advertises something to the new backing group. 
This is harmless, but it pollutes both the collator and the validator logs. - - 2. A collator connects only when it has something to advertise to its backing group, this is a bit too late and we can improve it by connecting the collators to the backing group immediately after they notice their assigned backing group. - - 3. Staying connected to the last backingroup we advertised something does not work for elastic scaling because we have different backing groups and if the collator set is big enough that collators author just one block per group rotation, then we will always connect just when we have a candidate to advertise. - - ## Fix: - - Have collators always connect to the backing group they got assigned to and keep the connection open until backing group changes. Also, try to connect when have something to advertise or on timeout to have more chances of being correctly connected. + + Have collators always connect to the backing group they got assigned to and keep the connection open + until backing group changes. Also, try to connect when have something to advertise or on timeout to + have more chances of being correctly connected. crates: - name: polkadot-collator-protocol