From 604cfc42b8640a483fc4c63b75b29bc7f46f161d Mon Sep 17 00:00:00 2001 From: Chris Sosnin <48099298+slumber@users.noreply.github.com> Date: Wed, 19 Oct 2022 14:18:23 +0400 Subject: [PATCH] validator assignment fixes for backing and collator protocol (#6158) * Rename depth->ancestry len in tests * Refactor group assignments * Remove implicit assignments * backing: consider occupied core assignments * Track a single para on validator side --- node/core/backing/src/lib.rs | 40 +++-- .../src/tests/prospective_parachains.rs | 166 ++++++++++++++++-- .../src/validator_side/collation.rs | 30 ++-- .../src/validator_side/mod.rs | 117 ++++-------- .../tests/prospective_parachains.rs | 34 ++-- .../src/legacy_v1/mod.rs | 4 +- 6 files changed, 226 insertions(+), 165 deletions(-) diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs index a743c529165a..b319a9cbcfac 100644 --- a/node/core/backing/src/lib.rs +++ b/node/core/backing/src/lib.rs @@ -651,7 +651,7 @@ async fn validate_and_make_available( let pov = match pov { PoVData::Ready(pov) => pov, - PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => { + PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => match request_pov( &mut sender, relay_parent, @@ -674,8 +674,7 @@ async fn validate_and_make_available( }, Err(err) => return Err(err), Ok(pov) => pov, - } - } + }, }; let v = { @@ -1077,16 +1076,26 @@ async fn construct_per_relay_parent_state( let mut assignment = None; for (idx, core) in cores.into_iter().enumerate() { - // Ignore prospective assignments on occupied cores for the time being. - if let CoreState::Scheduled(scheduled) = core { - let core_index = CoreIndex(idx as _); - let group_index = group_rotation_info.group_for_core(core_index, n_cores); - if let Some(g) = validator_groups.get(group_index.0 as usize) { - if validator.as_ref().map_or(false, |v| g.contains(&v.index())) { - assignment = Some((scheduled.para_id, scheduled.collator)); - } - groups.insert(scheduled.para_id, g.clone()); + let core_para_id = match core { + CoreState::Scheduled(scheduled) => scheduled.para_id, + CoreState::Occupied(occupied) => + if mode.is_enabled() { + // Async backing makes it legal to build on top of + // occupied core. + occupied.candidate_descriptor.para_id + } else { + continue + }, + CoreState::Free => continue, + }; + + let core_index = CoreIndex(idx as _); + let group_index = group_rotation_info.group_for_core(core_index, n_cores); + if let Some(g) = validator_groups.get(group_index.0 as usize) { + if validator.as_ref().map_or(false, |v| g.contains(&v.index())) { + assignment = Some(core_para_id); } + groups.insert(core_para_id, g.clone()); } } @@ -1098,13 +1107,6 @@ async fn construct_per_relay_parent_state( }, }; - // TODO [now]: I've removed the `required_collator` more broadly, - // because it's not used in practice and was intended for parathreads. - // - // We should attempt parathreads another way, I think, so it makes sense - // to remove. - let assignment = assignment.map(|(a, _required_collator)| a); - Ok(Some(PerRelayParentState { prospective_parachains_mode: mode, parent, diff --git a/node/core/backing/src/tests/prospective_parachains.rs b/node/core/backing/src/tests/prospective_parachains.rs index 59db7f62b722..59ed1027e624 100644 --- a/node/core/backing/src/tests/prospective_parachains.rs +++ b/node/core/backing/src/tests/prospective_parachains.rs @@ -17,7 +17,7 @@ //! Tests for the backing subsystem with enabled prospective parachains. use polkadot_node_subsystem::{messages::ChainApiMessage, TimeoutExt}; -use polkadot_primitives::v2::{BlockNumber, Header}; +use polkadot_primitives::v2::{BlockNumber, Header, OccupiedCore}; use super::*; @@ -299,7 +299,7 @@ fn seconding_sanity_check_allowed() { test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { // Candidate is seconded in a parent of the activated `leaf_a`. const LEAF_A_BLOCK_NUMBER: BlockNumber = 100; - const LEAF_A_DEPTH: BlockNumber = 3; + const LEAF_A_ANCESTRY_LEN: BlockNumber = 3; let para_id = test_state.chain_ids[0]; let leaf_b_hash = Hash::from_low_u64_be(128); @@ -312,11 +312,11 @@ fn seconding_sanity_check_allowed() { status: LeafStatus::Fresh, span: Arc::new(jaeger::Span::Disabled), }; - let min_relay_parents = vec![(para_id, LEAF_A_BLOCK_NUMBER - LEAF_A_DEPTH)]; + let min_relay_parents = vec![(para_id, LEAF_A_BLOCK_NUMBER - LEAF_A_ANCESTRY_LEN)]; let test_leaf_a = TestLeaf { activated, min_relay_parents }; const LEAF_B_BLOCK_NUMBER: BlockNumber = LEAF_A_BLOCK_NUMBER + 2; - const LEAF_B_DEPTH: BlockNumber = 4; + const LEAF_B_ANCESTRY_LEN: BlockNumber = 4; let activated = ActivatedLeaf { hash: leaf_b_hash, @@ -324,7 +324,7 @@ fn seconding_sanity_check_allowed() { status: LeafStatus::Fresh, span: Arc::new(jaeger::Span::Disabled), }; - let min_relay_parents = vec![(para_id, LEAF_B_BLOCK_NUMBER - LEAF_B_DEPTH)]; + let min_relay_parents = vec![(para_id, LEAF_B_BLOCK_NUMBER - LEAF_B_ANCESTRY_LEN)]; let test_leaf_b = TestLeaf { activated, min_relay_parents }; activate_leaf(&mut virtual_overseer, test_leaf_a, &test_state, 0).await; @@ -436,7 +436,7 @@ fn seconding_sanity_check_disallowed() { test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { // Candidate is seconded in a parent of the activated `leaf_a`. const LEAF_A_BLOCK_NUMBER: BlockNumber = 100; - const LEAF_A_DEPTH: BlockNumber = 3; + const LEAF_A_ANCESTRY_LEN: BlockNumber = 3; let para_id = test_state.chain_ids[0]; let leaf_b_hash = Hash::from_low_u64_be(128); @@ -449,11 +449,11 @@ fn seconding_sanity_check_disallowed() { status: LeafStatus::Fresh, span: Arc::new(jaeger::Span::Disabled), }; - let min_relay_parents = vec![(para_id, LEAF_A_BLOCK_NUMBER - LEAF_A_DEPTH)]; + let min_relay_parents = vec![(para_id, LEAF_A_BLOCK_NUMBER - LEAF_A_ANCESTRY_LEN)]; let test_leaf_a = TestLeaf { activated, min_relay_parents }; const LEAF_B_BLOCK_NUMBER: BlockNumber = LEAF_A_BLOCK_NUMBER + 2; - const LEAF_B_DEPTH: BlockNumber = 4; + const LEAF_B_ANCESTRY_LEN: BlockNumber = 4; let activated = ActivatedLeaf { hash: leaf_b_hash, @@ -461,7 +461,7 @@ fn seconding_sanity_check_disallowed() { status: LeafStatus::Fresh, span: Arc::new(jaeger::Span::Disabled), }; - let min_relay_parents = vec![(para_id, LEAF_B_BLOCK_NUMBER - LEAF_B_DEPTH)]; + let min_relay_parents = vec![(para_id, LEAF_B_BLOCK_NUMBER - LEAF_B_ANCESTRY_LEN)]; let test_leaf_b = TestLeaf { activated, min_relay_parents }; activate_leaf(&mut virtual_overseer, test_leaf_a, &test_state, 0).await; @@ -633,7 +633,7 @@ fn prospective_parachains_reject_candidate() { test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { // Candidate is seconded in a parent of the activated `leaf_a`. const LEAF_A_BLOCK_NUMBER: BlockNumber = 100; - const LEAF_A_DEPTH: BlockNumber = 3; + const LEAF_A_ANCESTRY_LEN: BlockNumber = 3; let para_id = test_state.chain_ids[0]; let leaf_a_hash = Hash::from_low_u64_be(130); @@ -644,7 +644,7 @@ fn prospective_parachains_reject_candidate() { status: LeafStatus::Fresh, span: Arc::new(jaeger::Span::Disabled), }; - let min_relay_parents = vec![(para_id, LEAF_A_BLOCK_NUMBER - LEAF_A_DEPTH)]; + let min_relay_parents = vec![(para_id, LEAF_A_BLOCK_NUMBER - LEAF_A_ANCESTRY_LEN)]; let test_leaf_a = TestLeaf { activated, min_relay_parents }; activate_leaf(&mut virtual_overseer, test_leaf_a, &test_state, 0).await; @@ -798,7 +798,7 @@ fn second_multiple_candidates_per_relay_parent() { test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { // Candidate `a` is seconded in a parent of the activated `leaf`. const LEAF_BLOCK_NUMBER: BlockNumber = 100; - const LEAF_DEPTH: BlockNumber = 3; + const LEAF_ANCESTRY_LEN: BlockNumber = 3; let para_id = test_state.chain_ids[0]; let leaf_hash = Hash::from_low_u64_be(130); @@ -810,7 +810,7 @@ fn second_multiple_candidates_per_relay_parent() { status: LeafStatus::Fresh, span: Arc::new(jaeger::Span::Disabled), }; - let min_relay_parents = vec![(para_id, LEAF_BLOCK_NUMBER - LEAF_DEPTH)]; + let min_relay_parents = vec![(para_id, LEAF_BLOCK_NUMBER - LEAF_ANCESTRY_LEN)]; let test_leaf_a = TestLeaf { activated, min_relay_parents }; activate_leaf(&mut virtual_overseer, test_leaf_a, &test_state, 0).await; @@ -922,7 +922,7 @@ fn backing_works() { test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { // Candidate `a` is seconded in a parent of the activated `leaf`. const LEAF_BLOCK_NUMBER: BlockNumber = 100; - const LEAF_DEPTH: BlockNumber = 3; + const LEAF_ANCESTRY_LEN: BlockNumber = 3; let para_id = test_state.chain_ids[0]; let leaf_hash = Hash::from_low_u64_be(130); @@ -933,7 +933,7 @@ fn backing_works() { status: LeafStatus::Fresh, span: Arc::new(jaeger::Span::Disabled), }; - let min_relay_parents = vec![(para_id, LEAF_BLOCK_NUMBER - LEAF_DEPTH)]; + let min_relay_parents = vec![(para_id, LEAF_BLOCK_NUMBER - LEAF_ANCESTRY_LEN)]; let test_leaf_a = TestLeaf { activated, min_relay_parents }; activate_leaf(&mut virtual_overseer, test_leaf_a, &test_state, 0).await; @@ -1081,7 +1081,7 @@ fn concurrent_dependent_candidates() { // Candidate `a` is seconded in a grandparent of the activated `leaf`, // candidate `b` -- in parent. const LEAF_BLOCK_NUMBER: BlockNumber = 100; - const LEAF_DEPTH: BlockNumber = 3; + const LEAF_ANCESTRY_LEN: BlockNumber = 3; let para_id = test_state.chain_ids[0]; let leaf_hash = Hash::from_low_u64_be(130); @@ -1093,7 +1093,7 @@ fn concurrent_dependent_candidates() { status: LeafStatus::Fresh, span: Arc::new(jaeger::Span::Disabled), }; - let min_relay_parents = vec![(para_id, LEAF_BLOCK_NUMBER - LEAF_DEPTH)]; + let min_relay_parents = vec![(para_id, LEAF_BLOCK_NUMBER - LEAF_ANCESTRY_LEN)]; let test_leaf_a = TestLeaf { activated, min_relay_parents }; activate_leaf(&mut virtual_overseer, test_leaf_a, &test_state, 0).await; @@ -1308,7 +1308,7 @@ fn seconding_sanity_check_occupy_same_depth() { test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { // Candidate `a` is seconded in a parent of the activated `leaf`. const LEAF_BLOCK_NUMBER: BlockNumber = 100; - const LEAF_DEPTH: BlockNumber = 3; + const LEAF_ANCESTRY_LEN: BlockNumber = 3; let para_id_a = test_state.chain_ids[0]; let para_id_b = test_state.chain_ids[1]; @@ -1323,7 +1323,7 @@ fn seconding_sanity_check_occupy_same_depth() { span: Arc::new(jaeger::Span::Disabled), }; - let min_block_number = LEAF_BLOCK_NUMBER - LEAF_DEPTH; + let min_block_number = LEAF_BLOCK_NUMBER - LEAF_ANCESTRY_LEN; let min_relay_parents = vec![(para_id_a, min_block_number), (para_id_b, min_block_number)]; let test_leaf_a = TestLeaf { activated, min_relay_parents }; @@ -1433,3 +1433,131 @@ fn seconding_sanity_check_occupy_same_depth() { virtual_overseer }); } + +// Test that the subsystem doesn't skip occupied cores assignments. +#[test] +fn occupied_core_assignment() { + let mut test_state = TestState::default(); + test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move { + // Candidate is seconded in a parent of the activated `leaf_a`. + const LEAF_A_BLOCK_NUMBER: BlockNumber = 100; + const LEAF_A_ANCESTRY_LEN: BlockNumber = 3; + let para_id = test_state.chain_ids[0]; + + // Set the core state to occupied. + let mut candidate_descriptor = ::test_helpers::dummy_candidate_descriptor(Hash::zero()); + candidate_descriptor.para_id = para_id; + test_state.availability_cores[0] = CoreState::Occupied(OccupiedCore { + group_responsible: Default::default(), + next_up_on_available: None, + occupied_since: 100_u32, + time_out_at: 200_u32, + next_up_on_time_out: None, + availability: Default::default(), + candidate_descriptor, + candidate_hash: Default::default(), + }); + + let leaf_a_hash = Hash::from_low_u64_be(130); + let leaf_a_parent = get_parent_hash(leaf_a_hash); + let activated = ActivatedLeaf { + hash: leaf_a_hash, + number: LEAF_A_BLOCK_NUMBER, + status: LeafStatus::Fresh, + span: Arc::new(jaeger::Span::Disabled), + }; + let min_relay_parents = vec![(para_id, LEAF_A_BLOCK_NUMBER - LEAF_A_ANCESTRY_LEN)]; + let test_leaf_a = TestLeaf { activated, min_relay_parents }; + + activate_leaf(&mut virtual_overseer, test_leaf_a, &test_state, 0).await; + + let pov = PoV { block_data: BlockData(vec![42, 43, 44]) }; + let pvd = dummy_pvd(); + let validation_code = ValidationCode(vec![1, 2, 3]); + + let expected_head_data = test_state.head_data.get(¶_id).unwrap(); + + let pov_hash = pov.hash(); + let candidate = TestCandidateBuilder { + para_id, + relay_parent: leaf_a_parent, + pov_hash, + head_data: expected_head_data.clone(), + erasure_root: make_erasure_root(&test_state, pov.clone(), pvd.clone()), + persisted_validation_data_hash: pvd.hash(), + validation_code: validation_code.0.clone(), + ..Default::default() + } + .build(); + + let second = CandidateBackingMessage::Second( + leaf_a_hash, + candidate.to_plain(), + pvd.clone(), + pov.clone(), + ); + + virtual_overseer.send(FromOrchestra::Communication { msg: second }).await; + + assert_validate_seconded_candidate( + &mut virtual_overseer, + leaf_a_parent, + &candidate, + &pov, + &pvd, + &validation_code, + expected_head_data, + false, + ) + .await; + + // `seconding_sanity_check` + let expected_request = HypotheticalDepthRequest { + candidate_hash: candidate.hash(), + candidate_para: para_id, + parent_head_data_hash: pvd.parent_head.hash(), + candidate_relay_parent: leaf_a_parent, + fragment_tree_relay_parent: leaf_a_hash, + }; + assert_hypothetical_depth_requests( + &mut virtual_overseer, + vec![(expected_request, vec![0, 1, 2, 3])], + ) + .await; + // Prospective parachains are notified. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::ProspectiveParachains( + ProspectiveParachainsMessage::CandidateSeconded( + candidate_para, + candidate_receipt, + _pvd, + tx, + ), + ) if candidate_receipt == candidate && candidate_para == para_id && pvd == _pvd => { + // Any non-empty response will do. + tx.send(vec![(leaf_a_hash, vec![0, 1, 2, 3])]).unwrap(); + } + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::Share( + parent_hash, + _signed_statement, + ) + ) if parent_hash == leaf_a_parent => {} + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::CollatorProtocol(CollatorProtocolMessage::Seconded(hash, statement)) => { + assert_eq!(leaf_a_parent, hash); + assert_matches!(statement.payload(), Statement::Seconded(_)); + } + ); + + virtual_overseer + }); +} diff --git a/node/network/collator-protocol/src/validator_side/collation.rs b/node/network/collator-protocol/src/validator_side/collation.rs index c5cb848d2815..48353cb266ad 100644 --- a/node/network/collator-protocol/src/validator_side/collation.rs +++ b/node/network/collator-protocol/src/validator_side/collation.rs @@ -28,7 +28,7 @@ //! └─▶Advertised ─▶ Pending ─▶ Fetched ─▶ Validated use futures::channel::oneshot; -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use polkadot_node_network_protocol::PeerId; use polkadot_node_primitives::PoV; @@ -183,14 +183,14 @@ pub struct Collations { pub fetching_from: Option<(CollatorId, Option)>, /// Collation that were advertised to us, but we did not yet fetch. pub waiting_queue: VecDeque<(PendingCollation, CollatorId)>, - /// How many collations have been seconded per parachain. - pub seconded_count: HashMap, + /// How many collations have been seconded. + pub seconded_count: usize, } impl Collations { /// Note a seconded collation for a given para. - pub(super) fn note_seconded(&mut self, para_id: ParaId) { - *self.seconded_count.entry(para_id).or_insert(0) += 1 + pub(super) fn note_seconded(&mut self) { + self.seconded_count += 1 } /// Returns the next collation to fetch from the `waiting_queue`. @@ -225,17 +225,12 @@ impl Collations { match self.status { // We don't need to fetch any other collation when we already have seconded one. CollationStatus::Seconded => None, - CollationStatus::Waiting => { - while let Some(next) = self.waiting_queue.pop_front() { - let para_id = next.0.para_id; - if !self.is_seconded_limit_reached(relay_parent_mode, para_id) { - continue - } - - return Some(next) - } - None - }, + CollationStatus::Waiting => + if !self.is_seconded_limit_reached(relay_parent_mode) { + None + } else { + self.waiting_queue.pop_front() + }, CollationStatus::WaitingOnValidation | CollationStatus::Fetching => unreachable!("We have reset the status above!"), } @@ -245,10 +240,9 @@ impl Collations { pub(super) fn is_seconded_limit_reached( &self, relay_parent_mode: ProspectiveParachainsMode, - para_id: ParaId, ) -> bool { let seconded_limit = if relay_parent_mode.is_enabled() { MAX_CANDIDATE_DEPTH + 1 } else { 1 }; - self.seconded_count.get(¶_id).map_or(true, |&num| num < seconded_limit) + self.seconded_count < seconded_limit } } diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 2454a23e3581..2b93b295418d 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -364,31 +364,10 @@ impl PeerData { } } -#[derive(Debug, Copy, Clone)] -enum AssignedCoreState { - Scheduled, - Occupied, -} - -impl AssignedCoreState { - fn is_occupied(&self) -> bool { - matches!(self, AssignedCoreState::Occupied) - } -} - -#[derive(Debug, Clone)] +#[derive(Debug)] struct GroupAssignments { /// Current assignment. - current: Option<(ParaId, AssignedCoreState)>, - /// Paras we're implicitly assigned to with respect to ancestry. - /// This only includes paras from children relay chain blocks assignments. - /// - /// Implicit assignments are not reference-counted since they're accumulated - /// from the most recent leaf. - /// - /// Should be relatively small depending on the group rotation frequency and - /// allowed ancestry length. - implicit: Vec, + current: Option, } struct PerRelayParent { @@ -401,7 +380,7 @@ impl PerRelayParent { fn new(mode: ProspectiveParachainsMode) -> Self { Self { prospective_parachains_mode: mode, - assignment: GroupAssignments { current: None, implicit: Vec::new() }, + assignment: GroupAssignments { current: None }, collations: Collations::default(), } } @@ -490,6 +469,7 @@ async fn assign_incoming( current_assignments: &mut HashMap, keystore: &SyncCryptoStorePtr, relay_parent: Hash, + relay_parent_mode: ProspectiveParachainsMode, ) -> Result<()> where Sender: CollatorProtocolSenderTrait, @@ -518,9 +498,9 @@ where let core_now = rotation_info.core_for_group(group, cores.len()); cores.get(core_now.0 as usize).and_then(|c| match c { - CoreState::Occupied(core) => Some((core.para_id(), AssignedCoreState::Occupied)), - CoreState::Scheduled(core) => Some((core.para_id, AssignedCoreState::Scheduled)), - CoreState::Free => None, + CoreState::Occupied(core) if relay_parent_mode.is_enabled() => Some(core.para_id()), + CoreState::Scheduled(core) => Some(core.para_id), + CoreState::Occupied(_) | CoreState::Free => None, }) }, None => { @@ -538,7 +518,7 @@ where // // However, this'll work fine for parachains, as each parachain gets a dedicated // core. - if let Some((para_id, _)) = para_now.as_ref() { + if let Some(para_id) = para_now.as_ref() { let entry = current_assignments.entry(*para_id).or_default(); *entry += 1; if *entry == 1 { @@ -551,7 +531,7 @@ where } } - *group_assignment = GroupAssignments { current: para_now, implicit: Vec::new() }; + *group_assignment = GroupAssignments { current: para_now }; Ok(()) } @@ -562,7 +542,7 @@ fn remove_outgoing( ) { let GroupAssignments { current, .. } = per_relay_parent.assignment; - if let Some((cur, _)) = current { + if let Some(cur) = current { if let Entry::Occupied(mut occupied) = current_assignments.entry(cur) { *occupied.get_mut() -= 1; if *occupied.get() == 0 { @@ -944,9 +924,6 @@ enum AdvertisementError { UndeclaredCollator, /// We're assigned to a different para at the given relay parent. InvalidAssignment, - /// Collator is trying to build on top of occupied core - /// when async backing is disabled. - CoreOccupied, /// An advertisement format doesn't match the relay parent. ProtocolMismatch, /// Para reached a limit of seconded candidates for this relay parent. @@ -962,8 +939,7 @@ impl AdvertisementError { use AdvertisementError::*; match self { InvalidAssignment => Some(COST_WRONG_PARA), - RelayParentUnknown | UndeclaredCollator | CoreOccupied | Invalid(_) => - Some(COST_UNEXPECTED_MESSAGE), + RelayParentUnknown | UndeclaredCollator | Invalid(_) => Some(COST_UNEXPECTED_MESSAGE), UnknownPeer | ProtocolMismatch | SecondedLimitReached | @@ -1000,16 +976,8 @@ where peer_data.collating_para().ok_or(AdvertisementError::UndeclaredCollator)?; match assignment.current { - Some((id, core_state)) if id == collator_para_id => { - // Disallow building on top occupied core if async - // backing is disabled. - if !relay_parent_mode.is_enabled() && core_state.is_occupied() { - return Err(AdvertisementError::CoreOccupied) - } - }, - _ if assignment.implicit.contains(&collator_para_id) => { - // This relay parent is a part of implicit ancestry, - // thus async backing is enabled. + Some(id) if id == collator_para_id => { + // Our assignment. }, _ => return Err(AdvertisementError::InvalidAssignment), }; @@ -1063,7 +1031,7 @@ where }); let collations = &mut per_relay_parent.collations; - if !collations.is_seconded_limit_reached(relay_parent_mode, collator_para_id) { + if !collations.is_seconded_limit_reached(relay_parent_mode) { return Err(AdvertisementError::SecondedLimitReached) } @@ -1141,13 +1109,11 @@ where &mut state.current_assignments, keystore, *leaf, + mode, ) .await?; state.active_leaves.insert(*leaf, mode); - - let mut implicit_assignment = - Vec::from_iter(per_relay_parent.assignment.current.map(|(para, _)| para)); state.per_relay_parent.insert(*leaf, per_relay_parent); if mode.is_enabled() { @@ -1163,37 +1129,21 @@ where .known_allowed_relay_parents_under(leaf, None) .unwrap_or_default(); for block_hash in allowed_ancestry { - let entry = match state.per_relay_parent.entry(*block_hash) { - Entry::Vacant(entry) => { - let mut per_relay_parent = - PerRelayParent::new(ProspectiveParachainsMode::Enabled); - assign_incoming( - sender, - &mut per_relay_parent.assignment, - &mut state.current_assignments, - keystore, - *block_hash, - ) - .await?; - - entry.insert(per_relay_parent) - }, - Entry::Occupied(entry) => entry.into_mut(), - }; - - let current = entry.assignment.current.map(|(para, _)| para); - let implicit = &mut entry.assignment.implicit; + if let Entry::Vacant(entry) = state.per_relay_parent.entry(*block_hash) { + let mut per_relay_parent = + PerRelayParent::new(ProspectiveParachainsMode::Enabled); + assign_incoming( + sender, + &mut per_relay_parent.assignment, + &mut state.current_assignments, + keystore, + *block_hash, + mode, + ) + .await?; - // Extend implicitly assigned parachains. - for para in &implicit_assignment { - if !implicit.contains(para) { - implicit.push(*para); - } + entry.insert(per_relay_parent); } - // Current assignment propagates to parents, meaning that a parachain - // we're assigned to in fresh blocks can submit collations built - // on top of relay parents in the allowed ancestry, but not vice versa. - implicit_assignment.extend(current); } } } @@ -1355,9 +1305,8 @@ async fn process_msg( let fetched_collation = FetchedCollation::from(&receipt.to_plain()); if let Some(collation_event) = state.fetched_candidates.remove(&fetched_collation) { let (collator_id, pending_collation) = collation_event; - let PendingCollation { - relay_parent, peer_id, para_id, prospective_candidate, .. - } = pending_collation; + let PendingCollation { relay_parent, peer_id, prospective_candidate, .. } = + pending_collation; note_good_collation(ctx.sender(), &state.peer_data, collator_id.clone()).await; if let Some(peer_data) = state.peer_data.get(&peer_id) { notify_collation_seconded( @@ -1370,9 +1319,9 @@ async fn process_msg( .await; } - if let Some(state) = state.per_relay_parent.get_mut(&parent) { - state.collations.status = CollationStatus::Seconded; - state.collations.note_seconded(para_id); + if let Some(rp_state) = state.per_relay_parent.get_mut(&parent) { + rp_state.collations.status = CollationStatus::Seconded; + rp_state.collations.note_seconded(); } // If async backing is enabled, make an attempt to fetch next collation. let maybe_candidate_hash = diff --git a/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs b/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs index 1eccb97cbd67..b854ae202014 100644 --- a/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs +++ b/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs @@ -294,10 +294,11 @@ fn accept_advertisements_from_implicit_view() { let head_b = Hash::from_low_u64_be(128); let head_b_num: u32 = 2; + let head_c = get_parent_hash(head_b); // Grandparent of head `b`. - // Group rotation frequency is 1 by default, at `c` we're assigned + // Group rotation frequency is 1 by default, at `d` we're assigned // to the first para. - let head_c = Hash::from_low_u64_be(130); + let head_d = get_parent_hash(head_c); // Activated leaf is `b`, but the collation will be based on `c`. update_view(&mut virtual_overseer, &test_state, vec![(head_b, head_b_num)], 1).await; @@ -332,35 +333,24 @@ fn accept_advertisements_from_implicit_view() { Some((candidate_hash, parent_head_data_hash)), ) .await; - // Advertise with different para. - advertise_collation( + assert_fetch_collation_request( &mut virtual_overseer, - peer_a, head_c, - Some((candidate_hash, parent_head_data_hash)), + test_state.chain_ids[1], + Some(candidate_hash), ) .await; - - let response_channel = assert_fetch_collation_request( + // Advertise with different para. + advertise_collation( &mut virtual_overseer, - head_c, - test_state.chain_ids[1], - Some(candidate_hash), + peer_a, + head_d, // Note different relay parent. + Some((candidate_hash, parent_head_data_hash)), ) .await; - - // Respond with an error to abort seconding. - response_channel - .send(Err(sc_network::RequestFailure::NotConnected)) - .expect("Sending response should succeed"); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ReportPeer(..),) - ); - assert_fetch_collation_request( &mut virtual_overseer, - head_c, + head_d, test_state.chain_ids[0], Some(candidate_hash), ) diff --git a/node/network/statement-distribution/src/legacy_v1/mod.rs b/node/network/statement-distribution/src/legacy_v1/mod.rs index ae58d643d19e..776c1b95f5c4 100644 --- a/node/network/statement-distribution/src/legacy_v1/mod.rs +++ b/node/network/statement-distribution/src/legacy_v1/mod.rs @@ -18,9 +18,7 @@ use parity_scale_codec::Encode; use polkadot_node_network_protocol::{ self as net_protocol, - grid_topology::{ - GridNeighbors, RequiredRouting, SessionBoundGridTopologyStorage, - }, + grid_topology::{GridNeighbors, RequiredRouting, SessionBoundGridTopologyStorage}, peer_set::{IsAuthority, PeerSet, ValidationVersion}, v1::{self as protocol_v1, StatementMetadata}, vstaging as protocol_vstaging, IfDisconnected, PeerId, UnifiedReputationChange as Rep,