diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 11905e432fa3d..be84bc1d6869d 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -54,9 +54,15 @@ use polkadot_node_subsystem_util::{ TimeoutExt, }; use polkadot_primitives::{ +<<<<<<< HEAD vstaging::{CandidateEvent, CandidateReceiptV2 as CandidateReceipt}, AuthorityDiscoveryId, BlockNumber, CandidateHash, CollatorPair, CoreIndex, GroupIndex, Hash, HeadData, Id as ParaId, SessionIndex, +======= + AuthorityDiscoveryId, BlockNumber, CandidateEvent, CandidateHash, + CandidateReceiptV2 as CandidateReceipt, CollatorPair, CoreIndex, Hash, HeadData, Id as ParaId, + SessionIndex, +>>>>>>> db5c89ff (collator-protocol: cleanup connecting to backing group (#9178)) }; use crate::{modify_reputation, LOG_TARGET, LOG_TARGET_STATS}; @@ -66,16 +72,12 @@ mod error; mod metrics; #[cfg(test)] mod tests; -mod validators_buffer; use collation::{ ActiveCollationFetches, Collation, CollationSendResult, CollationStatus, VersionedCollationRequest, WaitingCollationFetches, }; use error::{log_error, Error, FatalError, Result}; -use validators_buffer::{ - ResetInterestTimeout, ValidatorGroupsBuffer, RESET_INTEREST_TIMEOUT, VALIDATORS_BUFFER_CAPACITY, -}; pub use metrics::Metrics; @@ -92,6 +94,9 @@ const COST_APPARENT_FLOOD: Rep = /// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386 const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150); +/// A timeout for resetting validators' interests in collations. +const RESET_INTEREST_TIMEOUT: Duration = Duration::from_secs(6); + /// Ensure that collator updates its connection requests to validators /// this long after the most recent leaf. /// @@ -112,6 +117,36 @@ const RECONNECT_AFTER_LEAF_TIMEOUT: Duration = Duration::from_secs(4); /// connected. type ReconnectTimeout = Fuse; +/// A future that returns a candidate hash along with validator discovery +/// keys once a timeout hit. +/// +/// If a validator doesn't manage to fetch a collation within this timeout +/// we should reset its interest in this advertisement in a buffer. For example, +/// when the PoV was already requested from another peer. +struct ResetInterestTimeout { + fut: futures_timer::Delay, + candidate_hash: CandidateHash, + peer_id: PeerId, +} + +impl ResetInterestTimeout { + /// Returns new `ResetInterestTimeout` that resolves after given timeout. + fn new(candidate_hash: CandidateHash, peer_id: PeerId, delay: Duration) -> Self { + Self { fut: futures_timer::Delay::new(delay), candidate_hash, peer_id } + } +} + +impl std::future::Future for ResetInterestTimeout { + type Output = (CandidateHash, PeerId); + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.fut.poll_unpin(cx).map(|_| (self.candidate_hash, self.peer_id)) + } +} + #[derive(Debug)] enum ShouldAdvertiseTo { Yes, @@ -246,11 +281,20 @@ struct PerRelayParent { } impl PerRelayParent { - fn new( + #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] + async fn new( + ctx: &mut Context, + runtime: &mut RuntimeInfo, para_id: ParaId, claim_queue: ClaimQueueSnapshot, block_number: Option, +<<<<<<< HEAD ) -> Self { +======= + block_hash: Hash, + session_index: SessionIndex, + ) -> Result { +>>>>>>> db5c89ff (collator-protocol: cleanup connecting to backing group (#9178)) let assignments = claim_queue.iter_all_claims().fold(HashMap::new(), |mut acc, (core, claims)| { let n_claims = claims.iter().filter(|para| para == &¶_id).count(); @@ -260,12 +304,27 @@ impl PerRelayParent { acc }); - Self { - validator_group: HashMap::default(), + let mut validator_groups = HashMap::default(); + + for (core, _) in &assignments { + let GroupValidators { validators } = + determine_our_validators(ctx, runtime, *core, block_hash).await?; + let mut group = ValidatorGroup::default(); + group.validators = validators; + validator_groups.insert(*core, group); + } + + Ok(Self { + validator_group: validator_groups, collations: HashMap::new(), assignments, block_number, +<<<<<<< HEAD } +======= + session_index, + }) +>>>>>>> db5c89ff (collator-protocol: cleanup connecting to backing group (#9178)) } } @@ -300,9 +359,6 @@ struct State { /// as we learn the [`PeerId`]'s by `PeerConnected` events. peer_ids: HashMap>, - /// Tracks which validators we want to stay connected to. - validator_groups_buf: ValidatorGroupsBuffer, - /// Timeout-future which is reset after every leaf to [`RECONNECT_AFTER_LEAF_TIMEOUT`] seconds. /// When it fires, we update our reserved peers. reconnect_timeout: ReconnectTimeout, @@ -355,7 +411,6 @@ impl State { per_relay_parent: Default::default(), collation_result_senders: Default::default(), peer_ids: Default::default(), - validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY), reconnect_timeout: Fuse::terminated(), waiting_collation_fetches: Default::default(), active_collation_fetches: Default::default(), @@ -377,7 +432,6 @@ impl State { #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] async fn distribute_collation( ctx: &mut Context, - runtime: &mut RuntimeInfo, state: &mut State, id: ParaId, receipt: CandidateReceipt, @@ -389,7 +443,10 @@ async fn distribute_collation( ) -> Result<()> { let candidate_relay_parent = receipt.descriptor.relay_parent(); let candidate_hash = receipt.hash(); - let cores_assigned = has_assigned_cores(&state.implicit_view, &state.per_relay_parent); + + // We should already be connected to the validators, but if we aren't, we will try to connect to + // them now. + connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent, id).await; let per_relay_parent = match state.per_relay_parent.get_mut(&candidate_relay_parent) { Some(per_relay_parent) => per_relay_parent, @@ -455,41 +512,29 @@ async fn distribute_collation( ); } - let our_core = core_index; - - // Determine the group on that core. - let GroupValidators { validators, session_index, group_index } = - determine_our_validators(ctx, runtime, our_core, candidate_relay_parent).await?; + let validators = per_relay_parent + .validator_group + .get(&core_index) + .map(|v| v.validators.clone()) + .unwrap_or_default(); if validators.is_empty() { gum::warn!( target: LOG_TARGET, - core = ?our_core, + core = ?core_index, "there are no validators assigned to core", ); return Ok(()) } - // It's important to insert new collation interests **before** - // issuing a connection request. - // - // If a validator managed to fetch all the relevant collations - // but still assigned to our core, we keep the connection alive. - state.validator_groups_buf.note_collation_advertised( - candidate_hash, - session_index, - group_index, - &validators, - ); - gum::debug!( target: LOG_TARGET, para_id = %id, candidate_relay_parent = %candidate_relay_parent, ?candidate_hash, pov_hash = ?pov.hash(), - core = ?our_core, + ?core_index, current_validators = ?validators, "Accepted collation, connecting to validators." ); @@ -501,9 +546,6 @@ async fn distribute_collation( group }); - // Update a set of connected validators if necessary. - connect_to_validators(ctx, cores_assigned, &state.validator_groups_buf).await; - if let Some(result_sender) = result_sender { state.collation_result_senders.insert(candidate_hash, result_sender); } @@ -525,9 +567,23 @@ async fn distribute_collation( status: CollationStatus::Created, }, core_index, +<<<<<<< HEAD stats: per_relay_parent .block_number .map(|n| CollationStats::new(para_head, n, &state.metrics)), +======= + session_index: per_relay_parent.session_index, + stats: per_relay_parent.block_number.map(|n| { + CollationStats::new( + para_head, + n, + candidate_relay_parent, + &state.metrics, + *candidate_hash, + pov_hash, + ) + }), +>>>>>>> db5c89ff (collator-protocol: cleanup connecting to backing group (#9178)) }, ); @@ -572,9 +628,6 @@ async fn distribute_collation( struct GroupValidators { /// The validators of above group (their discovery keys). validators: Vec, - - session_index: SessionIndex, - group_index: GroupIndex, } /// Figure out current group of validators assigned to the para being collated on. @@ -606,11 +659,7 @@ async fn determine_our_validators( let current_validators = current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect(); - let current_validators = GroupValidators { - validators: current_validators, - session_index, - group_index: current_group_index, - }; + let current_validators = GroupValidators { validators: current_validators }; Ok(current_validators) } @@ -656,19 +705,49 @@ fn has_assigned_cores( false } +fn list_of_backing_validators_in_view( + implicit_view: &Option, + per_relay_parent: &HashMap, + para_id: ParaId, +) -> Vec { + let mut backing_validators = HashSet::new(); + let Some(implicit_view) = implicit_view else { return vec![] }; + + for leaf in implicit_view.leaves() { + let allowed_ancestry = implicit_view + .known_allowed_relay_parents_under(leaf, Some(para_id)) + .unwrap_or_default(); + + for allowed_relay_parent in allowed_ancestry { + if let Some(relay_parent) = per_relay_parent.get(allowed_relay_parent) { + for group in relay_parent.validator_group.values() { + backing_validators.extend(group.validators.iter().cloned()); + } + } + } + } + + backing_validators.into_iter().collect() +} + /// Updates a set of connected validators based on their advertisement-bits /// in a validators buffer. #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] async fn connect_to_validators( ctx: &mut Context, - cores_assigned: bool, - validator_groups_buf: &ValidatorGroupsBuffer, + implicit_view: &Option, + per_relay_parent: &HashMap, + para_id: ParaId, ) { + let cores_assigned = has_assigned_cores(implicit_view, per_relay_parent); // If no cores are assigned to the para, we still need to send a ConnectToValidators request to // the network bridge passing an empty list of validator ids. Otherwise, it will keep connecting // to the last requested validators until a new request is issued. - let validator_ids = - if cores_assigned { validator_groups_buf.validators_to_connect() } else { Vec::new() }; + let validator_ids = if cores_assigned { + list_of_backing_validators_in_view(implicit_view, per_relay_parent, para_id) + } else { + Vec::new() + }; gum::trace!( target: LOG_TARGET, @@ -807,7 +886,6 @@ async fn process_msg( let _ = state.metrics.time_collation_distribution("distribute"); distribute_collation( ctx, - runtime, state, id, candidate_receipt, @@ -1257,7 +1335,16 @@ async fn handle_network_msg( }, OurViewChange(view) => { gum::trace!(target: LOG_TARGET, ?view, "Own view change"); +<<<<<<< HEAD handle_our_view_change(ctx, state, view).await?; +======= + handle_our_view_change(ctx, runtime, state, view).await?; + // Connect only if we are collating on a para. + if let Some(para_id) = state.collating_on { + connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent, para_id) + .await; + } +>>>>>>> db5c89ff (collator-protocol: cleanup connecting to backing group (#9178)) }, PeerMessage(remote, msg) => { handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?; @@ -1336,6 +1423,7 @@ async fn process_block_events( #[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)] async fn handle_our_view_change( ctx: &mut Context, + runtime: &mut RuntimeInfo, state: &mut State, view: OurView, ) -> Result<()> { @@ -1347,7 +1435,13 @@ async fn handle_our_view_change( let added: Vec<_> = view.iter().filter(|h| !implicit_view.contains_leaf(h)).collect(); for leaf in added { +<<<<<<< HEAD let claim_queue: ClaimQueueSnapshot = fetch_claim_queue(ctx.sender(), *leaf).await?; +======= + let session_index = runtime.get_session_index_for_child(ctx.sender(), *leaf).await?; + + let claim_queue = fetch_claim_queue(ctx.sender(), *leaf).await?; +>>>>>>> db5c89ff (collator-protocol: cleanup connecting to backing group (#9178)) implicit_view .activate_leaf(ctx.sender(), *leaf) @@ -1355,9 +1449,26 @@ async fn handle_our_view_change( .map_err(Error::ImplicitViewFetchError)?; let block_number = implicit_view.block_number(leaf); +<<<<<<< HEAD state .per_relay_parent .insert(*leaf, PerRelayParent::new(para_id, claim_queue, block_number)); +======= + + state.per_relay_parent.insert( + *leaf, + PerRelayParent::new( + ctx, + runtime, + para_id, + claim_queue, + block_number, + *leaf, + session_index, + ) + .await?, + ); +>>>>>>> db5c89ff (collator-protocol: cleanup connecting to backing group (#9178)) process_block_events( ctx, @@ -1383,6 +1494,7 @@ async fn handle_our_view_change( for block_hash in allowed_ancestry { let block_number = implicit_view.block_number(block_hash); +<<<<<<< HEAD if state.per_relay_parent.get(block_hash).is_none() { let claim_queue = fetch_claim_queue(ctx.sender(), *block_hash).await?; state @@ -1392,6 +1504,51 @@ async fn handle_our_view_change( let per_relay_parent = state.per_relay_parent.get_mut(block_hash).expect("Just inserted"); +======= + let per_relay_parent = match state.per_relay_parent.entry(*block_hash) { + Entry::Vacant(entry) => { + let claim_queue = match fetch_claim_queue(ctx.sender(), *block_hash).await { + Ok(cq) => cq, + Err(error) => { + gum::debug!( + target: LOG_TARGET, + ?block_hash, + ?error, + "Failed to fetch claim queue while iterating allowed ancestry", + ); + continue + }, + }; + let session_index = + match runtime.get_session_index_for_child(ctx.sender(), *leaf).await { + Ok(si) => si, + Err(error) => { + gum::debug!( + target: LOG_TARGET, + ?block_hash, + ?error, + "Failed to fetch session index while iterating allowed ancestry", + ); + continue + }, + }; + + entry.insert( + PerRelayParent::new( + ctx, + runtime, + para_id, + claim_queue, + block_number, + *block_hash, + session_index, + ) + .await?, + ) + }, + Entry::Occupied(entry) => entry.into_mut(), + }; +>>>>>>> db5c89ff (collator-protocol: cleanup connecting to backing group (#9178)) // Announce relevant collations to these peers. for peer_id in &peers { @@ -1436,7 +1593,6 @@ async fn handle_our_view_change( let candidate_hash: CandidateHash = collation.receipt.hash(); state.collation_result_senders.remove(&candidate_hash); - state.validator_groups_buf.remove_candidate(&candidate_hash); process_out_of_view_collation(&mut state.collation_tracker, collation_with_core); } @@ -1604,10 +1760,6 @@ async fn run_inner( // timeout, we simply start processing next request. // The request it still alive, it should be kept in a waiting queue. } else { - for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { - // This peer has received the candidate. Not interested anymore. - state.validator_groups_buf.reset_validator_interest(candidate_hash, authority_id); - } waiting.waiting_peers.remove(&(peer_id, candidate_hash)); // Update collation status to fetched. @@ -1672,19 +1824,19 @@ async fn run_inner( } }, (candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => { - // NOTE: it doesn't necessarily mean that a validator gets disconnected, - // it only will if there're no other advertisements we want to send. - // - // No-op if the collation was already fetched or went out of view. - for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { - state - .validator_groups_buf - .reset_validator_interest(candidate_hash, &authority_id); - } + gum::debug!( + target: LOG_TARGET, + ?candidate_hash, + ?peer_id, + "Advertising to peer timed out" + ); } _ = reconnect_timeout => { - let cores_assigned = has_assigned_cores(&state.implicit_view, &state.per_relay_parent); - connect_to_validators(&mut ctx, cores_assigned, &state.validator_groups_buf).await; + + // Connect only if we are collating on a para. + if let Some(para_id) = state.collating_on { + connect_to_validators(&mut ctx, &state.implicit_view, &state.per_relay_parent, para_id).await; + } gum::trace!( target: LOG_TARGET, diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 997711f076302..3bcb5cb9d9c05 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -99,7 +99,7 @@ impl Default for TestState { let validator_peer_id = std::iter::repeat_with(|| PeerId::random()).take(discovery_keys.len()).collect(); - let validator_groups = vec![vec![2, 0, 4], vec![1, 3]] + let validator_groups = vec![vec![2, 0, 4], vec![1, 3], vec![], vec![]] .into_iter() .map(|g| g.into_iter().map(ValidatorIndex).collect()) .collect(); @@ -162,6 +162,12 @@ impl TestState { &self.session_info.validator_groups.get(GroupIndex::from(group_idx)).unwrap() } + fn validator_indices_for_core(&self, core: CoreIndex) -> &[ValidatorIndex] { + let core_num = self.claim_queue.len(); + let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(core, core_num); + &self.session_info.validator_groups.get(GroupIndex::from(group_idx)).unwrap() + } + fn current_session_index(&self) -> SessionIndex { self.session_index } @@ -179,6 +185,13 @@ impl TestState { .map(|i| self.session_info.discovery_keys[i.0 as usize].clone()) .collect() } + + fn validator_authority_ids_for_core(&self, core: CoreIndex) -> Vec { + self.validator_indices_for_core(core) + .iter() + .map(|i| self.session_info.discovery_keys[i.0 as usize].clone()) + .collect() + } } type VirtualOverseer = @@ -293,28 +306,31 @@ struct DistributeCollation { pov_block: PoV, } -async fn distribute_collation_with_receipt( +// Check that the next received message is a connection request to validators. +async fn check_connected_to_validators( virtual_overseer: &mut VirtualOverseer, - test_state: &TestState, - relay_parent: Hash, - should_connect: bool, - candidate: CandidateReceipt, - pov: PoV, - parent_head_data_hash: Hash, -) -> DistributeCollation { - overseer_send( - virtual_overseer, - CollatorProtocolMessage::DistributeCollation { - candidate_receipt: candidate.clone(), - parent_head_data_hash, - pov: pov.clone(), - parent_head_data: HeadData(vec![1, 2, 3]), - result_sender: None, - core_index: candidate.descriptor.core_index().unwrap(), - }, - ) - .await; + expected_connected: Vec, +) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ConnectToValidators { + validator_ids, peer_set: _, failed: _, + } + ) => { + assert_eq!(validator_ids.len(), expected_connected.len()); + for validator in expected_connected.iter() { + assert!(validator_ids.contains(validator)); + } + } + ); +} +// Expect that the next received messages are the ones necessary to determine the validator group. +async fn expect_determine_validator_group( + virtual_overseer: &mut VirtualOverseer, + test_state: &TestState, +) { // We don't know precisely what is going to come as session info might be cached: loop { match overseer_recv(virtual_overseer).await { @@ -355,7 +371,6 @@ async fn distribute_collation_with_receipt( _relay_parent, RuntimeApiRequest::ValidatorGroups(tx), )) => { - assert_eq!(_relay_parent, relay_parent); tx.send(Ok(( test_state.session_info.validator_groups.to_vec(), test_state.group_rotation_info.clone(), @@ -367,28 +382,38 @@ async fn distribute_collation_with_receipt( other => panic!("Unexpected message received: {:?}", other), } } +} - if should_connect { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { - .. - } - ) => {} - ); - } +async fn distribute_collation_with_receipt( + virtual_overseer: &mut VirtualOverseer, + expected_connected: Vec, + candidate: CandidateReceipt, + pov: PoV, + parent_head_data_hash: Hash, +) -> DistributeCollation { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::DistributeCollation { + candidate_receipt: candidate.clone(), + parent_head_data_hash, + pov: pov.clone(), + parent_head_data: HeadData(vec![1, 2, 3]), + result_sender: None, + core_index: candidate.descriptor.core_index().unwrap(), + }, + ) + .await; + check_connected_to_validators(virtual_overseer, expected_connected).await; DistributeCollation { candidate, pov_block: pov } } /// Create some PoV and distribute it. async fn distribute_collation( virtual_overseer: &mut VirtualOverseer, + expected_connected: Vec, test_state: &TestState, relay_parent: Hash, - // whether or not we expect a connection request or not. - should_connect: bool, ) -> DistributeCollation { // Now we want to distribute a `PoVBlock` let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) }; @@ -406,9 +431,7 @@ async fn distribute_collation( distribute_collation_with_receipt( virtual_overseer, - test_state, - relay_parent, - should_connect, + expected_connected, candidate, pov_block, parent_head_data_hash, @@ -544,11 +567,22 @@ fn v1_protocol_rejected() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; for (val, peer) in test_state .current_group_validator_authority_ids() @@ -590,15 +624,20 @@ fn advertise_and_send_collation() { CollatorProtocolMessage::CollateOn(test_state.para_id), ) .await; - - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; let DistributeCollation { candidate, pov_block } = distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - true, ) .await; @@ -701,8 +740,14 @@ fn advertise_and_send_collation() { test_state.relay_parent.randomize(); // Update our view, making the old relay parent go out of the implicit view. - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 20)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 20)], + 1, + ) + .await; let peer = test_state.validator_peer_id[2]; @@ -732,9 +777,9 @@ fn advertise_and_send_collation() { let DistributeCollation { candidate, .. } = distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - true, ) .await; @@ -780,14 +825,20 @@ fn delay_reputation_change() { ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; let DistributeCollation { candidate, .. } = distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - true, ) .await; @@ -928,6 +979,7 @@ fn collators_declare_to_connected_peers() { .await; update_view( + Some(test_state.current_group_validator_authority_ids()), &test_state, &mut test_harness.virtual_overseer, vec![(test_state.relay_parent, 10)], @@ -971,8 +1023,14 @@ fn collations_are_only_advertised_to_validators_with_correct_view() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; // A validator connected to us connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(validator_id)).await; @@ -986,9 +1044,13 @@ fn collations_are_only_advertised_to_validators_with_correct_view() { // And let it tell us that it is has the same view. send_peer_view_change(virtual_overseer, &peer2, vec![test_state.relay_parent]).await; - let DistributeCollation { candidate, .. } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate, .. } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; expect_advertise_collation_msg( virtual_overseer, @@ -1036,8 +1098,14 @@ fn collate_on_two_different_relay_chain_blocks() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; // A validator connected to us connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(validator_id)).await; @@ -1048,9 +1116,13 @@ fn collate_on_two_different_relay_chain_blocks() { expect_declare_msg(virtual_overseer, &test_state, &peer).await; expect_declare_msg(virtual_overseer, &test_state, &peer2).await; - let DistributeCollation { candidate: old_candidate, .. } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate: old_candidate, .. } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; let old_relay_parent = test_state.relay_parent; @@ -1058,6 +1130,7 @@ fn collate_on_two_different_relay_chain_blocks() { // parent are active. test_state.relay_parent.randomize(); update_view( + Some(test_state.current_group_validator_authority_ids()), &test_state, virtual_overseer, vec![(old_relay_parent, 10), (test_state.relay_parent, 10)], @@ -1065,9 +1138,13 @@ fn collate_on_two_different_relay_chain_blocks() { ) .await; - let DistributeCollation { candidate: new_candidate, .. } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate: new_candidate, .. } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; send_peer_view_change(virtual_overseer, &peer, vec![old_relay_parent]).await; expect_advertise_collation_msg( @@ -1111,17 +1188,27 @@ fn validator_reconnect_does_not_advertise_a_second_time() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; // A validator connected to us connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(validator_id.clone())) .await; expect_declare_msg(virtual_overseer, &test_state, &peer).await; - let DistributeCollation { candidate, .. } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate, .. } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await; expect_advertise_collation_msg( @@ -1165,8 +1252,14 @@ fn collators_reject_declare_messages() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; // A validator connected to us connect_peer(virtual_overseer, peer, CollationVersion::V2, Some(validator_id)).await; @@ -1226,12 +1319,22 @@ where overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; - update_view(&test_state, virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; - let DistributeCollation { candidate, pov_block } = - distribute_collation(virtual_overseer, &test_state, test_state.relay_parent, true) - .await; + let DistributeCollation { candidate, pov_block } = distribute_collation( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; for (val, peer) in test_state .current_group_validator_authority_ids() @@ -1351,7 +1454,7 @@ where } #[test] -fn connect_to_buffered_groups() { +fn connect_to_group_in_view() { let mut test_state = TestState::default(); let local_peer_id = test_state.local_peer_id; let collator_pair = test_state.collator_pair.clone(); @@ -1370,8 +1473,14 @@ fn connect_to_buffered_groups() { ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; let group_a = test_state.current_group_validator_authority_ids(); let peers_a = test_state.current_group_validator_peer_ids(); @@ -1379,21 +1488,12 @@ fn connect_to_buffered_groups() { let DistributeCollation { candidate, .. } = distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - false, ) .await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } - ) => { - assert_eq!(group_a, validator_ids); - } - ); - let head_a = test_state.relay_parent; for (val, peer) in group_a.iter().zip(&peers_a) { @@ -1452,8 +1552,13 @@ fn connect_to_buffered_groups() { let old_relay_parent = test_state.relay_parent; test_state.relay_parent.randomize(); + test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation(); + // Update our view. + let mut expected_group = group_a.clone(); + expected_group.extend(test_state.current_group_validator_authority_ids()); update_view( + Some(expected_group.clone()), &test_state, &mut virtual_overseer, vec![(old_relay_parent, 10), (test_state.relay_parent, 20)], @@ -1461,8 +1566,6 @@ fn connect_to_buffered_groups() { ) .await; - test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation(); - let head_b = test_state.relay_parent; let group_b = test_state.current_group_validator_authority_ids(); assert_ne!(head_a, head_b); @@ -1470,27 +1573,12 @@ fn connect_to_buffered_groups() { distribute_collation( &mut virtual_overseer, + expected_group, &test_state, test_state.relay_parent, - false, ) .await; - // Should be connected to both groups except for the validator that fetched advertised - // collation. - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } - ) => { - assert!(!validator_ids.contains(&group_a[0])); - - for validator in group_a[1..].iter().chain(&group_b) { - assert!(validator_ids.contains(validator)); - } - } - ); - TestHarness { virtual_overseer, req_v2_cfg: req_cfg } }, ); @@ -1516,35 +1604,38 @@ fn connect_with_no_cores_assigned() { ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 10)], 1) - .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; let group_a = test_state.current_group_validator_authority_ids(); assert!(group_a.len() > 1); distribute_collation( &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, - false, ) .await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } - ) => { - assert_eq!(group_a, validator_ids); - } - ); - // Create a new relay parent and remove the core assignments. test_state.relay_parent.randomize(); test_state.claim_queue.clear(); - update_view(&test_state, &mut virtual_overseer, vec![(test_state.relay_parent, 20)], 1) - .await; + update_view( + Some(vec![]), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 20)], + 1, + ) + .await; // Send the ActiveLeaves signal to trigger the reconnect timeout. overseer_signal( diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index 8be5c4e876b5b..f305a724db3c9 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -28,6 +28,7 @@ fn get_parent_hash(hash: Hash) -> Hash { /// Handle a view update. pub(super) async fn update_view( + expected_connected: Option>, test_state: &TestState, virtual_overseer: &mut VirtualOverseer, new_view: Vec<(Hash, u32)>, // Hash and block number. @@ -63,7 +64,6 @@ pub(super) async fn update_view( .take(ancestry_len as usize); let ancestry_numbers = (min_number..=leaf_number).rev(); let mut ancestry_iter = ancestry_hashes.clone().zip(ancestry_numbers).peekable(); - if let Some((hash, number)) = ancestry_iter.next() { assert_matches!( overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)).await.unwrap(), @@ -180,6 +180,14 @@ pub(super) async fn update_view( ); } + for (_core, _paras) in test_state + .claim_queue + .iter() + .filter(|(_, paras)| paras.contains(&test_state.para_id)) + { + expect_determine_validator_group(virtual_overseer, &test_state).await; + } + for _ in ancestry_iter { while let Some(msg) = overseer_peek_with_timeout(virtual_overseer, Duration::from_millis(50)).await @@ -196,10 +204,33 @@ pub(super) async fn update_view( _, RuntimeApiRequest::CandidateEvents(_), )) + ) && !matches!( + &msg, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionIndexForChild(_), + )) ) { break } + if matches!( + &msg, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionIndexForChild(_), + )) + ) { + for (_core, _paras) in test_state + .claim_queue + .iter() + .filter(|(_, paras)| paras.contains(&test_state.para_id)) + { + expect_determine_validator_group(virtual_overseer, &test_state).await; + } + break; + } + match overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(50)) .await .unwrap() @@ -223,6 +254,9 @@ pub(super) async fn update_view( } } } + if let Some(expected_connected) = expected_connected { + check_connected_to_validators(virtual_overseer, expected_connected).await; + } } /// Check that the next received message is a `Declare` message. @@ -303,12 +337,25 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b if validator_sends_view_first { // Activate leaf `c` to accept at least the collation. - update_view(&test_state, virtual_overseer, vec![(head_c, head_c_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(head_c, head_c_num)], + 1, + ) + .await; } else { // Activated leaf is `b`, but the collation will be based on `c`. - update_view(&test_state, virtual_overseer, vec![(head_b, head_b_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(head_b, head_b_num)], + 1, + ) + .await; } - let validator_peer_ids = test_state.current_group_validator_peer_ids(); for (val, peer) in test_state .current_group_validator_authority_ids() @@ -332,27 +379,16 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b ..Default::default() } .build(); + let DistributeCollation { candidate, pov_block: _ } = distribute_collation_with_receipt( virtual_overseer, - &test_state, - head_c, - false, // Check the group manually. + test_state.current_group_validator_authority_ids(), candidate, pov, parent_head_data_hash, ) .await; - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::NetworkBridgeTx( - NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } - ) => { - let expected_validators = test_state.current_group_validator_authority_ids(); - - assert_eq!(expected_validators, validator_ids); - } - ); let candidate_hash = candidate.hash(); @@ -373,7 +409,8 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b if validator_sends_view_first { // Activated leaf is `b`, but the collation will be based on `c`. - update_view(&test_state, virtual_overseer, vec![(head_b, head_b_num)], 1).await; + update_view(None, &test_state, virtual_overseer, vec![(head_b, head_b_num)], 1) + .await; for _ in &validator_peer_ids { expect_advertise_collation_msg( @@ -384,11 +421,24 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b ) .await; } + + check_connected_to_validators( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + ) + .await; } // Head `c` goes out of view. // Build a different candidate for this relay parent and attempt to distribute it. - update_view(&test_state, virtual_overseer, vec![(head_a, head_a_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(head_a, head_a_num)], + 1, + ) + .await; let pov = PoV { block_data: BlockData(vec![4, 5, 6]) }; let parent_head_data_hash = Hash::repeat_byte(0xBB); @@ -412,6 +462,12 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b ) .await; + check_connected_to_validators( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + ) + .await; + // Parent out of view, nothing happens. assert!(overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(100)) .await @@ -453,7 +509,14 @@ fn distribute_collation_up_to_limit() { overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; // Activated leaf is `a`, but the collation will be based on `b`. - update_view(&test_state, virtual_overseer, vec![(head_a, head_a_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + virtual_overseer, + vec![(head_a, head_a_num)], + 1, + ) + .await; for i in 0..expected_assignments { let pov = PoV { block_data: BlockData(vec![i as u8]) }; @@ -468,9 +531,7 @@ fn distribute_collation_up_to_limit() { .build(); distribute_collation_with_receipt( virtual_overseer, - &test_state, - head_b, - true, + test_state.current_group_validator_authority_ids(), candidate, pov, parent_head_data_hash, @@ -501,6 +562,11 @@ fn distribute_collation_up_to_limit() { ) .await; + check_connected_to_validators( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + ) + .await; // Limit has been reached. assert!(overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(100)) .await @@ -531,6 +597,12 @@ fn distribute_collation_up_to_limit() { ) .await; + check_connected_to_validators( + virtual_overseer, + test_state.current_group_validator_authority_ids(), + ) + .await; + assert!(overseer_recv_with_timeout(virtual_overseer, Duration::from_millis(100)) .await .is_none()); @@ -566,7 +638,24 @@ fn send_parent_head_data_for_elastic_scaling() { CollatorProtocolMessage::CollateOn(test_state.para_id), ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(head_b, head_b_num)], 1).await; + let expected_connected = [CoreIndex(0), CoreIndex(2), CoreIndex(3)] + .into_iter() + .map(|core| test_state.validator_authority_ids_for_core(core)) + .fold(HashSet::new(), |mut acc, res| { + acc.extend(res.into_iter()); + acc + }) + .into_iter() + .collect::>(); + + update_view( + Some(expected_connected.clone()), + &test_state, + &mut virtual_overseer, + vec![(head_b, head_b_num)], + 1, + ) + .await; let pov_data = PoV { block_data: BlockData(vec![1 as u8]) }; let candidate = TestCandidateBuilder { @@ -582,9 +671,7 @@ fn send_parent_head_data_for_elastic_scaling() { distribute_collation_with_receipt( &mut virtual_overseer, - &test_state, - head_b, - true, + expected_connected, candidate.clone(), pov_data.clone(), phdh, @@ -679,8 +766,22 @@ fn advertise_and_send_collation_by_hash() { CollatorProtocolMessage::CollateOn(test_state.para_id), ) .await; - update_view(&test_state, &mut virtual_overseer, vec![(head_b, head_b_num)], 1).await; - update_view(&test_state, &mut virtual_overseer, vec![(head_a, head_a_num)], 1).await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(head_b, head_b_num)], + 1, + ) + .await; + update_view( + Some(test_state.current_group_validator_authority_ids()), + &test_state, + &mut virtual_overseer, + vec![(head_a, head_a_num)], + 1, + ) + .await; let candidates: Vec<_> = (0..2) .map(|i| { @@ -699,9 +800,7 @@ fn advertise_and_send_collation_by_hash() { for (candidate, pov) in &candidates { distribute_collation_with_receipt( &mut virtual_overseer, - &test_state, - head_b, - true, + test_state.current_group_validator_authority_ids(), candidate.clone(), pov.clone(), Hash::zero(), diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs deleted file mode 100644 index 35202fc96299b..0000000000000 --- a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ /dev/null @@ -1,379 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Validator groups buffer for connection managements. -//! -//! Solves 2 problems: -//! 1. A collator may want to stay connected to multiple groups on rotation boundaries. -//! 2. It's important to disconnect from validator when there're no collations to be fetched. -//! -//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement, -//! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise. -//! -//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a -//! relay parent, one can reset a bit back to 0 for particular **validator**. For example, if a -//! collation was fetched or some timeout has been hit. -//! -//! The bitwise OR over known advertisements gives us validators indices for connection request. - -use std::{ - collections::{HashMap, VecDeque}, - future::Future, - num::NonZeroUsize, - ops::Range, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -use bitvec::{bitvec, vec::BitVec}; -use futures::FutureExt; - -use polkadot_node_network_protocol::PeerId; -use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, GroupIndex, SessionIndex}; - -/// Elastic scaling: how many candidates per relay chain block the collator supports building. -pub const MAX_CHAINED_CANDIDATES_PER_RCB: NonZeroUsize = match NonZeroUsize::new(3) { - Some(cap) => cap, - None => panic!("max candidates per rcb cannot be zero"), -}; - -/// The ring buffer stores at most this many unique validator groups. -/// -/// This value should be chosen in way that all groups assigned to our para -/// in the view can fit into the buffer multiplied by amount of candidates we support per relay -/// chain block in the case of elastic scaling. -pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = - match NonZeroUsize::new(3 * MAX_CHAINED_CANDIDATES_PER_RCB.get()) { - Some(cap) => cap, - None => panic!("buffer capacity must be non-zero"), - }; - -/// Unique identifier of a validators group. -#[derive(Debug)] -struct ValidatorsGroupInfo { - /// Number of validators in the group. - len: usize, - session_index: SessionIndex, - group_index: GroupIndex, -} - -/// Ring buffer of validator groups. -/// -/// Tracks which peers we want to be connected to with respect to advertised collations. -#[derive(Debug)] -pub struct ValidatorGroupsBuffer { - /// Validator groups identifiers we **had** advertisements for. - group_infos: VecDeque, - /// Continuous buffer of validators discovery keys. - validators: VecDeque, - /// Mapping from candidate hashes to bit-vectors with bits for all `validators`. - /// Invariants kept: All bit-vectors are guaranteed to have the same size. - should_be_connected: HashMap, - /// Buffer capacity, limits the number of **groups** tracked. - cap: NonZeroUsize, -} - -impl ValidatorGroupsBuffer { - /// Creates a new buffer with a non-zero capacity. - pub fn with_capacity(cap: NonZeroUsize) -> Self { - Self { - group_infos: VecDeque::new(), - validators: VecDeque::new(), - should_be_connected: HashMap::new(), - cap, - } - } - - /// Returns discovery ids of validators we are assigned to in this backing group window. - pub fn validators_to_connect(&self) -> Vec { - let validators_num = self.validators.len(); - let bits = self - .should_be_connected - .values() - .fold(bitvec![0; validators_num], |acc, next| acc | next); - - let mut should_be_connected: Vec = self - .validators - .iter() - .enumerate() - .filter_map(|(idx, authority_id)| bits[idx].then(|| authority_id.clone())) - .collect(); - - if let Some(last_group) = self.group_infos.iter().last() { - for validator in self.validators.iter().rev().take(last_group.len) { - if !should_be_connected.contains(validator) { - should_be_connected.push(validator.clone()); - } - } - } - - should_be_connected - } - - /// Note a new advertisement, marking that we want to be connected to validators - /// from this group. - /// - /// If max capacity is reached and the group is new, drops validators from the back - /// of the buffer. - pub fn note_collation_advertised( - &mut self, - candidate_hash: CandidateHash, - session_index: SessionIndex, - group_index: GroupIndex, - validators: &[AuthorityDiscoveryId], - ) { - if validators.is_empty() { - return - } - - match self.group_infos.iter().enumerate().find(|(_, group)| { - group.session_index == session_index && group.group_index == group_index - }) { - Some((idx, group)) => { - let group_start_idx = self.group_lengths_iter().take(idx).sum(); - self.set_bits(candidate_hash, group_start_idx..(group_start_idx + group.len)); - }, - None => self.push(candidate_hash, session_index, group_index, validators), - } - } - - /// Note that a validator is no longer interested in a given candidate. - pub fn reset_validator_interest( - &mut self, - candidate_hash: CandidateHash, - authority_id: &AuthorityDiscoveryId, - ) { - let bits = match self.should_be_connected.get_mut(&candidate_hash) { - Some(bits) => bits, - None => return, - }; - - for (idx, auth_id) in self.validators.iter().enumerate() { - if auth_id == authority_id { - bits.set(idx, false); - } - } - } - - /// Remove advertised candidate from the buffer. - /// - /// The buffer will no longer track which validators are interested in a corresponding - /// advertisement. - pub fn remove_candidate(&mut self, candidate_hash: &CandidateHash) { - self.should_be_connected.remove(candidate_hash); - } - - /// Pushes a new group to the buffer along with advertisement, setting all validators - /// bits to 1. - /// - /// If the buffer is full, drops group from the tail. - fn push( - &mut self, - candidate_hash: CandidateHash, - session_index: SessionIndex, - group_index: GroupIndex, - validators: &[AuthorityDiscoveryId], - ) { - let new_group_info = - ValidatorsGroupInfo { len: validators.len(), session_index, group_index }; - - let buf = &mut self.group_infos; - let cap = self.cap.get(); - - if buf.len() >= cap { - let pruned_group = buf.pop_front().expect("buf is not empty; qed"); - self.validators.drain(..pruned_group.len); - - self.should_be_connected.values_mut().for_each(|bits| { - bits.as_mut_bitslice().shift_left(pruned_group.len); - }); - } - - self.validators.extend(validators.iter().cloned()); - buf.push_back(new_group_info); - let buf_len = buf.len(); - let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum(); - - let new_len = self.validators.len(); - self.should_be_connected - .values_mut() - .for_each(|bits| bits.resize(new_len, false)); - self.set_bits(candidate_hash, group_start_idx..(group_start_idx + validators.len())); - } - - /// Sets advertisement bits to 1 in a given range (usually corresponding to some group). - /// If the relay parent is unknown, inserts 0-initialized bitvec first. - /// - /// The range must be ensured to be within bounds. - fn set_bits(&mut self, candidate_hash: CandidateHash, range: Range) { - let bits = self - .should_be_connected - .entry(candidate_hash) - .or_insert_with(|| bitvec![0; self.validators.len()]); - - bits[range].fill(true); - } - - /// Returns iterator over numbers of validators in groups. - /// - /// Useful for getting an index of the first validator in i-th group. - fn group_lengths_iter(&self) -> impl Iterator + '_ { - self.group_infos.iter().map(|group| group.len) - } -} - -/// A timeout for resetting validators' interests in collations. -pub const RESET_INTEREST_TIMEOUT: Duration = Duration::from_secs(6); - -/// A future that returns a candidate hash along with validator discovery -/// keys once a timeout hit. -/// -/// If a validator doesn't manage to fetch a collation within this timeout -/// we should reset its interest in this advertisement in a buffer. For example, -/// when the PoV was already requested from another peer. -pub struct ResetInterestTimeout { - fut: futures_timer::Delay, - candidate_hash: CandidateHash, - peer_id: PeerId, -} - -impl ResetInterestTimeout { - /// Returns new `ResetInterestTimeout` that resolves after given timeout. - pub fn new(candidate_hash: CandidateHash, peer_id: PeerId, delay: Duration) -> Self { - Self { fut: futures_timer::Delay::new(delay), candidate_hash, peer_id } - } -} - -impl Future for ResetInterestTimeout { - type Output = (CandidateHash, PeerId); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.fut.poll_unpin(cx).map(|_| (self.candidate_hash, self.peer_id)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use polkadot_primitives::Hash; - use sp_keyring::Sr25519Keyring; - - #[test] - fn one_capacity_buffer() { - let cap = NonZeroUsize::new(1).unwrap(); - let mut buf = ValidatorGroupsBuffer::with_capacity(cap); - - let hash_a = CandidateHash(Hash::repeat_byte(0x1)); - let hash_b = CandidateHash(Hash::repeat_byte(0x2)); - - let validators: Vec<_> = [ - Sr25519Keyring::Alice, - Sr25519Keyring::Bob, - Sr25519Keyring::Charlie, - Sr25519Keyring::Dave, - Sr25519Keyring::Ferdie, - ] - .into_iter() - .map(|key| AuthorityDiscoveryId::from(key.public())) - .collect(); - - assert!(buf.validators_to_connect().is_empty()); - - buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]); - assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); - - buf.reset_validator_interest(hash_a, &validators[1]); - assert_eq!(buf.validators_to_connect(), validators[0..2].to_vec()); - - buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]); - assert_eq!(buf.validators_to_connect(), validators[2..].to_vec()); - - for validator in &validators[2..] { - buf.reset_validator_interest(hash_b, validator); - } - let mut expected = validators[2..].to_vec(); - expected.sort(); - let mut result = buf.validators_to_connect(); - result.sort(); - assert_eq!(result, expected); - } - - #[test] - fn buffer_works() { - let cap = NonZeroUsize::new(3).unwrap(); - let mut buf = ValidatorGroupsBuffer::with_capacity(cap); - - let hashes: Vec<_> = (0..5).map(|i| CandidateHash(Hash::repeat_byte(i))).collect(); - - let validators: Vec<_> = [ - Sr25519Keyring::Alice, - Sr25519Keyring::Bob, - Sr25519Keyring::Charlie, - Sr25519Keyring::Dave, - Sr25519Keyring::Ferdie, - ] - .into_iter() - .map(|key| AuthorityDiscoveryId::from(key.public())) - .collect(); - - buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]); - buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]); - buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]); - buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]); - - assert_eq!(buf.validators_to_connect(), validators[..4].to_vec()); - - for validator in &validators[2..4] { - buf.reset_validator_interest(hashes[2], validator); - } - - buf.reset_validator_interest(hashes[1], &validators[0]); - let mut expected: Vec<_> = validators[..4].iter().cloned().collect(); - let mut result = buf.validators_to_connect(); - expected.sort(); - result.sort(); - assert_eq!(result, expected); - - buf.reset_validator_interest(hashes[0], &validators[0]); - let mut expected: Vec<_> = validators[1..4].iter().cloned().collect(); - expected.sort(); - let mut result = buf.validators_to_connect(); - result.sort(); - assert_eq!(result, expected); - - buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]); - buf.note_collation_advertised( - hashes[4], - 0, - GroupIndex(2), - std::slice::from_ref(&validators[4]), - ); - - buf.reset_validator_interest(hashes[3], &validators[2]); - buf.note_collation_advertised( - hashes[4], - 0, - GroupIndex(3), - std::slice::from_ref(&validators[0]), - ); - - assert_eq!( - buf.validators_to_connect(), - vec![validators[3].clone(), validators[4].clone(), validators[0].clone()] - ); - } -} diff --git a/prdoc/pr_9178.prdoc b/prdoc/pr_9178.prdoc new file mode 100644 index 0000000000000..55d09d20e42f5 --- /dev/null +++ b/prdoc/pr_9178.prdoc @@ -0,0 +1,12 @@ +title: 'collator-protocol: cleanup connecting to backing group' +doc: +- audience: Node Dev + description: |- + + Have collators always connect to the backing group they got assigned to and keep the connection open + until backing group changes. Also, try to connect when have something to advertise or on timeout to + have more chances of being correctly connected. + +crates: +- name: polkadot-collator-protocol + bump: patch