From c0396144469392874361ecfa5827521c0802b134 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Tue, 15 Nov 2022 00:19:31 -0800 Subject: [PATCH 01/30] Passed candidate events from scraper to participation --- .../dispute-coordinator/src/initialized.rs | 6 ++--- .../src/participation/mod.rs | 25 ++++++++++++++++++- .../src/participation/tests.rs | 1 + .../dispute-coordinator/src/scraping/mod.rs | 19 ++++++++------ 4 files changed, 39 insertions(+), 12 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 901a6d863ed2..ad2030838c55 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -190,7 +190,7 @@ impl Initialized { if let Some(first_leaf) = first_leaf.take() { // Also provide first leaf to participation for good measure. self.participation - .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf)) + .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf), Vec::new()) .await?; } @@ -269,9 +269,9 @@ impl Initialized { update: ActiveLeavesUpdate, now: u64, ) -> Result<()> { - let on_chain_votes = + let (on_chain_votes, candidate_events) = self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; - self.participation.process_active_leaves_update(ctx, &update).await?; + self.participation.process_active_leaves_update(ctx, &update, candidate_events).await?; if let Some(new_leaf) = update.activated { match self diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 874f37e63213..df9e4b5dbf6e 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -31,7 +31,7 @@ use polkadot_node_subsystem::{ overseer, ActiveLeavesUpdate, RecoveryError, }; use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; -use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; +use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex, CandidateEvent}; use crate::LOG_TARGET; @@ -195,6 +195,7 @@ impl Participation { &mut self, ctx: &mut Context, update: &ActiveLeavesUpdate, + candidate_events: Vec<(Vec, BlockNumber)>, ) -> FatalResult<()> { if let Some(activated) = &update.activated { match self.recent_block { @@ -208,6 +209,8 @@ impl Participation { }, Some(_) => {}, } + + self.handle_candidate_events(candidate_events); } Ok(()) } @@ -245,6 +248,26 @@ impl Participation { } Ok(()) } + + /// Update priority queue and best effort queue to account for + /// freshly backed or included disputed candidates + fn handle_candidate_events(&mut self, candidate_events : Vec<(Vec, BlockNumber)>) { + for (events_for_block, block_number) in candidate_events { + for event in events_for_block { + match event { + CandidateEvent::CandidateIncluded(receipt, _, _, _) => { + let candidate_hash = receipt.hash(); + }, + CandidateEvent::CandidateBacked(receipt, _, _, _) => { + let candidate_hash = receipt.hash(); + }, + _ => { + // skip the rest + }, + } + } + } + } } async fn participate( diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index 03772b1918dc..cde2834d691a 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -101,6 +101,7 @@ async fn activate_leaf( number: block_number, status: LeafStatus::Fresh, }), + Vec::new(), ) .await } diff --git a/node/core/dispute-coordinator/src/scraping/mod.rs b/node/core/dispute-coordinator/src/scraping/mod.rs index 99a6e68cdfb5..5ff37beca198 100644 --- a/node/core/dispute-coordinator/src/scraping/mod.rs +++ b/node/core/dispute-coordinator/src/scraping/mod.rs @@ -114,7 +114,7 @@ impl ChainScraper { }; let update = ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() }; - let votes = s.process_active_leaves_update(sender, &update).await?; + let (votes, _) = s.process_active_leaves_update(sender, &update).await?; Ok((s, votes)) } @@ -137,13 +137,13 @@ impl ChainScraper { &mut self, sender: &mut Sender, update: &ActiveLeavesUpdate, - ) -> Result> + ) -> Result<(Vec, Vec<(Vec, BlockNumber)>)> where Sender: overseer::DisputeCoordinatorSenderTrait, { let activated = match update.activated.as_ref() { Some(activated) => activated, - None => return Ok(Vec::new()), + None => return Ok((Vec::new(), Vec::new())), }; // Fetch ancestry up to last finalized block. @@ -157,11 +157,13 @@ impl ChainScraper { let block_hashes = std::iter::once(activated.hash).chain(ancestors); + let mut candidate_events: Vec<(Vec, BlockNumber)> = Vec::new(); let mut on_chain_votes = Vec::new(); for (block_number, block_hash) in block_numbers.zip(block_hashes) { gum::trace!(?block_number, ?block_hash, "In ancestor processing."); - self.process_candidate_events(sender, block_number, block_hash).await?; + let events_for_block = self.process_candidate_events(sender, block_number, block_hash).await?; + candidate_events.push((events_for_block, block_number)); if let Some(votes) = get_on_chain_votes(sender, block_hash).await? { on_chain_votes.push(votes); @@ -170,7 +172,7 @@ impl ChainScraper { self.last_observed_blocks.put(activated.hash, ()); - Ok(on_chain_votes) + Ok((on_chain_votes, candidate_events)) } /// Prune finalized candidates. @@ -201,12 +203,13 @@ impl ChainScraper { sender: &mut Sender, block_number: BlockNumber, block_hash: Hash, - ) -> Result<()> + ) -> Result> where Sender: overseer::DisputeCoordinatorSenderTrait, { + let events = get_candidate_events(sender, block_hash).await?; // Get included and backed events: - for ev in get_candidate_events(sender, block_hash).await? { + for ev in &events { match ev { CandidateEvent::CandidateIncluded(receipt, _, _, _) => { let candidate_hash = receipt.hash(); @@ -233,7 +236,7 @@ impl ChainScraper { }, } } - Ok(()) + Ok(events) } /// Returns ancestors of `head` in the descending order, stopping From 7666c67f96b63d8a3892ea0222b5c92269b7dfb9 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Wed, 16 Nov 2022 15:03:37 -0800 Subject: [PATCH 02/30] First draft PR 5875 --- .../dispute-coordinator/src/initialized.rs | 95 ++++++++++++++++++- .../src/participation/mod.rs | 25 +---- .../src/participation/queues/mod.rs | 2 +- .../src/participation/tests.rs | 1 - .../dispute-coordinator/src/scraping/mod.rs | 6 +- 5 files changed, 96 insertions(+), 33 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index ad2030838c55..1545181d0c8e 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -42,7 +42,7 @@ use polkadot_node_subsystem_util::rolling_session_window::{ use polkadot_primitives::v2::{ BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement, DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, + ValidDisputeStatementKind, ValidatorId, ValidatorIndex, CandidateEvent, }; use crate::{ @@ -190,7 +190,7 @@ impl Initialized { if let Some(first_leaf) = first_leaf.take() { // Also provide first leaf to participation for good measure. self.participation - .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf), Vec::new()) + .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf)) .await?; } @@ -271,7 +271,7 @@ impl Initialized { ) -> Result<()> { let (on_chain_votes, candidate_events) = self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; - self.participation.process_active_leaves_update(ctx, &update, candidate_events).await?; + self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { match self @@ -319,6 +319,9 @@ impl Initialized { }, ); } + + // Decrement spam slots for freshly backed or included candidates + self.handle_backed_included_events(ctx, overlay_db, candidate_events).await; } Ok(()) @@ -826,8 +829,10 @@ impl Initialized { let new_state = import_result.new_state(); let is_included = self.scraper.is_candidate_included(&candidate_hash); + let is_backed = self.scraper.is_candidate_backed(&candidate_hash); - let potential_spam = !is_included && !new_state.is_confirmed() && !new_state.has_own_vote(); + let potential_spam = !is_included && !is_backed + && !new_state.is_confirmed() && !new_state.has_own_vote(); gum::trace!( target: LOG_TARGET, @@ -1193,6 +1198,88 @@ impl Initialized { Ok(()) } + + /// Firstly decrements spam slots for validators who voted on potential spam + /// candidates that are newly backed or included, and therefore no longer + /// potential spam. + /// + /// Secondly queues participation if needed for freshly backed or included + /// candidates. Backed candidates are added to best_effort queue while + /// included are added to priority queue. + async fn handle_backed_included_events( + &mut self, + ctx: &mut Context, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, + candidate_events: Vec + ) + { + let session = self.rolling_session_window.latest_session(); + // Populate events contents, which allow us to queue participation without + // duplicated code + let mut events_contents: Vec<(CandidateReceipt, ParticipationPriority)> = Vec::new(); + for event in candidate_events { + match event { + CandidateEvent::CandidateBacked(receipt, _, _, _) => { + events_contents.push((receipt, ParticipationPriority::BestEffort)); + }, + CandidateEvent::CandidateIncluded(receipt, _, _, _) => { + events_contents.push((receipt, ParticipationPriority::Priority)); + } + _ => (), + } + } + + for (receipt, priority) in events_contents { + // Clear spam slots + self.spam_slots.clear(&(session, receipt.hash())); + + // Queue participation for freshly backed candidate. If dispute + // is already in priority queue don't try to queue participation + // in best_effort. Don't try to participate if candidate receipt + // can't be recovered from vote state, if we already voted, or + // if the candidate is not disputed. + let env = + match CandidateEnvironment::new(&*self.keystore, &self.rolling_session_window, session) + { + None => { + gum::warn!( + target: LOG_TARGET, + session, + "We are lacking a `SessionInfo` for handling import of statements." + ); + + continue // We skip queueing participation if not possible + }, + Some(env) => env, + }; + let votes_result = overlay_db.load_candidate_votes(session, &receipt.hash()); + if let Ok(maybe_votes) = votes_result { + if let Some(votes) = + maybe_votes.map(CandidateVotes::from) + { + let vote_state = CandidateVoteState::new(votes, &env); + let has_own_vote = vote_state.has_own_vote(); + let is_disputed = vote_state.is_disputed(); + let is_included = self.scraper.is_candidate_included(&receipt.hash()); + + if !has_own_vote && + is_disputed && + (priority == ParticipationPriority::Priority || !is_included) + { + let r = self + .participation + .queue_participation( + ctx, + priority, + ParticipationRequest::new(receipt, session), + ) + .await; + let _ = log_error(r); + } + } + } + } + } } /// Messages to be handled in this subsystem. diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index df9e4b5dbf6e..874f37e63213 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -31,7 +31,7 @@ use polkadot_node_subsystem::{ overseer, ActiveLeavesUpdate, RecoveryError, }; use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; -use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex, CandidateEvent}; +use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; use crate::LOG_TARGET; @@ -195,7 +195,6 @@ impl Participation { &mut self, ctx: &mut Context, update: &ActiveLeavesUpdate, - candidate_events: Vec<(Vec, BlockNumber)>, ) -> FatalResult<()> { if let Some(activated) = &update.activated { match self.recent_block { @@ -209,8 +208,6 @@ impl Participation { }, Some(_) => {}, } - - self.handle_candidate_events(candidate_events); } Ok(()) } @@ -248,26 +245,6 @@ impl Participation { } Ok(()) } - - /// Update priority queue and best effort queue to account for - /// freshly backed or included disputed candidates - fn handle_candidate_events(&mut self, candidate_events : Vec<(Vec, BlockNumber)>) { - for (events_for_block, block_number) in candidate_events { - for event in events_for_block { - match event { - CandidateEvent::CandidateIncluded(receipt, _, _, _) => { - let candidate_hash = receipt.hash(); - }, - CandidateEvent::CandidateBacked(receipt, _, _, _) => { - let candidate_hash = receipt.hash(); - }, - _ => { - // skip the rest - }, - } - } - } - } } async fn participate( diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 3ec217628625..26ad3bdfee79 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -81,7 +81,7 @@ pub struct ParticipationRequest { } /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum ParticipationPriority { BestEffort, Priority, diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index cde2834d691a..03772b1918dc 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -101,7 +101,6 @@ async fn activate_leaf( number: block_number, status: LeafStatus::Fresh, }), - Vec::new(), ) .await } diff --git a/node/core/dispute-coordinator/src/scraping/mod.rs b/node/core/dispute-coordinator/src/scraping/mod.rs index 5ff37beca198..86641546043c 100644 --- a/node/core/dispute-coordinator/src/scraping/mod.rs +++ b/node/core/dispute-coordinator/src/scraping/mod.rs @@ -137,7 +137,7 @@ impl ChainScraper { &mut self, sender: &mut Sender, update: &ActiveLeavesUpdate, - ) -> Result<(Vec, Vec<(Vec, BlockNumber)>)> + ) -> Result<(Vec, Vec)> where Sender: overseer::DisputeCoordinatorSenderTrait, { @@ -157,13 +157,13 @@ impl ChainScraper { let block_hashes = std::iter::once(activated.hash).chain(ancestors); - let mut candidate_events: Vec<(Vec, BlockNumber)> = Vec::new(); + let mut candidate_events: Vec = Vec::new(); let mut on_chain_votes = Vec::new(); for (block_number, block_hash) in block_numbers.zip(block_hashes) { gum::trace!(?block_number, ?block_hash, "In ancestor processing."); let events_for_block = self.process_candidate_events(sender, block_number, block_hash).await?; - candidate_events.push((events_for_block, block_number)); + candidate_events.extend(events_for_block); if let Some(votes) = get_on_chain_votes(sender, block_hash).await? { on_chain_votes.push(votes); From 1cdc72dc63de962f586634896a18dcc0883137c6 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Wed, 16 Nov 2022 15:26:42 -0800 Subject: [PATCH 03/30] Added support for timestamp in changes --- node/core/dispute-coordinator/src/initialized.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index bbf564635c24..c39c82aaa5be 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -321,7 +321,7 @@ impl Initialized { } // Decrement spam slots for freshly backed or included candidates - self.handle_backed_included_events(ctx, overlay_db, candidate_events).await; + self.handle_backed_included_events(ctx, overlay_db, now, candidate_events).await; } Ok(()) @@ -1203,7 +1203,8 @@ impl Initialized { &mut self, ctx: &mut Context, overlay_db: &mut OverlayedBackend<'_, impl Backend>, - candidate_events: Vec + now: u64, + candidate_events: Vec, ) { let session = self.rolling_session_window.latest_session(); @@ -1250,7 +1251,7 @@ impl Initialized { if let Some(votes) = maybe_votes.map(CandidateVotes::from) { - let vote_state = CandidateVoteState::new(votes, &env); + let vote_state = CandidateVoteState::new(votes, &env, now); let has_own_vote = vote_state.has_own_vote(); let is_disputed = vote_state.is_disputed(); let is_included = self.scraper.is_candidate_included(&receipt.hash()); From e9d12733c62739f6b863fb1f3651f94ad7cafc04 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 18 Nov 2022 13:06:25 -0800 Subject: [PATCH 04/30] Some necessary refactoring --- .../dispute-coordinator/src/initialized.rs | 104 +++++++++--------- 1 file changed, 51 insertions(+), 53 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index c39c82aaa5be..0bfc85303246 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -1208,67 +1208,65 @@ impl Initialized { ) { let session = self.rolling_session_window.latest_session(); - // Populate events contents, which allow us to queue participation without - // duplicated code - let mut events_contents: Vec<(CandidateReceipt, ParticipationPriority)> = Vec::new(); for event in candidate_events { - match event { + // Filter out events we don't care about and repackage information + let maybe_event_contents = match event { CandidateEvent::CandidateBacked(receipt, _, _, _) => { - events_contents.push((receipt, ParticipationPriority::BestEffort)); - }, + Some((receipt, false)) + } CandidateEvent::CandidateIncluded(receipt, _, _, _) => { - events_contents.push((receipt, ParticipationPriority::Priority)); + Some((receipt, true)) } - _ => (), - } - } + _ => None + }; - for (receipt, priority) in events_contents { - // Clear spam slots - self.spam_slots.clear(&(session, receipt.hash())); - - // Queue participation for freshly backed candidate. If dispute - // is already in priority queue don't try to queue participation - // in best_effort. Don't try to participate if candidate receipt - // can't be recovered from vote state, if we already voted, or - // if the candidate is not disputed. - let env = - match CandidateEnvironment::new(&*self.keystore, &self.rolling_session_window, session) - { - None => { - gum::warn!( - target: LOG_TARGET, - session, - "We are lacking a `SessionInfo` for handling import of statements." - ); + if let Some((receipt, queue_participation)) = maybe_event_contents { + // Clear spam slots + self.spam_slots.clear(&(session, receipt.hash())); - continue // We skip queueing participation if not possible - }, - Some(env) => env, - }; - let votes_result = overlay_db.load_candidate_votes(session, &receipt.hash()); - if let Ok(maybe_votes) = votes_result { - if let Some(votes) = - maybe_votes.map(CandidateVotes::from) + // End after clearing spam slots for backing event. Participation + // is handled in process_on_chain_votes. + if !queue_participation { continue; } + + // Queue participation for freshly included candidate. Don't + // try to participate if candidate receipt can't be recovered + // from vote state, if we already voted, or if the candidate + // is not disputed. + let env = + match CandidateEnvironment::new(&*self.keystore, &self.rolling_session_window, session) { - let vote_state = CandidateVoteState::new(votes, &env, now); - let has_own_vote = vote_state.has_own_vote(); - let is_disputed = vote_state.is_disputed(); - let is_included = self.scraper.is_candidate_included(&receipt.hash()); - - if !has_own_vote && - is_disputed && - (priority == ParticipationPriority::Priority || !is_included) + None => { + gum::warn!( + target: LOG_TARGET, + session, + "We are lacking a `SessionInfo` for handling import of statements." + ); + + continue // We skip queueing participation if not possible + }, + Some(env) => env, + }; + let votes_result = overlay_db.load_candidate_votes(session, &receipt.hash()); + if let Ok(maybe_votes) = votes_result { + if let Some(votes) = + maybe_votes.map(CandidateVotes::from) { - let r = self - .participation - .queue_participation( - ctx, - priority, - ParticipationRequest::new(receipt, session), - ) - .await; - let _ = log_error(r); + let vote_state = CandidateVoteState::new(votes, &env, now); + let has_own_vote = vote_state.has_own_vote(); + let is_disputed = vote_state.is_disputed(); + + if !has_own_vote && is_disputed + { + let r = self + .participation + .queue_participation( + ctx, + ParticipationPriority::Priority, + ParticipationRequest::new(receipt, session), + ) + .await; + let _ = log_error(r); + } } } } From 4eebacd548655d88a064ef4daebaee8226cd33ba Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 18 Nov 2022 16:22:38 -0800 Subject: [PATCH 05/30] Removed SessionIndex from unconfirmed_disputes key --- .../dispute-coordinator/src/initialized.rs | 4 ++-- node/core/dispute-coordinator/src/lib.rs | 2 +- .../dispute-coordinator/src/spam_slots.rs | 19 +++++++++---------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 0bfc85303246..b2773f988bbf 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -849,7 +849,7 @@ impl Initialized { if !potential_spam { // Former spammers have not been spammers after all: - self.spam_slots.clear(&(session, candidate_hash)); + self.spam_slots.clear(&candidate_hash); // Potential spam: } else if !import_result.new_invalid_voters().is_empty() { @@ -1222,7 +1222,7 @@ impl Initialized { if let Some((receipt, queue_participation)) = maybe_event_contents { // Clear spam slots - self.spam_slots.clear(&(session, receipt.hash())); + self.spam_slots.clear(&receipt.hash()); // End after clearing spam slots for backing event. Participation // is handled in process_on_chain_votes. diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 09d6c621b999..04b20b02e22a 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -336,7 +336,7 @@ impl DisputeCoordinatorSubsystem { let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash()); if !status.is_confirmed_concluded() && !is_included { - unconfirmed_disputes.insert((session, *candidate_hash), voted_indices); + unconfirmed_disputes.insert(*candidate_hash, (session, voted_indices)); } // Participate for all non-concluded disputes which do not have a diff --git a/node/core/dispute-coordinator/src/spam_slots.rs b/node/core/dispute-coordinator/src/spam_slots.rs index c0619bf3a1a5..b5aa91e028bb 100644 --- a/node/core/dispute-coordinator/src/spam_slots.rs +++ b/node/core/dispute-coordinator/src/spam_slots.rs @@ -54,7 +54,7 @@ pub struct SpamSlots { } /// Unconfirmed disputes to be passed at initialization. -pub type UnconfirmedDisputes = HashMap<(SessionIndex, CandidateHash), BTreeSet>; +pub type UnconfirmedDisputes = HashMap)>; impl SpamSlots { /// Recover `SpamSlots` from state on startup. @@ -62,7 +62,7 @@ impl SpamSlots { /// Initialize based on already existing active disputes. pub fn recover_from_state(unconfirmed_disputes: UnconfirmedDisputes) -> Self { let mut slots: HashMap<(SessionIndex, ValidatorIndex), SpamCount> = HashMap::new(); - for ((session, _), validators) in unconfirmed_disputes.iter() { + for (_, (session, validators)) in unconfirmed_disputes.iter() { for validator in validators { let spam_vote_count = slots.entry((*session, *validator)).or_default(); *spam_vote_count += 1; @@ -97,9 +97,9 @@ impl SpamSlots { if *spam_vote_count >= MAX_SPAM_VOTES { return false } - let validators = self.unconfirmed.entry((session, candidate)).or_default(); + let validators = self.unconfirmed.entry(candidate).or_default(); - if validators.insert(validator) { + if validators.1.insert(validator) { // We only increment spam slots once per candidate, as each validator has to provide an // opposing vote for sending out its own vote. Therefore, receiving multiple votes for // a single candidate is expected and should not get punished here. @@ -114,14 +114,13 @@ impl SpamSlots { /// This effectively reduces the spam slot count for all validators participating in a dispute /// for that candidate. You should call this function once a dispute became obsolete or got /// confirmed and thus votes for it should no longer be treated as potential spam. - pub fn clear(&mut self, key: &(SessionIndex, CandidateHash)) { - if let Some(validators) = self.unconfirmed.remove(key) { - let (session, _) = key; + pub fn clear(&mut self, key: &CandidateHash) { + if let Some((session, validators)) = self.unconfirmed.remove(key) { for validator in validators { - if let Some(spam_vote_count) = self.slots.remove(&(*session, validator)) { + if let Some(spam_vote_count) = self.slots.remove(&(session,validator)) { let new = spam_vote_count - 1; if new > 0 { - self.slots.insert((*session, validator), new); + self.slots.insert((session, validator), new); } } } @@ -129,7 +128,7 @@ impl SpamSlots { } /// Prune all spam slots for sessions older than the given index. pub fn prune_old(&mut self, oldest_index: SessionIndex) { - self.unconfirmed.retain(|(session, _), _| *session >= oldest_index); + self.unconfirmed.retain(| _, (session, _)| *session >= oldest_index); self.slots.retain(|(session, _), _| *session >= oldest_index); } } From 45477d2e9294434e398dffbe4ccaabac68091517 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Sat, 19 Nov 2022 12:00:55 -0800 Subject: [PATCH 06/30] Removed duplicate logic in import statements --- node/core/dispute-coordinator/src/initialized.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index b2773f988bbf..42ba445ec142 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -832,9 +832,14 @@ impl Initialized { let is_included = self.scraper.is_candidate_included(&candidate_hash); let is_backed = self.scraper.is_candidate_backed(&candidate_hash); - + let has_own_vote = new_state.has_own_vote(); + let is_disputed = new_state.is_disputed(); + let has_controlled_indices = !env.controlled_indices().is_empty(); + let is_confirmed = new_state.is_confirmed(); let potential_spam = !is_included && !is_backed && !new_state.is_confirmed() && !new_state.has_own_vote(); + // We participate only in disputes which are included, backed or confirmed + let allow_participation = is_included || is_backed || is_confirmed; gum::trace!( target: LOG_TARGET, @@ -876,14 +881,6 @@ impl Initialized { } } - let has_own_vote = new_state.has_own_vote(); - let is_disputed = new_state.is_disputed(); - let has_controlled_indices = !env.controlled_indices().is_empty(); - let is_backed = self.scraper.is_candidate_backed(&candidate_hash); - let is_confirmed = new_state.is_confirmed(); - // We participate only in disputes which are included, backed or confirmed - let allow_participation = is_included || is_backed || is_confirmed; - // Participate in dispute if we did not cast a vote before and actually have keys to cast a // local vote. Disputes should fall in one of the categories below, otherwise we will refrain // from participation: From 04b214c9d5f3f8a0e9d275b84fbc5172df79c784 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Sat, 19 Nov 2022 14:07:28 -0800 Subject: [PATCH 07/30] Replaced queue_participation call with re-prio --- .../dispute-coordinator/src/initialized.rs | 67 ++----------------- .../src/participation/mod.rs | 25 ++++++- .../src/participation/queues/mod.rs | 20 ++++++ 3 files changed, 51 insertions(+), 61 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 42ba445ec142..19ad45ef0029 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -271,6 +271,7 @@ impl Initialized { ) -> Result<()> { let (on_chain_votes, candidate_events) = self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; + self.participation.prioritize_newly_included(ctx, &candidate_events).await; self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { @@ -321,7 +322,7 @@ impl Initialized { } // Decrement spam slots for freshly backed or included candidates - self.handle_backed_included_events(ctx, overlay_db, now, candidate_events).await; + self.reduce_spam_on_backed_included(candidate_events); } Ok(()) @@ -1189,83 +1190,29 @@ impl Initialized { Ok(()) } - /// Firstly decrements spam slots for validators who voted on potential spam + /// Decrements spam slots for validators who voted on potential spam /// candidates that are newly backed or included, and therefore no longer /// potential spam. - /// - /// Secondly queues participation if needed for freshly backed or included - /// candidates. Backed candidates are added to best_effort queue while - /// included are added to priority queue. - async fn handle_backed_included_events( + fn reduce_spam_on_backed_included( &mut self, - ctx: &mut Context, - overlay_db: &mut OverlayedBackend<'_, impl Backend>, - now: u64, candidate_events: Vec, ) { - let session = self.rolling_session_window.latest_session(); for event in candidate_events { // Filter out events we don't care about and repackage information let maybe_event_contents = match event { CandidateEvent::CandidateBacked(receipt, _, _, _) => { - Some((receipt, false)) + Some(receipt) } CandidateEvent::CandidateIncluded(receipt, _, _, _) => { - Some((receipt, true)) + Some(receipt) } _ => None }; - if let Some((receipt, queue_participation)) = maybe_event_contents { + if let Some(receipt) = maybe_event_contents { // Clear spam slots self.spam_slots.clear(&receipt.hash()); - - // End after clearing spam slots for backing event. Participation - // is handled in process_on_chain_votes. - if !queue_participation { continue; } - - // Queue participation for freshly included candidate. Don't - // try to participate if candidate receipt can't be recovered - // from vote state, if we already voted, or if the candidate - // is not disputed. - let env = - match CandidateEnvironment::new(&*self.keystore, &self.rolling_session_window, session) - { - None => { - gum::warn!( - target: LOG_TARGET, - session, - "We are lacking a `SessionInfo` for handling import of statements." - ); - - continue // We skip queueing participation if not possible - }, - Some(env) => env, - }; - let votes_result = overlay_db.load_candidate_votes(session, &receipt.hash()); - if let Ok(maybe_votes) = votes_result { - if let Some(votes) = - maybe_votes.map(CandidateVotes::from) - { - let vote_state = CandidateVoteState::new(votes, &env, now); - let has_own_vote = vote_state.has_own_vote(); - let is_disputed = vote_state.is_disputed(); - - if !has_own_vote && is_disputed - { - let r = self - .participation - .queue_participation( - ctx, - ParticipationPriority::Priority, - ParticipationRequest::new(receipt, session), - ) - .await; - let _ = log_error(r); - } - } - } } } } diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 874f37e63213..a023479df574 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -31,7 +31,7 @@ use polkadot_node_subsystem::{ overseer, ActiveLeavesUpdate, RecoveryError, }; use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; -use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; +use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex, CandidateEvent}; use crate::LOG_TARGET; @@ -212,6 +212,29 @@ impl Participation { Ok(()) } + /// Reprioritizes participation requests for disputes that are freshly included + pub async fn prioritize_newly_included(&mut self, ctx: &mut Context, events: &Vec) { + for event in events { + // Filter the incoming events list for candidate inclusions + let maybe_event_contents = match event { + CandidateEvent::CandidateIncluded(receipt, _, _, _) => { + Some(receipt) + } + _ => None + }; + + if let Some(receipt) = maybe_event_contents { + let r = self.queue.prioritize_if_present(ctx.sender(), receipt).await; + if let Err(queue_error) = r { + match queue_error { + QueueError::PriorityFull => return, // Avoid working through the rest of the vec + _ => (), + } + } + } + } + } + /// Dequeue until `MAX_PARALLEL_PARTICIPATIONS` is reached. async fn dequeue_until_capacity( &mut self, diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 643111e7bd75..784e2845c2c3 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -103,6 +103,8 @@ pub enum QueueError { BestEffortFull, #[error("Request could not be queued, because priority queue was already full.")] PriorityFull, + #[error("A comparator could not be generated for the given request.")] + CouldNotGenerateComparator, } impl ParticipationRequest { @@ -159,6 +161,24 @@ impl Queues { self.pop_best_effort().map(|d| d.1) } + /// Reprioritizes any participation requests pertaining to the + /// passed candidates from best effort to priority. + pub async fn prioritize_if_present( + &mut self, + sender: &mut impl overseer::DisputeCoordinatorSenderTrait, + receipt: &CandidateReceipt, + ) -> std::result::Result<(), QueueError>{ + if self.priority.len() >= PRIORITY_QUEUE_SIZE { + return Err(QueueError::PriorityFull) + } + + let comparator = CandidateComparator::new(sender, receipt).await.map_err(|_e| QueueError::CouldNotGenerateComparator)?; + if let Some(request) = self.best_effort.remove(&comparator){ + self.priority.insert(comparator, request); + } + Ok(()) + } + fn queue_with_comparator( &mut self, comparator: CandidateComparator, From 58f6371138fb525f9c8a572ecf82372aa4c9fec1 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 21 Nov 2022 10:41:20 -0600 Subject: [PATCH 08/30] Simplifying refactor. Backed were already handled --- .../dispute-coordinator/src/initialized.rs | 32 ++++++------------- .../src/participation/mod.rs | 26 +++++---------- .../dispute-coordinator/src/scraping/mod.rs | 18 ++++++----- 3 files changed, 28 insertions(+), 48 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 19ad45ef0029..1fc69a91b32b 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -269,9 +269,9 @@ impl Initialized { update: ActiveLeavesUpdate, now: u64, ) -> Result<()> { - let (on_chain_votes, candidate_events) = + let (on_chain_votes, included_receipts) = self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; - self.participation.prioritize_newly_included(ctx, &candidate_events).await; + self.participation.prioritize_newly_included(ctx, &included_receipts).await; self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { @@ -322,7 +322,7 @@ impl Initialized { } // Decrement spam slots for freshly backed or included candidates - self.reduce_spam_on_backed_included(candidate_events); + self.reduce_spam_from_included(&included_receipts); } Ok(()) @@ -1191,30 +1191,18 @@ impl Initialized { } /// Decrements spam slots for validators who voted on potential spam - /// candidates that are newly backed or included, and therefore no longer + /// candidates that are newly included, and therefore no longer /// potential spam. - fn reduce_spam_on_backed_included( + fn reduce_spam_from_included( &mut self, - candidate_events: Vec, + included_receipts: &Vec, ) { - for event in candidate_events { - // Filter out events we don't care about and repackage information - let maybe_event_contents = match event { - CandidateEvent::CandidateBacked(receipt, _, _, _) => { - Some(receipt) - } - CandidateEvent::CandidateIncluded(receipt, _, _, _) => { - Some(receipt) - } - _ => None - }; - - if let Some(receipt) = maybe_event_contents { - // Clear spam slots + included_receipts + .iter() + .for_each(|receipt| { self.spam_slots.clear(&receipt.hash()); - } - } + }); } } diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index a023479df574..718b6280b13f 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -31,7 +31,7 @@ use polkadot_node_subsystem::{ overseer, ActiveLeavesUpdate, RecoveryError, }; use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; -use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex, CandidateEvent}; +use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; use crate::LOG_TARGET; @@ -213,23 +213,13 @@ impl Participation { } /// Reprioritizes participation requests for disputes that are freshly included - pub async fn prioritize_newly_included(&mut self, ctx: &mut Context, events: &Vec) { - for event in events { - // Filter the incoming events list for candidate inclusions - let maybe_event_contents = match event { - CandidateEvent::CandidateIncluded(receipt, _, _, _) => { - Some(receipt) - } - _ => None - }; - - if let Some(receipt) = maybe_event_contents { - let r = self.queue.prioritize_if_present(ctx.sender(), receipt).await; - if let Err(queue_error) = r { - match queue_error { - QueueError::PriorityFull => return, // Avoid working through the rest of the vec - _ => (), - } + pub async fn prioritize_newly_included(&mut self, ctx: &mut Context, included_receipts: &Vec) { + for receipt in included_receipts { + let r = self.queue.prioritize_if_present(ctx.sender(), receipt).await; + if let Err(queue_error) = r { + match queue_error { + QueueError::PriorityFull => return, // Avoid working through the rest of the vec + _ => (), } } } diff --git a/node/core/dispute-coordinator/src/scraping/mod.rs b/node/core/dispute-coordinator/src/scraping/mod.rs index 86641546043c..5df6d52026c1 100644 --- a/node/core/dispute-coordinator/src/scraping/mod.rs +++ b/node/core/dispute-coordinator/src/scraping/mod.rs @@ -26,7 +26,7 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_util::runtime::{get_candidate_events, get_on_chain_votes}; use polkadot_primitives::v2::{ - BlockNumber, CandidateEvent, CandidateHash, Hash, ScrapedOnChainVotes, + BlockNumber, CandidateEvent, CandidateHash, Hash, ScrapedOnChainVotes, CandidateReceipt, }; use crate::{ @@ -137,7 +137,7 @@ impl ChainScraper { &mut self, sender: &mut Sender, update: &ActiveLeavesUpdate, - ) -> Result<(Vec, Vec)> + ) -> Result<(Vec, Vec)> where Sender: overseer::DisputeCoordinatorSenderTrait, { @@ -157,13 +157,13 @@ impl ChainScraper { let block_hashes = std::iter::once(activated.hash).chain(ancestors); - let mut candidate_events: Vec = Vec::new(); + let mut included_receipts: Vec = Vec::new(); let mut on_chain_votes = Vec::new(); for (block_number, block_hash) in block_numbers.zip(block_hashes) { gum::trace!(?block_number, ?block_hash, "In ancestor processing."); let events_for_block = self.process_candidate_events(sender, block_number, block_hash).await?; - candidate_events.extend(events_for_block); + included_receipts.extend(events_for_block); if let Some(votes) = get_on_chain_votes(sender, block_hash).await? { on_chain_votes.push(votes); @@ -172,7 +172,7 @@ impl ChainScraper { self.last_observed_blocks.put(activated.hash, ()); - Ok((on_chain_votes, candidate_events)) + Ok((on_chain_votes, included_receipts)) } /// Prune finalized candidates. @@ -203,13 +203,14 @@ impl ChainScraper { sender: &mut Sender, block_number: BlockNumber, block_hash: Hash, - ) -> Result> + ) -> Result> where Sender: overseer::DisputeCoordinatorSenderTrait, { let events = get_candidate_events(sender, block_hash).await?; + let mut included_receipts: Vec = Vec::new(); // Get included and backed events: - for ev in &events { + for ev in events { match ev { CandidateEvent::CandidateIncluded(receipt, _, _, _) => { let candidate_hash = receipt.hash(); @@ -220,6 +221,7 @@ impl ChainScraper { "Processing included event" ); self.included_candidates.insert(block_number, candidate_hash); + included_receipts.push(receipt); }, CandidateEvent::CandidateBacked(receipt, _, _, _) => { let candidate_hash = receipt.hash(); @@ -236,7 +238,7 @@ impl ChainScraper { }, } } - Ok(events) + Ok(included_receipts) } /// Returns ancestors of `head` in the descending order, stopping From efa28708315b21bb6c4d1573b3d704bba7ecbd88 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 21 Nov 2022 15:59:00 -0600 Subject: [PATCH 09/30] Removed unneeded spam slots logic --- .../dispute-coordinator/src/initialized.rs | 20 +------------------ node/core/dispute-coordinator/src/tests.rs | 2 +- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 1fc69a91b32b..c64c9968adee 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -42,7 +42,7 @@ use polkadot_node_subsystem_util::rolling_session_window::{ use polkadot_primitives::v2::{ BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement, DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, CandidateEvent, + ValidDisputeStatementKind, ValidatorId, ValidatorIndex, }; use crate::{ @@ -320,9 +320,6 @@ impl Initialized { }, ); } - - // Decrement spam slots for freshly backed or included candidates - self.reduce_spam_from_included(&included_receipts); } Ok(()) @@ -1189,21 +1186,6 @@ impl Initialized { Ok(()) } - - /// Decrements spam slots for validators who voted on potential spam - /// candidates that are newly included, and therefore no longer - /// potential spam. - fn reduce_spam_from_included( - &mut self, - included_receipts: &Vec, - ) - { - included_receipts - .iter() - .for_each(|receipt| { - self.spam_slots.clear(&receipt.hash()); - }); - } } /// Messages to be handled in this subsystem. diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 4d32620946e0..a0d6c784aee0 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -1287,7 +1287,7 @@ fn backing_statements_import_works_and_no_spam() { }) .await; - // Result should be valid, because our node participated, so spam slots are cleared: + // Import should be valid, as spam slots were not filled assert_matches!(confirmation_rx.await, Ok(ImportStatementsResult::ValidImport)); virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; From c0ee1fe7cae264fb0d5167e283ac2336202611d3 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 21 Nov 2022 18:33:11 -0600 Subject: [PATCH 10/30] Implementers guide edits --- .../src/node/disputes/dispute-coordinator.md | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md index 07fc647a711c..2b9f60dd4043 100644 --- a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md +++ b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md @@ -369,9 +369,7 @@ is either the dispute is completely made up or we are out of sync with the other nodes in terms of last finalized block. The former is very unlikely. If we are adding a dispute in best-effort it should already be either confirmed or the candidate is backed. In the latter case we will promote the dispute to the -priority queue once we learn about the new block. NOTE: this is still work in -progress and is tracked by [this issue] -(https://github.com/paritytech/polkadot/issues/5875). +priority queue once we learn about the new block. #### Import @@ -389,14 +387,26 @@ increment a counter for each signing participant of explicit `invalid` votes. What votes do we treat as a potential spam? A vote will increase a spam slot if and only if all of the following condidions are satisfied: * the candidate under dispute is not included on any chain +* the candidate under dispute is not backed on any chain * the dispute is not confirmed * we haven't casted a vote for the dispute +Whenever any vote on a dispute is imported these conditions are checked. If the +dispute is found not to be potential spam, then spam slots for the disputed candidate hash are cleared. This decrements the spam count for every validator +which had voted invalid. + +To keep spam slots from filling up unnecessarily we want to clear spam slots +whenever a candidate is seen to be backed or included. Fortunately this behavior +is acheived by clearing slots on vote import as described above. Because on chain +backing votes are processed when a block backing the disputed candidate is discovered, spam slots are cleared for every backed candidate. Included +candidates have typically also been seen as backed, so decrementing spam slots is +handled in that case as well. + The reason this works is because we only need to worry about actual dispute votes. Import of backing votes are already rate limited and concern only real -candidates for approval votes a similar argument holds (if they come from +candidates. For approval votes a similar argument holds (if they come from approval-voting), but we also don't import them until a dispute already -concluded. For actual dispute votes, we need two opposing votes, so there must be +concluded. For actual dispute votes we need two opposing votes, so there must be an explicit `invalid` vote in the import. Only a third of the validators can be malicious, so spam disk usage is limited to `2*vote_size*n/3*NUM_SPAM_SLOTS`, with `n` being the number of validators. @@ -499,16 +509,14 @@ We only ever care about disputes for candidates that have been included on at least some chain (became available). This is because the availability system was designed for precisely that: Only with inclusion (availability) we have guarantees about the candidate to actually be available. Because only then we -have guarantees that malicious backers can be reliably checked and slashed. The -system was also designed for non included candidates to not pose any threat to -the system. +have guarantees that malicious backers can be reliably checked and slashed. Also, by design non included candidates do not pose any threat to the system. One could think of an (additional) dispute system to make it possible to dispute any candidate that has been proposed by a validator, no matter whether it got successfully included or even backed. Unfortunately, it would be very brittle (no availability) and also spam protection would be way harder than for the -disputes handled by the dispute-coordinator. In fact all described spam handling -strategies above would simply be not available. +disputes handled by the dispute-coordinator. In fact, all the spam handling +strategies described above would simply be unavailable. It is worth thinking about who could actually raise such disputes anyway: Approval checkers certainly not, as they will only ever check once availability From 8b27aef43bc1a077ff764a15d08596f0f83e26b7 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 21 Nov 2022 21:37:30 -0600 Subject: [PATCH 11/30] Undid the spam slots refactor --- .../dispute-coordinator/src/initialized.rs | 2 +- node/core/dispute-coordinator/src/lib.rs | 2 +- .../dispute-coordinator/src/spam_slots.rs | 19 ++++++++++--------- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index c64c9968adee..17b7b1e80716 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -852,7 +852,7 @@ impl Initialized { if !potential_spam { // Former spammers have not been spammers after all: - self.spam_slots.clear(&candidate_hash); + self.spam_slots.clear(&(session, candidate_hash)); // Potential spam: } else if !import_result.new_invalid_voters().is_empty() { diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 04b20b02e22a..09d6c621b999 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -336,7 +336,7 @@ impl DisputeCoordinatorSubsystem { let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash()); if !status.is_confirmed_concluded() && !is_included { - unconfirmed_disputes.insert(*candidate_hash, (session, voted_indices)); + unconfirmed_disputes.insert((session, *candidate_hash), voted_indices); } // Participate for all non-concluded disputes which do not have a diff --git a/node/core/dispute-coordinator/src/spam_slots.rs b/node/core/dispute-coordinator/src/spam_slots.rs index b5aa91e028bb..c0619bf3a1a5 100644 --- a/node/core/dispute-coordinator/src/spam_slots.rs +++ b/node/core/dispute-coordinator/src/spam_slots.rs @@ -54,7 +54,7 @@ pub struct SpamSlots { } /// Unconfirmed disputes to be passed at initialization. -pub type UnconfirmedDisputes = HashMap)>; +pub type UnconfirmedDisputes = HashMap<(SessionIndex, CandidateHash), BTreeSet>; impl SpamSlots { /// Recover `SpamSlots` from state on startup. @@ -62,7 +62,7 @@ impl SpamSlots { /// Initialize based on already existing active disputes. pub fn recover_from_state(unconfirmed_disputes: UnconfirmedDisputes) -> Self { let mut slots: HashMap<(SessionIndex, ValidatorIndex), SpamCount> = HashMap::new(); - for (_, (session, validators)) in unconfirmed_disputes.iter() { + for ((session, _), validators) in unconfirmed_disputes.iter() { for validator in validators { let spam_vote_count = slots.entry((*session, *validator)).or_default(); *spam_vote_count += 1; @@ -97,9 +97,9 @@ impl SpamSlots { if *spam_vote_count >= MAX_SPAM_VOTES { return false } - let validators = self.unconfirmed.entry(candidate).or_default(); + let validators = self.unconfirmed.entry((session, candidate)).or_default(); - if validators.1.insert(validator) { + if validators.insert(validator) { // We only increment spam slots once per candidate, as each validator has to provide an // opposing vote for sending out its own vote. Therefore, receiving multiple votes for // a single candidate is expected and should not get punished here. @@ -114,13 +114,14 @@ impl SpamSlots { /// This effectively reduces the spam slot count for all validators participating in a dispute /// for that candidate. You should call this function once a dispute became obsolete or got /// confirmed and thus votes for it should no longer be treated as potential spam. - pub fn clear(&mut self, key: &CandidateHash) { - if let Some((session, validators)) = self.unconfirmed.remove(key) { + pub fn clear(&mut self, key: &(SessionIndex, CandidateHash)) { + if let Some(validators) = self.unconfirmed.remove(key) { + let (session, _) = key; for validator in validators { - if let Some(spam_vote_count) = self.slots.remove(&(session,validator)) { + if let Some(spam_vote_count) = self.slots.remove(&(*session, validator)) { let new = spam_vote_count - 1; if new > 0 { - self.slots.insert((session, validator), new); + self.slots.insert((*session, validator), new); } } } @@ -128,7 +129,7 @@ impl SpamSlots { } /// Prune all spam slots for sessions older than the given index. pub fn prune_old(&mut self, oldest_index: SessionIndex) { - self.unconfirmed.retain(| _, (session, _)| *session >= oldest_index); + self.unconfirmed.retain(|(session, _), _| *session >= oldest_index); self.slots.retain(|(session, _), _| *session >= oldest_index); } } From 319ea05004c60197f9a7bfcecf6b6531edb718e2 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Tue, 22 Nov 2022 16:01:58 -0600 Subject: [PATCH 12/30] Added comments and implementers guide edit --- node/core/dispute-coordinator/src/initialized.rs | 7 ++++++- .../src/node/disputes/dispute-coordinator.md | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 17b7b1e80716..76da471d328f 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -417,6 +417,8 @@ impl Initialized { }) .collect(); + // Importantly, handling import statements for backing votes also + // clears spam slots for any newly backed candidates let import_result = self .handle_import_statements( ctx, @@ -850,8 +852,11 @@ impl Initialized { "Is spam?" ); + // This check is responsible for all clearing of spam slots. It runs + // whenever a vote is imported from on or off chain, and decrements + // slots whenever a candidate is newly backed, confirmed, or has our + // own vote. if !potential_spam { - // Former spammers have not been spammers after all: self.spam_slots.clear(&(session, candidate_hash)); // Potential spam: diff --git a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md index 2b9f60dd4043..04c4da5a0d9c 100644 --- a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md +++ b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md @@ -399,8 +399,8 @@ To keep spam slots from filling up unnecessarily we want to clear spam slots whenever a candidate is seen to be backed or included. Fortunately this behavior is acheived by clearing slots on vote import as described above. Because on chain backing votes are processed when a block backing the disputed candidate is discovered, spam slots are cleared for every backed candidate. Included -candidates have typically also been seen as backed, so decrementing spam slots is -handled in that case as well. +candidates have also been seen as backed on the same fork, so decrementing spam +slots is handled in that case as well. The reason this works is because we only need to worry about actual dispute votes. Import of backing votes are already rate limited and concern only real From 26c9ebe6ef129aa4f96a0a7bd48b598dcd7d6c9c Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 25 Nov 2022 14:17:40 -0500 Subject: [PATCH 13/30] Added test for participation upon backing --- node/core/dispute-coordinator/src/tests.rs | 131 +++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index a0d6c784aee0..7d9ccc5fe64f 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -3193,3 +3193,134 @@ fn participation_for_included_candidates() { }) }); } + +/// Shows that importing backing votes when a backing event is being processed +/// results in participation. +#[test] +fn local_participation_in_dispute_for_backed_candidate() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + let candidate_receipt = make_valid_candidate_receipt(); + let candidate_hash = candidate_receipt.hash(); + + // Step 1: Show that we don't participate when not backed, confirmed, or included + + // activate leaf - without candidate backed event + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + vec![], + ) + .await; + + // generate two votes + let valid_vote = test_state + .issue_explicit_statement_with_index( + ValidatorIndex(1), + candidate_hash, + session, + true, + ) + .await; + + let invalid_vote = test_state + .issue_explicit_statement_with_index( + ValidatorIndex(2), + candidate_hash, + session, + false, + ) + .await; + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation: None, + }, + }) + .await; + + handle_approval_vote_request(&mut virtual_overseer, &candidate_hash, HashMap::new()) + .await; + + assert_matches!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await, None); + + // Step 2: Show that once backing votes are processed we participate + + // Activate leaf: With candidate backed event + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + vec![make_candidate_backed_event(candidate_receipt.clone())], + ) + .await; + + let backing_valid = test_state + .issue_backing_statement_with_index(ValidatorIndex(3), candidate_hash, session).await; + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (backing_valid, ValidatorIndex(3)) + ], + pending_confirmation: None, + }, + }) + .await; + + participation_with_distribution( + &mut virtual_overseer, + &candidate_hash, + candidate_receipt.commitments_hash, + ) + .await; + + // Check for our 1 active dispute + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ActiveDisputes(tx), + }) + .await; + + assert_eq!(rx.await.unwrap().len(), 1); + + // check if we have participated (casted a vote) + let (tx, rx) = oneshot::channel(); + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::QueryCandidateVotes( + vec![(session, candidate_hash)], + tx, + ), + }) + .await; + + let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); + assert_eq!(votes.valid.len(), 3); // 3 => 1 initial vote, 1 backing vote, and our vote + assert_eq!(votes.invalid.len(), 1); + + // Wrap up + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + + test_state + }) + }); +} \ No newline at end of file From e1c356c8583a4af2aef1624d2392d0332e375a6c Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 1 Dec 2022 17:18:00 -0500 Subject: [PATCH 14/30] Round of fixes + ran fmt --- .../src/participation/mod.rs | 8 +++---- .../src/participation/queues/mod.rs | 2 +- .../dispute-coordinator/src/scraping/mod.rs | 3 ++- node/core/dispute-coordinator/src/tests.rs | 22 +++++++------------ 4 files changed, 14 insertions(+), 21 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 718b6280b13f..21c5f9e03a22 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -216,11 +216,9 @@ impl Participation { pub async fn prioritize_newly_included(&mut self, ctx: &mut Context, included_receipts: &Vec) { for receipt in included_receipts { let r = self.queue.prioritize_if_present(ctx.sender(), receipt).await; - if let Err(queue_error) = r { - match queue_error { - QueueError::PriorityFull => return, // Avoid working through the rest of the vec - _ => (), - } + if let Err(QueueError::PriorityFull) = r + { + return; // Avoid working through the rest of the vec } } } diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 784e2845c2c3..68f97d3c240f 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -67,7 +67,7 @@ pub struct ParticipationRequest { } /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. -#[derive(Debug, PartialEq)] +#[derive(Debug)] pub enum ParticipationPriority { BestEffort, Priority, diff --git a/node/core/dispute-coordinator/src/scraping/mod.rs b/node/core/dispute-coordinator/src/scraping/mod.rs index 5df6d52026c1..483b68de828c 100644 --- a/node/core/dispute-coordinator/src/scraping/mod.rs +++ b/node/core/dispute-coordinator/src/scraping/mod.rs @@ -132,7 +132,8 @@ impl ChainScraper { /// /// and updates current heads, so we can query candidates for all non finalized blocks. /// - /// Returns: On chain vote for the leaf and any ancestors we might not yet have seen. + /// Returns: On chain votes and included candidate receipts for the leaf and any + /// ancestors we might not yet have seen. pub async fn process_active_leaves_update( &mut self, sender: &mut Sender, diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 7d9ccc5fe64f..6a749caa9525 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -331,7 +331,7 @@ impl TestState { // No queries, if subsystem knows about this session already. if self.known_session == Some(session) { - continue + continue; } self.known_session = Some(session); @@ -3208,15 +3208,10 @@ fn local_participation_in_dispute_for_backed_candidate() { let candidate_hash = candidate_receipt.hash(); // Step 1: Show that we don't participate when not backed, confirmed, or included - + // activate leaf - without candidate backed event test_state - .activate_leaf_at_session( - &mut virtual_overseer, - session, - 1, - vec![], - ) + .activate_leaf_at_session(&mut virtual_overseer, session, 1, vec![]) .await; // generate two votes @@ -3270,21 +3265,20 @@ fn local_participation_in_dispute_for_backed_candidate() { .await; let backing_valid = test_state - .issue_backing_statement_with_index(ValidatorIndex(3), candidate_hash, session).await; + .issue_backing_statement_with_index(ValidatorIndex(3), candidate_hash, session) + .await; virtual_overseer .send(FromOrchestra::Communication { msg: DisputeCoordinatorMessage::ImportStatements { candidate_receipt: candidate_receipt.clone(), session, - statements: vec![ - (backing_valid, ValidatorIndex(3)) - ], + statements: vec![(backing_valid, ValidatorIndex(3))], pending_confirmation: None, }, }) .await; - + participation_with_distribution( &mut virtual_overseer, &candidate_hash, @@ -3323,4 +3317,4 @@ fn local_participation_in_dispute_for_backed_candidate() { test_state }) }); -} \ No newline at end of file +} From a7615e51967f958eaa910ffb470f071ad72d8536 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 1 Dec 2022 17:27:33 -0500 Subject: [PATCH 15/30] Round of changes + fmt --- node/core/dispute-coordinator/src/initialized.rs | 10 +++++----- .../core/dispute-coordinator/src/participation/mod.rs | 11 +++++++---- .../src/participation/queues/mod.rs | 10 ++++++---- node/core/dispute-coordinator/src/scraping/mod.rs | 7 ++++--- node/core/dispute-coordinator/src/tests.rs | 2 +- 5 files changed, 23 insertions(+), 17 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 76da471d328f..acf596f62dc5 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -417,7 +417,7 @@ impl Initialized { }) .collect(); - // Importantly, handling import statements for backing votes also + // Importantly, handling import statements for backing votes also // clears spam slots for any newly backed candidates let import_result = self .handle_import_statements( @@ -836,8 +836,8 @@ impl Initialized { let is_disputed = new_state.is_disputed(); let has_controlled_indices = !env.controlled_indices().is_empty(); let is_confirmed = new_state.is_confirmed(); - let potential_spam = !is_included && !is_backed - && !new_state.is_confirmed() && !new_state.has_own_vote(); + let potential_spam = + !is_included && !is_backed && !new_state.is_confirmed() && !new_state.has_own_vote(); // We participate only in disputes which are included, backed or confirmed let allow_participation = is_included || is_backed || is_confirmed; @@ -852,8 +852,8 @@ impl Initialized { "Is spam?" ); - // This check is responsible for all clearing of spam slots. It runs - // whenever a vote is imported from on or off chain, and decrements + // This check is responsible for all clearing of spam slots. It runs + // whenever a vote is imported from on or off chain, and decrements // slots whenever a candidate is newly backed, confirmed, or has our // own vote. if !potential_spam { diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 21c5f9e03a22..fec47ddbf021 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -213,12 +213,15 @@ impl Participation { } /// Reprioritizes participation requests for disputes that are freshly included - pub async fn prioritize_newly_included(&mut self, ctx: &mut Context, included_receipts: &Vec) { + pub async fn prioritize_newly_included( + &mut self, + ctx: &mut Context, + included_receipts: &Vec, + ) { for receipt in included_receipts { let r = self.queue.prioritize_if_present(ctx.sender(), receipt).await; - if let Err(QueueError::PriorityFull) = r - { - return; // Avoid working through the rest of the vec + if let Err(QueueError::PriorityFull) = r { + return // Avoid working through the rest of the vec } } } diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 68f97d3c240f..0fc446c1cdc1 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -164,16 +164,18 @@ impl Queues { /// Reprioritizes any participation requests pertaining to the /// passed candidates from best effort to priority. pub async fn prioritize_if_present( - &mut self, + &mut self, sender: &mut impl overseer::DisputeCoordinatorSenderTrait, receipt: &CandidateReceipt, - ) -> std::result::Result<(), QueueError>{ + ) -> std::result::Result<(), QueueError> { if self.priority.len() >= PRIORITY_QUEUE_SIZE { return Err(QueueError::PriorityFull) } - let comparator = CandidateComparator::new(sender, receipt).await.map_err(|_e| QueueError::CouldNotGenerateComparator)?; - if let Some(request) = self.best_effort.remove(&comparator){ + let comparator = CandidateComparator::new(sender, receipt) + .await + .map_err(|_e| QueueError::CouldNotGenerateComparator)?; + if let Some(request) = self.best_effort.remove(&comparator) { self.priority.insert(comparator, request); } Ok(()) diff --git a/node/core/dispute-coordinator/src/scraping/mod.rs b/node/core/dispute-coordinator/src/scraping/mod.rs index 483b68de828c..602743cc4406 100644 --- a/node/core/dispute-coordinator/src/scraping/mod.rs +++ b/node/core/dispute-coordinator/src/scraping/mod.rs @@ -26,7 +26,7 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_util::runtime::{get_candidate_events, get_on_chain_votes}; use polkadot_primitives::v2::{ - BlockNumber, CandidateEvent, CandidateHash, Hash, ScrapedOnChainVotes, CandidateReceipt, + BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash, ScrapedOnChainVotes, }; use crate::{ @@ -132,7 +132,7 @@ impl ChainScraper { /// /// and updates current heads, so we can query candidates for all non finalized blocks. /// - /// Returns: On chain votes and included candidate receipts for the leaf and any + /// Returns: On chain votes and included candidate receipts for the leaf and any /// ancestors we might not yet have seen. pub async fn process_active_leaves_update( &mut self, @@ -163,7 +163,8 @@ impl ChainScraper { for (block_number, block_hash) in block_numbers.zip(block_hashes) { gum::trace!(?block_number, ?block_hash, "In ancestor processing."); - let events_for_block = self.process_candidate_events(sender, block_number, block_hash).await?; + let events_for_block = + self.process_candidate_events(sender, block_number, block_hash).await?; included_receipts.extend(events_for_block); if let Some(votes) = get_on_chain_votes(sender, block_hash).await? { diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 6a749caa9525..52294ac97f9d 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -331,7 +331,7 @@ impl TestState { // No queries, if subsystem knows about this session already. if self.known_session == Some(session) { - continue; + continue } self.known_session = Some(session); From 87cb5a24211993927e8e407c96993fd97dfe4909 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Sat, 3 Dec 2022 09:43:10 -0800 Subject: [PATCH 16/30] Error handling draft --- .../dispute-coordinator/src/initialized.rs | 9 ++-- .../src/participation/mod.rs | 15 +++++-- .../src/participation/queues/mod.rs | 15 ++++--- .../dispute-coordinator/src/scraping/mod.rs | 42 ++++++++++++++----- 4 files changed, 57 insertions(+), 24 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 8ebaeb1d71f2..3cec92d43cda 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -269,9 +269,12 @@ impl Initialized { update: ActiveLeavesUpdate, now: u64, ) -> Result<()> { - let (on_chain_votes, included_receipts) = + let scraped_updates = self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; - self.participation.prioritize_newly_included(ctx, &included_receipts).await; + let errors = self.participation.prioritize_newly_included(ctx, &scraped_updates.included_receipts).await; + for error in errors { + log_error(error)?; + } self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { @@ -309,7 +312,7 @@ impl Initialized { // The `runtime-api` subsystem has an internal queue which serializes the execution, // so there is no point in running these in parallel. - for votes in on_chain_votes { + for votes in scraped_updates.on_chain_votes { let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err( |error| { gum::warn!( diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index acf8898f9bce..bd9da511d4f3 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -217,13 +217,22 @@ impl Participation { &mut self, ctx: &mut Context, included_receipts: &Vec, - ) { + ) -> Vec> { + let mut errors: Vec> = Vec::new(); for receipt in included_receipts { let r = self.queue.prioritize_if_present(ctx.sender(), receipt).await; - if let Err(QueueError::PriorityFull) = r { - return // Avoid working through the rest of the vec + match r { + Ok(priority_full) => { + if priority_full == true { + return errors // Avoid working through the rest of the vec + } + }, + Err(error) => { + errors.push(Err(error)); // Don't want to stop reprioritizing included receipts if just one fails + } } } + errors } /// Dequeue until `MAX_PARALLEL_PARTICIPATIONS` is reached. diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index c66ffb8ea3d8..9971d86719d3 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -103,8 +103,6 @@ pub enum QueueError { BestEffortFull, #[error("Request could not be queued, because priority queue was already full.")] PriorityFull, - #[error("A comparator could not be generated for the given request.")] - CouldNotGenerateComparator, } impl ParticipationRequest { @@ -163,22 +161,23 @@ impl Queues { /// Reprioritizes any participation requests pertaining to the /// passed candidates from best effort to priority. + /// + /// Returns: Either a bool telling the caller whether the priority queue is now full + /// or an error resulting from the failed creation of a comparator. pub async fn prioritize_if_present( &mut self, sender: &mut impl overseer::DisputeCoordinatorSenderTrait, receipt: &CandidateReceipt, - ) -> std::result::Result<(), QueueError> { + ) -> Result { if self.priority.len() >= PRIORITY_QUEUE_SIZE { - return Err(QueueError::PriorityFull) + return Ok(true) } - let comparator = CandidateComparator::new(sender, receipt) - .await - .map_err(|_e| QueueError::CouldNotGenerateComparator)?; + .await?; if let Some(request) = self.best_effort.remove(&comparator) { self.priority.insert(comparator, request); } - Ok(()) + Ok(false) } fn queue_with_comparator( diff --git a/node/core/dispute-coordinator/src/scraping/mod.rs b/node/core/dispute-coordinator/src/scraping/mod.rs index 602743cc4406..81d8ad4e2323 100644 --- a/node/core/dispute-coordinator/src/scraping/mod.rs +++ b/node/core/dispute-coordinator/src/scraping/mod.rs @@ -51,6 +51,27 @@ const LRU_OBSERVED_BLOCKS_CAPACITY: NonZeroUsize = match NonZeroUsize::new(20) { None => panic!("Observed blocks cache size must be non-zero"), }; +/// ScrapedUpdates +/// +/// Updates to on_chain_votes and included receipts for new active leaf and its unprocessed +/// ancestors. +/// +/// on_chain_votes: New votes as seen on chain +/// included_receipts: Newly included parachain block candidate receipts as seen on chain +pub struct ScrapedUpdates { + pub on_chain_votes: Vec, + pub included_receipts: Vec, +} + +impl ScrapedUpdates { + pub fn new() -> Self { + Self { + on_chain_votes: Vec::new(), + included_receipts: Vec::new(), + } + } +} + /// Chain scraper /// /// Scrapes unfinalized chain in order to collect information from blocks. @@ -114,8 +135,8 @@ impl ChainScraper { }; let update = ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() }; - let (votes, _) = s.process_active_leaves_update(sender, &update).await?; - Ok((s, votes)) + let updates = s.process_active_leaves_update(sender, &update).await?; + Ok((s, updates.on_chain_votes)) } /// Check whether we have seen a candidate included on any chain. @@ -138,13 +159,13 @@ impl ChainScraper { &mut self, sender: &mut Sender, update: &ActiveLeavesUpdate, - ) -> Result<(Vec, Vec)> + ) -> Result where Sender: overseer::DisputeCoordinatorSenderTrait, { let activated = match update.activated.as_ref() { Some(activated) => activated, - None => return Ok((Vec::new(), Vec::new())), + None => return Ok(ScrapedUpdates::new()), }; // Fetch ancestry up to last finalized block. @@ -158,23 +179,22 @@ impl ChainScraper { let block_hashes = std::iter::once(activated.hash).chain(ancestors); - let mut included_receipts: Vec = Vec::new(); - let mut on_chain_votes = Vec::new(); + let mut scraped_updates = ScrapedUpdates::new(); for (block_number, block_hash) in block_numbers.zip(block_hashes) { gum::trace!(?block_number, ?block_hash, "In ancestor processing."); - let events_for_block = + let receipts_for_block = self.process_candidate_events(sender, block_number, block_hash).await?; - included_receipts.extend(events_for_block); + scraped_updates.included_receipts.extend(receipts_for_block); if let Some(votes) = get_on_chain_votes(sender, block_hash).await? { - on_chain_votes.push(votes); + scraped_updates.on_chain_votes.push(votes); } } self.last_observed_blocks.put(activated.hash, ()); - Ok((on_chain_votes, included_receipts)) + Ok(scraped_updates) } /// Prune finalized candidates. @@ -200,6 +220,8 @@ impl ChainScraper { /// Process candidate events of a block. /// /// Keep track of all included and backed candidates. + /// + /// Returns freshly included candidate receipts async fn process_candidate_events( &mut self, sender: &mut Sender, From 595fe63ab02a921cbc5864f2448cc2b7fb126076 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 5 Dec 2022 13:22:27 -0800 Subject: [PATCH 17/30] Changed errors to bubble up from reprioritization --- .../dispute-coordinator/src/initialized.rs | 9 +++++---- .../src/participation/mod.rs | 17 +++-------------- .../src/participation/queues/mod.rs | 19 +++++++++++++------ .../dispute-coordinator/src/scraping/mod.rs | 15 ++++++--------- 4 files changed, 27 insertions(+), 33 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 3cec92d43cda..3f85eba05982 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -271,10 +271,11 @@ impl Initialized { ) -> Result<()> { let scraped_updates = self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; - let errors = self.participation.prioritize_newly_included(ctx, &scraped_updates.included_receipts).await; - for error in errors { - log_error(error)?; - } + log_error( + self.participation + .prioritize_newly_included(ctx, &scraped_updates.included_receipts) + .await, + )?; self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index bd9da511d4f3..ec46ca635d52 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -217,22 +217,11 @@ impl Participation { &mut self, ctx: &mut Context, included_receipts: &Vec, - ) -> Vec> { - let mut errors: Vec> = Vec::new(); + ) -> Result<()> { for receipt in included_receipts { - let r = self.queue.prioritize_if_present(ctx.sender(), receipt).await; - match r { - Ok(priority_full) => { - if priority_full == true { - return errors // Avoid working through the rest of the vec - } - }, - Err(error) => { - errors.push(Err(error)); // Don't want to stop reprioritizing included receipts if just one fails - } - } + self.queue.prioritize_if_present(ctx.sender(), receipt).await?; } - errors + Ok(()) } /// Dequeue until `MAX_PARALLEL_PARTICIPATIONS` is reached. diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 9971d86719d3..00f40cae5806 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -161,23 +161,30 @@ impl Queues { /// Reprioritizes any participation requests pertaining to the /// passed candidates from best effort to priority. - /// + /// /// Returns: Either a bool telling the caller whether the priority queue is now full /// or an error resulting from the failed creation of a comparator. pub async fn prioritize_if_present( &mut self, sender: &mut impl overseer::DisputeCoordinatorSenderTrait, receipt: &CandidateReceipt, - ) -> Result { + ) -> Result<()> { + let comparator = CandidateComparator::new(sender, receipt).await?; + self.prioritize_with_comparator(comparator)?; + Ok(()) + } + + fn prioritize_with_comparator( + &mut self, + comparator: CandidateComparator, + ) -> std::result::Result<(), QueueError> { if self.priority.len() >= PRIORITY_QUEUE_SIZE { - return Ok(true) + return Err(QueueError::PriorityFull) } - let comparator = CandidateComparator::new(sender, receipt) - .await?; if let Some(request) = self.best_effort.remove(&comparator) { self.priority.insert(comparator, request); } - Ok(false) + Ok(()) } fn queue_with_comparator( diff --git a/node/core/dispute-coordinator/src/scraping/mod.rs b/node/core/dispute-coordinator/src/scraping/mod.rs index 81d8ad4e2323..3fe00f022744 100644 --- a/node/core/dispute-coordinator/src/scraping/mod.rs +++ b/node/core/dispute-coordinator/src/scraping/mod.rs @@ -52,10 +52,10 @@ const LRU_OBSERVED_BLOCKS_CAPACITY: NonZeroUsize = match NonZeroUsize::new(20) { }; /// ScrapedUpdates -/// -/// Updates to on_chain_votes and included receipts for new active leaf and its unprocessed +/// +/// Updates to on_chain_votes and included receipts for new active leaf and its unprocessed /// ancestors. -/// +/// /// on_chain_votes: New votes as seen on chain /// included_receipts: Newly included parachain block candidate receipts as seen on chain pub struct ScrapedUpdates { @@ -65,10 +65,7 @@ pub struct ScrapedUpdates { impl ScrapedUpdates { pub fn new() -> Self { - Self { - on_chain_votes: Vec::new(), - included_receipts: Vec::new(), - } + Self { on_chain_votes: Vec::new(), included_receipts: Vec::new() } } } @@ -220,8 +217,8 @@ impl ChainScraper { /// Process candidate events of a block. /// /// Keep track of all included and backed candidates. - /// - /// Returns freshly included candidate receipts + /// + /// Returns freshly included candidate receipts async fn process_candidate_events( &mut self, sender: &mut Sender, From 38c698618b4f5fc386e9b3d164ebec0f5025f298 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 5 Dec 2022 14:07:52 -0800 Subject: [PATCH 18/30] Starting to construct new test --- node/core/dispute-coordinator/src/tests.rs | 75 ++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 52294ac97f9d..7241906e2bb1 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -3318,3 +3318,78 @@ fn local_participation_in_dispute_for_backed_candidate() { }) }); } + +/// Shows that when a candidate_included event is scraped from the chain we +/// reprioritize any participation requests pertaining to that candidate. +/// This involves moving the request for this candidate from the best effort +/// queue to the priority queue. +#[test] +fn participation_requests_reprioritized_for_newly_included() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + // Making our two candidates. Since participation is secondarily ordered + // by candidate hash, we take advantage of this to produce two candidates + // for which the initial participation order is known. But our plan is to + // upset this ordering by marking last_candidate as included. This should + // place last candidate in the priority queue, resulting in a different + // order of participation. + let first_candidate_receipt = make_valid_candidate_receipt(); + let first_candidate_hash = first_candidate_receipt.hash(); + let last_candidate_receipt = make_invalid_candidate_receipt(); + let last_candidate_hash = last_candidate_receipt.hash(); + + // We participate in least hash first. Make sure our hashes have the correct ordering + assert!(first_candidate_hash < last_candidate_hash); + + // activate leaf - without candidate included event + test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 1, vec![]) + .await; + + // generate two votes per candidate + let first_valid_vote = test_state + .issue_explicit_statement_with_index( + ValidatorIndex(1), + first_candidate_hash, + session, + true, + ) + .await; + + let first_invalid_vote = test_state + .issue_explicit_statement_with_index( + ValidatorIndex(2), + first_candidate_hash, + session, + false, + ) + .await; + + let last_valid_vote = test_state + .issue_explicit_statement_with_index( + ValidatorIndex(1), + last_candidate_hash, + session, + true, + ) + .await; + + let last_invalid_vote = test_state + .issue_explicit_statement_with_index( + ValidatorIndex(2), + last_candidate_hash, + session, + false, + ) + .await; + + // Wrap up + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + + test_state + }) + }); +} From afd63f63c9bd29594aaa0fc729b58fbce34f202f Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Tue, 6 Dec 2022 11:24:00 -0800 Subject: [PATCH 19/30] Clarifying participation function rename --- node/core/dispute-coordinator/src/initialized.rs | 2 +- node/core/dispute-coordinator/src/participation/mod.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 3f85eba05982..dbf23fc32477 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -273,7 +273,7 @@ impl Initialized { self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; log_error( self.participation - .prioritize_newly_included(ctx, &scraped_updates.included_receipts) + .bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts) .await, )?; self.participation.process_active_leaves_update(ctx, &update).await?; diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index ec46ca635d52..787173283173 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -212,8 +212,9 @@ impl Participation { Ok(()) } - /// Reprioritizes participation requests for disputes that are freshly included - pub async fn prioritize_newly_included( + /// Moving any request concerning the given candidates from best-effort to + /// priority, ignoring any candidates that don't have any queued participation requests. + pub async fn bump_to_priority_for_candidates( &mut self, ctx: &mut Context, included_receipts: &Vec, From efd7088572468c4fa140077456c00b2801e1075f Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 9 Dec 2022 09:57:32 -0800 Subject: [PATCH 20/30] Reprio test draft --- .../src/participation/mod.rs | 3 + node/core/dispute-coordinator/src/tests.rs | 183 ++++++++++++++++-- 2 files changed, 167 insertions(+), 19 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 787173283173..bb701b81f4a7 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -51,7 +51,10 @@ pub use queues::{ParticipationPriority, ParticipationRequest, QueueError}; /// This should be a relatively low value, while we might have a speedup once we fetched the data, /// due to multi-core architectures, but the fetching itself can not be improved by parallel /// requests. This means that higher numbers make it harder for a single dispute to resolve fast. +#[cfg(not(test))] const MAX_PARALLEL_PARTICIPATIONS: usize = 3; +#[cfg(test)] +const MAX_PARALLEL_PARTICIPATIONS: usize = 1; /// Keep track of disputes we need to participate in. /// diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 7241906e2bb1..98eb58add5fa 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -3330,23 +3330,47 @@ fn participation_requests_reprioritized_for_newly_included() { let session = 1; test_state.handle_resume_sync(&mut virtual_overseer, session).await; - // Making our two candidates. Since participation is secondarily ordered - // by candidate hash, we take advantage of this to produce two candidates - // for which the initial participation order is known. But our plan is to - // upset this ordering by marking last_candidate as included. This should - // place last candidate in the priority queue, resulting in a different - // order of participation. - let first_candidate_receipt = make_valid_candidate_receipt(); + // Making our three candidates. Since participation is secondarily ordered by + // candidate hash, we take advantage of this to produce candidates for which + // the initial participation order is known. But our plan is to upset this + // ordering by marking last_candidate as included. This should place last + // candidate in the priority queue, resulting in a different order of + // participation. We expect to trigger participation in the first candidate + // immediately on vote import. Then we manually trigger two more participations, + // expecting the altered ordering third, then second. + let mut first_candidate_receipt = make_valid_candidate_receipt(); + // Altering this receipt so its hash will be changed + first_candidate_receipt.descriptor.pov_hash = Hash::from([ + 122, 200, 116, 29, 232, 183, 20, 109, 138, 86, 23, 253, 70, 41, 20, 85, 127, 230, + 60, 38, 90, 127, 28, 16, 231, 218, 227, 40, 88, 237, 187, 128, + ]); let first_candidate_hash = first_candidate_receipt.hash(); - let last_candidate_receipt = make_invalid_candidate_receipt(); - let last_candidate_hash = last_candidate_receipt.hash(); - - // We participate in least hash first. Make sure our hashes have the correct ordering - assert!(first_candidate_hash < last_candidate_hash); + let second_candidate_receipt = make_valid_candidate_receipt(); + let second_candidate_hash = second_candidate_receipt.hash(); + let mut third_candidate_receipt = make_valid_candidate_receipt(); + // Altering this receipt so its hash will be changed + third_candidate_receipt.descriptor.pov_hash = Hash::from([ + 122, 200, 117, 29, 232, 183, 20, 109, 138, 86, 23, 253, 70, 41, 20, 85, 127, 230, + 60, 38, 90, 127, 28, 16, 231, 218, 227, 40, 88, 237, 187, 128, + ]); + let third_candidate_hash = third_candidate_receipt.hash(); + + // We participate in lesser hash first. Make sure our hashes have the correct ordering + assert!(first_candidate_hash < second_candidate_hash); + assert!(second_candidate_hash < third_candidate_hash); // activate leaf - without candidate included event test_state - .activate_leaf_at_session(&mut virtual_overseer, session, 1, vec![]) + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + vec![ + make_candidate_backed_event(first_candidate_receipt.clone()), + make_candidate_backed_event(second_candidate_receipt.clone()), + make_candidate_backed_event(third_candidate_receipt.clone()), + ], + ) .await; // generate two votes per candidate @@ -3368,24 +3392,145 @@ fn participation_requests_reprioritized_for_newly_included() { ) .await; - let last_valid_vote = test_state + let second_valid_vote = test_state .issue_explicit_statement_with_index( - ValidatorIndex(1), - last_candidate_hash, + ValidatorIndex(3), + second_candidate_hash, session, true, ) .await; - let last_invalid_vote = test_state + let second_invalid_vote = test_state .issue_explicit_statement_with_index( - ValidatorIndex(2), - last_candidate_hash, + ValidatorIndex(4), + second_candidate_hash, + session, + false, + ) + .await; + + let third_valid_vote = test_state + .issue_explicit_statement_with_index( + ValidatorIndex(5), + third_candidate_hash, + session, + true, + ) + .await; + + let third_invalid_vote = test_state + .issue_explicit_statement_with_index( + ValidatorIndex(6), + third_candidate_hash, session, false, ) .await; + // Importing votes for the three candidates in order + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: first_candidate_receipt.clone(), + session, + statements: vec![ + (first_valid_vote, ValidatorIndex(1)), + (first_invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation: None, + }, + }) + .await; + gum::debug!("After First import!"); + + handle_approval_vote_request( + &mut virtual_overseer, + &first_candidate_hash, + HashMap::new(), + ) + .await; + + participation_with_distribution( + &mut virtual_overseer, + &first_candidate_hash, + first_candidate_receipt.commitments_hash, + ) + .await; + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: second_candidate_receipt.clone(), + session, + statements: vec![ + (second_valid_vote, ValidatorIndex(3)), + (second_invalid_vote, ValidatorIndex(4)), + ], + pending_confirmation: None, + }, + }) + .await; + gum::debug!("After Second import!"); + + handle_approval_vote_request( + &mut virtual_overseer, + &second_candidate_hash, + HashMap::new(), + ) + .await; + + participation_with_distribution( + &mut virtual_overseer, + &second_candidate_hash, + second_candidate_receipt.commitments_hash, + ) + .await; + + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: third_candidate_receipt.clone(), + session, + statements: vec![ + (third_valid_vote, ValidatorIndex(5)), + (third_invalid_vote, ValidatorIndex(6)), + ], + pending_confirmation: None, + }, + }) + .await; + + handle_approval_vote_request( + &mut virtual_overseer, + &third_candidate_hash, + HashMap::new(), + ) + .await; + + participation_with_distribution( + &mut virtual_overseer, + &third_candidate_hash, + third_candidate_receipt.commitments_hash, + ) + .await; + + // Issue candidate included event for third candidate. This should place + // the corresponding third participation request in the priority queue. + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + vec![make_candidate_included_event(third_candidate_receipt.clone())], + ) + .timeout(TEST_TIMEOUT) + .await; + + assert_matches!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await, None); + + // Trigger participation by importing another vote + // Wrap up virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; From e9eb4e850bbdf7087a327a69cadcdf4998a2c200 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Tue, 13 Dec 2022 13:29:23 -0800 Subject: [PATCH 21/30] Very rough bump to priority queue test draft --- node/core/dispute-coordinator/src/tests.rs | 356 +++++++++------------ 1 file changed, 148 insertions(+), 208 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 8141f4e6fedd..545e605cfdf5 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -28,16 +28,19 @@ use futures::{ channel::oneshot, future::{self, BoxFuture}, }; +use futures_timer::Delay; use polkadot_node_subsystem_util::database::Database; use polkadot_node_primitives::{ - DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, + DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, APPROVAL_EXECUTION_TIMEOUT, + AvailableData, BlockData, PoV, ValidationResult, }; use polkadot_node_subsystem::{ messages::{ ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage, - DisputeDistributionMessage, ImportStatementsResult, + DisputeDistributionMessage, ImportStatementsResult, AvailabilityRecoveryMessage, + CandidateValidationMessage, }, overseer::FromOrchestra, OverseerSignal, @@ -50,7 +53,9 @@ use sp_core::{sr25519::Pair, testing::TaskExecutor, Pair as PairT}; use sp_keyring::Sr25519Keyring; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; -use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash}; +use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash, + dummy_candidate_commitments +}; use polkadot_node_primitives::{Timestamp, ACTIVE_DURATION_SECS}; use polkadot_node_subsystem::{ jaeger, @@ -64,7 +69,8 @@ use polkadot_primitives::v2::{ ApprovalVote, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, CandidateReceipt, CoreIndex, DisputeStatement, GroupIndex, Hash, HeadData, Header, IndexedVec, MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo, SigningContext, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature, + ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature, PersistedValidationData, + ValidationCode, }; use crate::{ @@ -3057,23 +3063,15 @@ fn local_participation_in_dispute_for_backed_candidate() { .await; // generate two votes - let valid_vote = test_state - .issue_explicit_statement_with_index( - ValidatorIndex(1), - candidate_hash, - session, - true, - ) - .await; - - let invalid_vote = test_state - .issue_explicit_statement_with_index( - ValidatorIndex(2), - candidate_hash, - session, - false, - ) - .await; + let (valid_vote, invalid_vote) = generate_opposing_votes_pair( + &test_state, + ValidatorIndex(1), + ValidatorIndex(2), + candidate_hash, + session, + VoteType::Explicit, + ) + .await; virtual_overseer .send(FromOrchestra::Communication { @@ -3150,7 +3148,7 @@ fn local_participation_in_dispute_for_backed_candidate() { .await; let (_, _, votes) = rx.await.unwrap().get(0).unwrap().clone(); - assert_eq!(votes.valid.len(), 3); // 3 => 1 initial vote, 1 backing vote, and our vote + assert_eq!(votes.valid.raw().len(), 3); // 3 => 1 initial vote, 1 backing vote, and our vote assert_eq!(votes.invalid.len(), 1); // Wrap up @@ -3171,208 +3169,96 @@ fn participation_requests_reprioritized_for_newly_included() { Box::pin(async move { let session = 1; test_state.handle_resume_sync(&mut virtual_overseer, session).await; + let mut hash_info: HashMap = HashMap::new(); - // Making our three candidates. Since participation is secondarily ordered by - // candidate hash, we take advantage of this to produce candidates for which - // the initial participation order is known. But our plan is to upset this - // ordering by marking last_candidate as included. This should place last - // candidate in the priority queue, resulting in a different order of - // participation. We expect to trigger participation in the first candidate - // immediately on vote import. Then we manually trigger two more participations, - // expecting the altered ordering third, then second. - let mut first_candidate_receipt = make_valid_candidate_receipt(); - // Altering this receipt so its hash will be changed - first_candidate_receipt.descriptor.pov_hash = Hash::from([ - 122, 200, 116, 29, 232, 183, 20, 109, 138, 86, 23, 253, 70, 41, 20, 85, 127, 230, - 60, 38, 90, 127, 28, 16, 231, 218, 227, 40, 88, 237, 187, 128, - ]); - let first_candidate_hash = first_candidate_receipt.hash(); - let second_candidate_receipt = make_valid_candidate_receipt(); - let second_candidate_hash = second_candidate_receipt.hash(); - let mut third_candidate_receipt = make_valid_candidate_receipt(); - // Altering this receipt so its hash will be changed - third_candidate_receipt.descriptor.pov_hash = Hash::from([ - 122, 200, 117, 29, 232, 183, 20, 109, 138, 86, 23, 253, 70, 41, 20, 85, 127, 230, - 60, 38, 90, 127, 28, 16, 231, 218, 227, 40, 88, 237, 187, 128, - ]); - let third_candidate_hash = third_candidate_receipt.hash(); - - // We participate in lesser hash first. Make sure our hashes have the correct ordering - assert!(first_candidate_hash < second_candidate_hash); - assert!(second_candidate_hash < third_candidate_hash); - - // activate leaf - without candidate included event - test_state - .activate_leaf_at_session( - &mut virtual_overseer, - session, - 1, - vec![ - make_candidate_backed_event(first_candidate_receipt.clone()), - make_candidate_backed_event(second_candidate_receipt.clone()), - make_candidate_backed_event(third_candidate_receipt.clone()), - ], - ) - .await; + for repetition in 1..=20u8 { + // Building candidate receipts + let mut candidate_receipt = make_valid_candidate_receipt(); + candidate_receipt.descriptor.pov_hash = Hash::from( + [repetition; 32], // Altering this receipt so its hash will be changed + ); + let candidate_hash = candidate_receipt.hash(); + hash_info.insert(candidate_hash, (repetition, candidate_receipt.clone())); - // generate two votes per candidate - let first_valid_vote = test_state - .issue_explicit_statement_with_index( - ValidatorIndex(1), - first_candidate_hash, - session, - true, - ) - .await; + // Mark all candidates as backed, so their participation requests make it to best effort + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + vec![make_candidate_backed_event(candidate_receipt.clone())], + ) + .await; + } - let first_invalid_vote = test_state - .issue_explicit_statement_with_index( + for repetition in 1..=20u8 { + // Building candidate receipts + let mut candidate_receipt = make_valid_candidate_receipt(); + candidate_receipt.descriptor.pov_hash = Hash::from( + [repetition; 32], // Altering this receipt so its hash will be changed + ); + let candidate_hash = candidate_receipt.hash(); + + // Create votes for candidates + let (valid_vote, invalid_vote) = generate_opposing_votes_pair( + &test_state, + ValidatorIndex(1), ValidatorIndex(2), - first_candidate_hash, - session, - false, - ) - .await; - - let second_valid_vote = test_state - .issue_explicit_statement_with_index( - ValidatorIndex(3), - second_candidate_hash, - session, - true, - ) - .await; - - let second_invalid_vote = test_state - .issue_explicit_statement_with_index( - ValidatorIndex(4), - second_candidate_hash, - session, - false, - ) - .await; - - let third_valid_vote = test_state - .issue_explicit_statement_with_index( - ValidatorIndex(5), - third_candidate_hash, - session, - true, - ) - .await; - - let third_invalid_vote = test_state - .issue_explicit_statement_with_index( - ValidatorIndex(6), - third_candidate_hash, + candidate_hash, session, - false, + VoteType::Explicit, ) .await; - // Importing votes for the three candidates in order - virtual_overseer - .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::ImportStatements { - candidate_receipt: first_candidate_receipt.clone(), - session, - statements: vec![ - (first_valid_vote, ValidatorIndex(1)), - (first_invalid_vote, ValidatorIndex(2)), - ], - pending_confirmation: None, - }, - }) - .await; - gum::debug!("After First import!"); - - handle_approval_vote_request( - &mut virtual_overseer, - &first_candidate_hash, - HashMap::new(), - ) - .await; - - participation_with_distribution( - &mut virtual_overseer, - &first_candidate_hash, - first_candidate_receipt.commitments_hash, - ) - .await; - - virtual_overseer - .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::ImportStatements { - candidate_receipt: second_candidate_receipt.clone(), - session, - statements: vec![ - (second_valid_vote, ValidatorIndex(3)), - (second_invalid_vote, ValidatorIndex(4)), - ], - pending_confirmation: None, - }, - }) - .await; - gum::debug!("After Second import!"); - - handle_approval_vote_request( - &mut virtual_overseer, - &second_candidate_hash, - HashMap::new(), - ) - .await; + // Import votes for candidates + virtual_overseer + .send(FromOrchestra::Communication { + msg: DisputeCoordinatorMessage::ImportStatements { + candidate_receipt: candidate_receipt.clone(), + session, + statements: vec![ + (valid_vote, ValidatorIndex(1)), + (invalid_vote, ValidatorIndex(2)), + ], + pending_confirmation: None, + }, + }) + .await; - participation_with_distribution( - &mut virtual_overseer, - &second_candidate_hash, - second_candidate_receipt.commitments_hash, - ) - .await; + // Meant to help process approval vote messages, though there's a chance early dispute + // distribution messages will come back first. We wait a very short time as to make sure + // we can drain the queue of incoming messages without accidentally draining the best + // effort queue, which we want filled. + println!("Message handle inside loop"); + while let Some(message) = virtual_overseer.recv().timeout(Duration::from_millis(50)).await { + handle_next_overseer_message(&mut virtual_overseer, message, &hash_info).await; + } - virtual_overseer - .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::ImportStatements { - candidate_receipt: third_candidate_receipt.clone(), - session, - statements: vec![ - (third_valid_vote, ValidatorIndex(5)), - (third_invalid_vote, ValidatorIndex(6)), - ], - pending_confirmation: None, - }, - }) - .await; + // Mark 15th candidate as included after import + /*if repetition == 15 { + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 1, + vec![make_candidate_included_event(candidate_receipt.clone())], + ) + .await; + }*/ + } - handle_approval_vote_request( - &mut virtual_overseer, - &third_candidate_hash, - HashMap::new(), - ) - .await; + println!("Left statement import loop"); + while let Some(message) = virtual_overseer.recv().timeout(TEST_TIMEOUT).await { + handle_next_overseer_message(&mut virtual_overseer, message, &hash_info).await; + } - participation_with_distribution( - &mut virtual_overseer, - &third_candidate_hash, - third_candidate_receipt.commitments_hash, - ) - .await; + // Somehow use CandidateValidation messages to check participation order - // Issue candidate included event for third candidate. This should place - // the corresponding third participation request in the priority queue. - test_state - .activate_leaf_at_session( - &mut virtual_overseer, - session, - 1, - vec![make_candidate_included_event(third_candidate_receipt.clone())], - ) - .timeout(TEST_TIMEOUT) - .await; + // Wait a long time for participation to finish + //Delay::new(Duration::from_millis(100000)).await; assert_matches!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await, None); - // Trigger participation by importing another vote - // Wrap up virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; @@ -3380,3 +3266,57 @@ fn participation_requests_reprioritized_for_newly_included() { }) }); } + +async fn handle_next_overseer_message( + virtual_overseer: &mut VirtualOverseer, + message: AllMessages, + hash_info: &HashMap, +) { + match message { + AllMessages::ApprovalVoting(ApprovalVotingMessage::GetApprovalSignaturesForCandidate( + hash, + tx, + )) => { + let (candidate_number, candidate_receipt) = hash_info + .get(&hash) + .expect("Message should always correspond to a candidate hash in the ordering."); + println!("Got approval voting messages for candidate: {}", candidate_number); + tx.send(HashMap::new()).unwrap(); + }, + AllMessages::AvailabilityRecovery( + AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx) + ) => { + println!("Availability recovery"); + let pov_block = PoV { block_data: BlockData(Vec::new()) }; + let available_data = AvailableData { + pov: Arc::new(pov_block), + validation_data: PersistedValidationData::default(), + }; + + tx.send(Ok(available_data)).unwrap(); + }, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + hash, + RuntimeApiRequest::ValidationCodeByHash( + _, + tx, + ) + )) => { + println!("Request validation code"); + let validation_code = ValidationCode(Vec::new()); + + tx.send(Ok(Some(validation_code))).unwrap(); + }, + AllMessages::CandidateValidation( + CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, timeout, tx) + ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { + println!("Candidate validation"); + tx.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap(); + }, + AllMessages::DisputeDistribution(DisputeDistributionMessage::SendDispute(msg)) => { + println!("Participation complete for: {}", hash_info.get(&msg.candidate_receipt().hash()) + .expect("Participation should always correspond to a candidate hash in the ordering.").0); + }, + _ => (), + } +} \ No newline at end of file From d49bd0bd406435b3f20d94542ccfa5298c80f4b1 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 15 Dec 2022 08:30:43 -0800 Subject: [PATCH 22/30] Improving logging --- node/core/dispute-coordinator/src/tests.rs | 28 +++++++++++++--------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 545e605cfdf5..8f5ced8b4f13 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -3229,9 +3229,10 @@ fn participation_requests_reprioritized_for_newly_included() { // distribution messages will come back first. We wait a very short time as to make sure // we can drain the queue of incoming messages without accidentally draining the best // effort queue, which we want filled. - println!("Message handle inside loop"); - while let Some(message) = virtual_overseer.recv().timeout(Duration::from_millis(50)).await { - handle_next_overseer_message(&mut virtual_overseer, message, &hash_info).await; + println!("Handling overseer messages after importing candidate: {}", repetition); + while let Some(message) = virtual_overseer.recv().timeout(Duration::from_millis(100)).await { + // Possibly pick out messages of type get_approval_signatures and handle them first + handle_next_overseer_message(message, &hash_info).await; } // Mark 15th candidate as included after import @@ -3249,7 +3250,7 @@ fn participation_requests_reprioritized_for_newly_included() { println!("Left statement import loop"); while let Some(message) = virtual_overseer.recv().timeout(TEST_TIMEOUT).await { - handle_next_overseer_message(&mut virtual_overseer, message, &hash_info).await; + handle_next_overseer_message(message, &hash_info).await; } // Somehow use CandidateValidation messages to check participation order @@ -3268,7 +3269,6 @@ fn participation_requests_reprioritized_for_newly_included() { } async fn handle_next_overseer_message( - virtual_overseer: &mut VirtualOverseer, message: AllMessages, hash_info: &HashMap, ) { @@ -3277,16 +3277,19 @@ async fn handle_next_overseer_message( hash, tx, )) => { - let (candidate_number, candidate_receipt) = hash_info + let (candidate_number, _) = hash_info .get(&hash) .expect("Message should always correspond to a candidate hash in the ordering."); - println!("Got approval voting messages for candidate: {}", candidate_number); + println!("Got approval votes for candidate: {}", candidate_number); tx.send(HashMap::new()).unwrap(); }, AllMessages::AvailabilityRecovery( - AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx) + AvailabilityRecoveryMessage::RecoverAvailableData(candidate_receipt, _, _, tx) ) => { - println!("Availability recovery"); + let (candidate_number, _) = hash_info + .get(&candidate_receipt.hash()) + .expect("Message should always correspond to a candidate hash in the ordering."); + println!("Availability recovery for candidate: {}", candidate_number); let pov_block = PoV { block_data: BlockData(Vec::new()) }; let available_data = AvailableData { pov: Arc::new(pov_block), @@ -3296,7 +3299,7 @@ async fn handle_next_overseer_message( tx.send(Ok(available_data)).unwrap(); }, AllMessages::RuntimeApi(RuntimeApiMessage::Request( - hash, + _, RuntimeApiRequest::ValidationCodeByHash( _, tx, @@ -3310,7 +3313,10 @@ async fn handle_next_overseer_message( AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, timeout, tx) ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { - println!("Candidate validation"); + let (candidate_number, _) = hash_info + .get(&&candidate_receipt.hash()) + .expect("Message should always correspond to a candidate hash in the ordering."); + println!("Validation triggered for candidate: {}", candidate_number); tx.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap(); }, AllMessages::DisputeDistribution(DisputeDistributionMessage::SendDispute(msg)) => { From 7546335cfc2dc8cc86b245a667196b1710f16c1f Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 15 Dec 2022 08:59:42 -0800 Subject: [PATCH 23/30] Most concise reproduction of error on third import --- node/core/dispute-coordinator/src/tests.rs | 128 +++------------------ 1 file changed, 13 insertions(+), 115 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 8f5ced8b4f13..690d45acbd55 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -28,19 +28,16 @@ use futures::{ channel::oneshot, future::{self, BoxFuture}, }; -use futures_timer::Delay; use polkadot_node_subsystem_util::database::Database; use polkadot_node_primitives::{ - DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, APPROVAL_EXECUTION_TIMEOUT, - AvailableData, BlockData, PoV, ValidationResult, + DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, }; use polkadot_node_subsystem::{ messages::{ ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage, - DisputeDistributionMessage, ImportStatementsResult, AvailabilityRecoveryMessage, - CandidateValidationMessage, + DisputeDistributionMessage, ImportStatementsResult, }, overseer::FromOrchestra, OverseerSignal, @@ -53,9 +50,7 @@ use sp_core::{sr25519::Pair, testing::TaskExecutor, Pair as PairT}; use sp_keyring::Sr25519Keyring; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; -use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash, - dummy_candidate_commitments -}; +use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash, }; use polkadot_node_primitives::{Timestamp, ACTIVE_DURATION_SECS}; use polkadot_node_subsystem::{ jaeger, @@ -69,8 +64,7 @@ use polkadot_primitives::v2::{ ApprovalVote, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, CandidateReceipt, CoreIndex, DisputeStatement, GroupIndex, Hash, HeadData, Header, IndexedVec, MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo, SigningContext, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature, PersistedValidationData, - ValidationCode, + ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature, }; use crate::{ @@ -3169,35 +3163,32 @@ fn participation_requests_reprioritized_for_newly_included() { Box::pin(async move { let session = 1; test_state.handle_resume_sync(&mut virtual_overseer, session).await; - let mut hash_info: HashMap = HashMap::new(); + let mut receipts: Vec = Vec::new(); - for repetition in 1..=20u8 { + for repetition in 1..=3u8 { // Building candidate receipts let mut candidate_receipt = make_valid_candidate_receipt(); candidate_receipt.descriptor.pov_hash = Hash::from( [repetition; 32], // Altering this receipt so its hash will be changed ); - let candidate_hash = candidate_receipt.hash(); - hash_info.insert(candidate_hash, (repetition, candidate_receipt.clone())); + receipts.push(candidate_receipt.clone()); - // Mark all candidates as backed, so their participation requests make it to best effort + // Mark all candidates as backed, so their participation requests make it to best effort. + // These calls must all occur before including the candidates due to test overseer + // oddities. test_state .activate_leaf_at_session( &mut virtual_overseer, session, 1, - vec![make_candidate_backed_event(candidate_receipt.clone())], + vec![make_candidate_backed_event(candidate_receipt)], ) .await; } - for repetition in 1..=20u8 { - // Building candidate receipts - let mut candidate_receipt = make_valid_candidate_receipt(); - candidate_receipt.descriptor.pov_hash = Hash::from( - [repetition; 32], // Altering this receipt so its hash will be changed - ); + for candidate_receipt in receipts { let candidate_hash = candidate_receipt.hash(); + println!("Import of candidate with hash: {}", candidate_hash); // Create votes for candidates let (valid_vote, invalid_vote) = generate_opposing_votes_pair( @@ -3224,105 +3215,12 @@ fn participation_requests_reprioritized_for_newly_included() { }, }) .await; - - // Meant to help process approval vote messages, though there's a chance early dispute - // distribution messages will come back first. We wait a very short time as to make sure - // we can drain the queue of incoming messages without accidentally draining the best - // effort queue, which we want filled. - println!("Handling overseer messages after importing candidate: {}", repetition); - while let Some(message) = virtual_overseer.recv().timeout(Duration::from_millis(100)).await { - // Possibly pick out messages of type get_approval_signatures and handle them first - handle_next_overseer_message(message, &hash_info).await; - } - - // Mark 15th candidate as included after import - /*if repetition == 15 { - test_state - .activate_leaf_at_session( - &mut virtual_overseer, - session, - 1, - vec![make_candidate_included_event(candidate_receipt.clone())], - ) - .await; - }*/ - } - - println!("Left statement import loop"); - while let Some(message) = virtual_overseer.recv().timeout(TEST_TIMEOUT).await { - handle_next_overseer_message(message, &hash_info).await; } - // Somehow use CandidateValidation messages to check participation order - - // Wait a long time for participation to finish - //Delay::new(Duration::from_millis(100000)).await; - - assert_matches!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await, None); - // Wrap up virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; test_state }) }); -} - -async fn handle_next_overseer_message( - message: AllMessages, - hash_info: &HashMap, -) { - match message { - AllMessages::ApprovalVoting(ApprovalVotingMessage::GetApprovalSignaturesForCandidate( - hash, - tx, - )) => { - let (candidate_number, _) = hash_info - .get(&hash) - .expect("Message should always correspond to a candidate hash in the ordering."); - println!("Got approval votes for candidate: {}", candidate_number); - tx.send(HashMap::new()).unwrap(); - }, - AllMessages::AvailabilityRecovery( - AvailabilityRecoveryMessage::RecoverAvailableData(candidate_receipt, _, _, tx) - ) => { - let (candidate_number, _) = hash_info - .get(&candidate_receipt.hash()) - .expect("Message should always correspond to a candidate hash in the ordering."); - println!("Availability recovery for candidate: {}", candidate_number); - let pov_block = PoV { block_data: BlockData(Vec::new()) }; - let available_data = AvailableData { - pov: Arc::new(pov_block), - validation_data: PersistedValidationData::default(), - }; - - tx.send(Ok(available_data)).unwrap(); - }, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _, - RuntimeApiRequest::ValidationCodeByHash( - _, - tx, - ) - )) => { - println!("Request validation code"); - let validation_code = ValidationCode(Vec::new()); - - tx.send(Ok(Some(validation_code))).unwrap(); - }, - AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, timeout, tx) - ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { - let (candidate_number, _) = hash_info - .get(&&candidate_receipt.hash()) - .expect("Message should always correspond to a candidate hash in the ordering."); - println!("Validation triggered for candidate: {}", candidate_number); - tx.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap(); - }, - AllMessages::DisputeDistribution(DisputeDistributionMessage::SendDispute(msg)) => { - println!("Participation complete for: {}", hash_info.get(&msg.candidate_receipt().hash()) - .expect("Participation should always correspond to a candidate hash in the ordering.").0); - }, - _ => (), - } } \ No newline at end of file From 16e35c8a045c9674c97288c07b169b60c62d7449 Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Fri, 16 Dec 2022 17:37:34 +0200 Subject: [PATCH 24/30] Add `handle_approval_vote_request` --- node/core/dispute-coordinator/src/tests.rs | 55 ++++++++++++++++++++-- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 690d45acbd55..10ea68c07b04 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -50,7 +50,7 @@ use sp_core::{sr25519::Pair, testing::TaskExecutor, Pair as PairT}; use sp_keyring::Sr25519Keyring; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; -use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash, }; +use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash}; use polkadot_node_primitives::{Timestamp, ACTIVE_DURATION_SECS}; use polkadot_node_subsystem::{ jaeger, @@ -3174,22 +3174,25 @@ fn participation_requests_reprioritized_for_newly_included() { receipts.push(candidate_receipt.clone()); // Mark all candidates as backed, so their participation requests make it to best effort. - // These calls must all occur before including the candidates due to test overseer + // These calls must all occur before including the candidates due to test overseer // oddities. test_state .activate_leaf_at_session( &mut virtual_overseer, session, + // TODO: This is in a loop, so we'll generate active leaves update multiple times for + // the same block num. I don't think this is okay, better batch all events together + // and do a single active leaves update 1, vec![make_candidate_backed_event(candidate_receipt)], ) .await; } - for candidate_receipt in receipts { + for candidate_receipt in receipts.iter() { let candidate_hash = candidate_receipt.hash(); println!("Import of candidate with hash: {}", candidate_hash); - + // Create votes for candidates let (valid_vote, invalid_vote) = generate_opposing_votes_pair( &test_state, @@ -3215,12 +3218,54 @@ fn participation_requests_reprioritized_for_newly_included() { }, }) .await; + + // Send approval votes to unblock import + handle_approval_vote_request( + &mut virtual_overseer, + &candidate_hash, + HashMap::new(), + ) + .await; } + // Generate included event for one of the candidates here + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 2, + vec![make_candidate_included_event( + receipts.last().expect("There is more than one candidate").clone(), + )], + ) + .await; + + // Unblock participation and verify ordering + participation_with_distribution( + &mut virtual_overseer, + &receipts.first().expect("There is more than one candidate").hash(), + receipts.first().expect("There is more than one candidate").commitments_hash, + ) + .await; + + participation_with_distribution( + &mut virtual_overseer, + &receipts.last().expect("There is more than one candidate").hash(), + receipts.last().expect("There is more than one candidate").commitments_hash, + ) + .await; + + // if we see this -> the ordering is correct. We might need to do participating for the 2nd + // candidate in order to exit cleanly. + println!("Horray"); + + // Remove this when the test is ready + std::thread::sleep(std::time::Duration::from_secs(180)); + // Wrap up virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; test_state }) }); -} \ No newline at end of file +} From 2954a96b97f7d4e148af773cee2daa7bc19fed47 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 16 Dec 2022 14:18:08 -0800 Subject: [PATCH 25/30] Removing reprioritization on included event test --- node/core/dispute-coordinator/src/tests.rs | 182 +-------------------- 1 file changed, 4 insertions(+), 178 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 8f5ced8b4f13..a3e02d3db553 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -28,19 +28,16 @@ use futures::{ channel::oneshot, future::{self, BoxFuture}, }; -use futures_timer::Delay; use polkadot_node_subsystem_util::database::Database; use polkadot_node_primitives::{ - DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, APPROVAL_EXECUTION_TIMEOUT, - AvailableData, BlockData, PoV, ValidationResult, + DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, }; use polkadot_node_subsystem::{ messages::{ ApprovalVotingMessage, ChainApiMessage, DisputeCoordinatorMessage, - DisputeDistributionMessage, ImportStatementsResult, AvailabilityRecoveryMessage, - CandidateValidationMessage, + DisputeDistributionMessage, ImportStatementsResult, }, overseer::FromOrchestra, OverseerSignal, @@ -53,9 +50,7 @@ use sp_core::{sr25519::Pair, testing::TaskExecutor, Pair as PairT}; use sp_keyring::Sr25519Keyring; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; -use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash, - dummy_candidate_commitments -}; +use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash, }; use polkadot_node_primitives::{Timestamp, ACTIVE_DURATION_SECS}; use polkadot_node_subsystem::{ jaeger, @@ -69,8 +64,7 @@ use polkadot_primitives::v2::{ ApprovalVote, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash, CandidateReceipt, CoreIndex, DisputeStatement, GroupIndex, Hash, HeadData, Header, IndexedVec, MultiDisputeStatementSet, ScrapedOnChainVotes, SessionIndex, SessionInfo, SigningContext, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature, PersistedValidationData, - ValidationCode, + ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorSignature, }; use crate::{ @@ -3157,172 +3151,4 @@ fn local_participation_in_dispute_for_backed_candidate() { test_state }) }); -} - -/// Shows that when a candidate_included event is scraped from the chain we -/// reprioritize any participation requests pertaining to that candidate. -/// This involves moving the request for this candidate from the best effort -/// queue to the priority queue. -#[test] -fn participation_requests_reprioritized_for_newly_included() { - test_harness(|mut test_state, mut virtual_overseer| { - Box::pin(async move { - let session = 1; - test_state.handle_resume_sync(&mut virtual_overseer, session).await; - let mut hash_info: HashMap = HashMap::new(); - - for repetition in 1..=20u8 { - // Building candidate receipts - let mut candidate_receipt = make_valid_candidate_receipt(); - candidate_receipt.descriptor.pov_hash = Hash::from( - [repetition; 32], // Altering this receipt so its hash will be changed - ); - let candidate_hash = candidate_receipt.hash(); - hash_info.insert(candidate_hash, (repetition, candidate_receipt.clone())); - - // Mark all candidates as backed, so their participation requests make it to best effort - test_state - .activate_leaf_at_session( - &mut virtual_overseer, - session, - 1, - vec![make_candidate_backed_event(candidate_receipt.clone())], - ) - .await; - } - - for repetition in 1..=20u8 { - // Building candidate receipts - let mut candidate_receipt = make_valid_candidate_receipt(); - candidate_receipt.descriptor.pov_hash = Hash::from( - [repetition; 32], // Altering this receipt so its hash will be changed - ); - let candidate_hash = candidate_receipt.hash(); - - // Create votes for candidates - let (valid_vote, invalid_vote) = generate_opposing_votes_pair( - &test_state, - ValidatorIndex(1), - ValidatorIndex(2), - candidate_hash, - session, - VoteType::Explicit, - ) - .await; - - // Import votes for candidates - virtual_overseer - .send(FromOrchestra::Communication { - msg: DisputeCoordinatorMessage::ImportStatements { - candidate_receipt: candidate_receipt.clone(), - session, - statements: vec![ - (valid_vote, ValidatorIndex(1)), - (invalid_vote, ValidatorIndex(2)), - ], - pending_confirmation: None, - }, - }) - .await; - - // Meant to help process approval vote messages, though there's a chance early dispute - // distribution messages will come back first. We wait a very short time as to make sure - // we can drain the queue of incoming messages without accidentally draining the best - // effort queue, which we want filled. - println!("Handling overseer messages after importing candidate: {}", repetition); - while let Some(message) = virtual_overseer.recv().timeout(Duration::from_millis(100)).await { - // Possibly pick out messages of type get_approval_signatures and handle them first - handle_next_overseer_message(message, &hash_info).await; - } - - // Mark 15th candidate as included after import - /*if repetition == 15 { - test_state - .activate_leaf_at_session( - &mut virtual_overseer, - session, - 1, - vec![make_candidate_included_event(candidate_receipt.clone())], - ) - .await; - }*/ - } - - println!("Left statement import loop"); - while let Some(message) = virtual_overseer.recv().timeout(TEST_TIMEOUT).await { - handle_next_overseer_message(message, &hash_info).await; - } - - // Somehow use CandidateValidation messages to check participation order - - // Wait a long time for participation to finish - //Delay::new(Duration::from_millis(100000)).await; - - assert_matches!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await, None); - - // Wrap up - virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; - - test_state - }) - }); -} - -async fn handle_next_overseer_message( - message: AllMessages, - hash_info: &HashMap, -) { - match message { - AllMessages::ApprovalVoting(ApprovalVotingMessage::GetApprovalSignaturesForCandidate( - hash, - tx, - )) => { - let (candidate_number, _) = hash_info - .get(&hash) - .expect("Message should always correspond to a candidate hash in the ordering."); - println!("Got approval votes for candidate: {}", candidate_number); - tx.send(HashMap::new()).unwrap(); - }, - AllMessages::AvailabilityRecovery( - AvailabilityRecoveryMessage::RecoverAvailableData(candidate_receipt, _, _, tx) - ) => { - let (candidate_number, _) = hash_info - .get(&candidate_receipt.hash()) - .expect("Message should always correspond to a candidate hash in the ordering."); - println!("Availability recovery for candidate: {}", candidate_number); - let pov_block = PoV { block_data: BlockData(Vec::new()) }; - let available_data = AvailableData { - pov: Arc::new(pov_block), - validation_data: PersistedValidationData::default(), - }; - - tx.send(Ok(available_data)).unwrap(); - }, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _, - RuntimeApiRequest::ValidationCodeByHash( - _, - tx, - ) - )) => { - println!("Request validation code"); - let validation_code = ValidationCode(Vec::new()); - - tx.send(Ok(Some(validation_code))).unwrap(); - }, - AllMessages::CandidateValidation( - CandidateValidationMessage::ValidateFromExhaustive(_, _, candidate_receipt, _, timeout, tx) - ) if timeout == APPROVAL_EXECUTION_TIMEOUT => { - let (candidate_number, _) = hash_info - .get(&&candidate_receipt.hash()) - .expect("Message should always correspond to a candidate hash in the ordering."); - println!("Validation triggered for candidate: {}", candidate_number); - tx.send(Ok(ValidationResult::Valid(dummy_candidate_commitments(None), PersistedValidationData::default()))).unwrap(); - }, - AllMessages::DisputeDistribution(DisputeDistributionMessage::SendDispute(msg)) => { - println!("Participation complete for: {}", hash_info.get(&msg.candidate_receipt().hash()) - .expect("Participation should always correspond to a candidate hash in the ordering.").0); - }, - _ => (), - } } \ No newline at end of file From b3b3d8a6fa4e7d8ec7147f9697c95d987cbccd25 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 16 Dec 2022 14:24:07 -0800 Subject: [PATCH 26/30] Removing unneeded test config --- node/core/dispute-coordinator/src/participation/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index bb701b81f4a7..787173283173 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -51,10 +51,7 @@ pub use queues::{ParticipationPriority, ParticipationRequest, QueueError}; /// This should be a relatively low value, while we might have a speedup once we fetched the data, /// due to multi-core architectures, but the fetching itself can not be improved by parallel /// requests. This means that higher numbers make it harder for a single dispute to resolve fast. -#[cfg(not(test))] const MAX_PARALLEL_PARTICIPATIONS: usize = 3; -#[cfg(test)] -const MAX_PARALLEL_PARTICIPATIONS: usize = 1; /// Keep track of disputes we need to participate in. /// From 72ae47a5526c7b3460cb3f094690621c9f2548fd Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 16 Dec 2022 14:27:32 -0800 Subject: [PATCH 27/30] cargo fmt --- node/core/dispute-coordinator/src/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index a3e02d3db553..4215773071ed 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -50,7 +50,7 @@ use sp_core::{sr25519::Pair, testing::TaskExecutor, Pair as PairT}; use sp_keyring::Sr25519Keyring; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; -use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash, }; +use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash}; use polkadot_node_primitives::{Timestamp, ACTIVE_DURATION_SECS}; use polkadot_node_subsystem::{ jaeger, @@ -3151,4 +3151,4 @@ fn local_participation_in_dispute_for_backed_candidate() { test_state }) }); -} \ No newline at end of file +} From e38a933dc09ab2a4e0a87857ea024f5cb68036dd Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Wed, 21 Dec 2022 15:34:50 +0200 Subject: [PATCH 28/30] Test works --- .../src/participation/mod.rs | 2 +- node/core/dispute-coordinator/src/tests.rs | 133 ++++++++++++++---- 2 files changed, 108 insertions(+), 27 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index bb701b81f4a7..7167bc7e26e8 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -54,7 +54,7 @@ pub use queues::{ParticipationPriority, ParticipationRequest, QueueError}; #[cfg(not(test))] const MAX_PARALLEL_PARTICIPATIONS: usize = 3; #[cfg(test)] -const MAX_PARALLEL_PARTICIPATIONS: usize = 1; +pub(crate) const MAX_PARALLEL_PARTICIPATIONS: usize = 1; /// Keep track of disputes we need to participate in. /// diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 10ea68c07b04..ee18e35e706a 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -169,6 +169,7 @@ struct TestState { config: Config, clock: MockClock, headers: HashMap, + block_num_to_header: HashMap, last_block: Hash, // last session the subsystem knows about. known_session: Option, @@ -225,6 +226,8 @@ impl Default for TestState { let mut headers = HashMap::new(); let _ = headers.insert(last_block, genesis_header.clone()); + let mut block_num_to_header = HashMap::new(); + let _ = block_num_to_header.insert(genesis_header.number, last_block); TestState { validators: validators.into_iter().map(|(pair, _)| pair).collect(), @@ -236,6 +239,7 @@ impl Default for TestState { config, clock: MockClock::default(), headers, + block_num_to_header, last_block, known_session: None, } @@ -262,6 +266,7 @@ impl TestState { let block_hash = block_header.hash(); let _ = self.headers.insert(block_hash, block_header.clone()); + let _ = self.block_num_to_header.insert(block_header.number, block_hash); self.last_block = block_hash; gum::debug!(?block_number, "Activating block in activate_leaf_at_session."); @@ -390,6 +395,27 @@ impl TestState { ); finished_steps.got_scraping_information = true; tx.send(Ok(0)).unwrap(); + + // If the activated block number is > 1 the scraper will ask for block ancestors. Handle this case. + if block_number > 1 { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::Ancestors{ + hash, + k, + response_channel, + }) => { + assert_eq!(hash, block_hash); // A bit restrictive, remove if it causes problems. + let target_header = self.headers.get(&hash).expect("The function is called for this block so it should exist"); + let mut response = Vec::new(); + for i in target_header.number.saturating_sub(k as u32)..target_header.number { + response.push(self.block_num_to_header.get(&i).expect("headers and block_num_to_header should always be in sync").clone()); + } + let _ = response_channel.send(Ok(response)); + } + ); + } + assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -580,7 +606,34 @@ fn test_harness(test: F) -> TestState where F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>, { - TestState::default().resume(test) + let mut test_state = TestState::default(); + + // Add two more blocks after the genesis (which is created in `default()`) + let h1 = Header { + parent_hash: test_state.last_block.clone(), + number: 1, + digest: dummy_digest(), + state_root: dummy_hash(), + extrinsics_root: dummy_hash(), + }; + let h1_hash = h1.hash(); + test_state.headers.insert(h1_hash.clone(), h1); + test_state.block_num_to_header.insert(1, h1_hash.clone()); + test_state.last_block = h1_hash; + + let h2 = Header { + parent_hash: test_state.last_block.clone(), + number: 2, + digest: dummy_digest(), + state_root: dummy_hash(), + extrinsics_root: dummy_hash(), + }; + let h2_hash = h2.hash(); + test_state.headers.insert(h2_hash.clone(), h2); + test_state.block_num_to_header.insert(2, h2_hash.clone()); + test_state.last_block = h2_hash; + + test_state.resume(test) } /// Handle participation messages. @@ -648,6 +701,18 @@ pub async fn handle_approval_vote_request( ); } +/// Handle block number request. In the context of these tests this message is required for +/// handling comparator creation for enqueuing participations. +async fn handle_get_block_number(ctx_handle: &mut VirtualOverseer, test_state: &TestState) { + assert_matches!( + ctx_handle.recv().await, + AllMessages::ChainApi( + ChainApiMessage::BlockNumber(hash, tx)) => { + tx.send(Ok(test_state.headers.get(&hash).map(|r| r.number))).unwrap(); + } + ) +} + #[test] fn too_many_unconfirmed_statements_are_considered_spam() { test_harness(|mut test_state, mut virtual_overseer| { @@ -3165,31 +3230,32 @@ fn participation_requests_reprioritized_for_newly_included() { test_state.handle_resume_sync(&mut virtual_overseer, session).await; let mut receipts: Vec = Vec::new(); + // Generate all receipts for repetition in 1..=3u8 { // Building candidate receipts let mut candidate_receipt = make_valid_candidate_receipt(); candidate_receipt.descriptor.pov_hash = Hash::from( [repetition; 32], // Altering this receipt so its hash will be changed ); + // Set consecutive parents (starting from zero). They will order the candidates for participation. + let parent_block_num: BlockNumber = repetition as BlockNumber - 1; + candidate_receipt.descriptor.relay_parent = + test_state.block_num_to_header.get(&parent_block_num).unwrap().clone(); receipts.push(candidate_receipt.clone()); + } - // Mark all candidates as backed, so their participation requests make it to best effort. - // These calls must all occur before including the candidates due to test overseer - // oddities. - test_state - .activate_leaf_at_session( - &mut virtual_overseer, - session, - // TODO: This is in a loop, so we'll generate active leaves update multiple times for - // the same block num. I don't think this is okay, better batch all events together - // and do a single active leaves update - 1, - vec![make_candidate_backed_event(candidate_receipt)], - ) - .await; + // Mark all candidates as backed, so their participation requests make it to best effort. + // These calls must all occur before including the candidates due to test overseer + // oddities. + let mut candidate_events = Vec::new(); + for r in receipts.iter() { + candidate_events.push(make_candidate_backed_event(r.clone())) } + test_state + .activate_leaf_at_session(&mut virtual_overseer, session, 1, candidate_events) + .await; - for candidate_receipt in receipts.iter() { + for (idx, candidate_receipt) in receipts.iter().enumerate() { let candidate_hash = candidate_receipt.hash(); println!("Import of candidate with hash: {}", candidate_hash); @@ -3219,13 +3285,22 @@ fn participation_requests_reprioritized_for_newly_included() { }) .await; - // Send approval votes to unblock import + // Handle corresponding messages to unblock import + // we need to handle `ApprovalVotingMessage::GetApprovalSignaturesForCandidate` for import handle_approval_vote_request( &mut virtual_overseer, &candidate_hash, HashMap::new(), ) .await; + + // We'll trigger participation for the first `MAX_PARALLEL_PARTICIPATIONS` candidates. + // The rest will be queued => we need to handle `ChainApiMessage::BlockNumber` for them. + if idx >= crate::participation::MAX_PARALLEL_PARTICIPATIONS { + // We send the `idx` as parent block number, because it is used for ordering. + // This way we get predictable ordering and participation. + handle_get_block_number(&mut virtual_overseer, &test_state).await; + } } // Generate included event for one of the candidates here @@ -3240,27 +3315,33 @@ fn participation_requests_reprioritized_for_newly_included() { ) .await; - // Unblock participation and verify ordering + // NB: The checks below are a bit racy. In theory candidate 2 can be processed even before candidate 0 and this is okay. If any + // of the asserts in the two functions after this comment fail -> rework `participation_with_distribution` to expect a set of + // commitment hashes instead of just one. + + // This is the candidate for which participation was started initially (`MAX_PARALLEL_PARTICIPATIONS` threshold was not yet hit) participation_with_distribution( &mut virtual_overseer, - &receipts.first().expect("There is more than one candidate").hash(), + &receipts.get(0).expect("There is more than one candidate").hash(), receipts.first().expect("There is more than one candidate").commitments_hash, ) .await; + // This one should have been prioritized participation_with_distribution( &mut virtual_overseer, - &receipts.last().expect("There is more than one candidate").hash(), + &receipts.get(2).expect("There is more than one candidate").hash(), receipts.last().expect("There is more than one candidate").commitments_hash, ) .await; - // if we see this -> the ordering is correct. We might need to do participating for the 2nd - // candidate in order to exit cleanly. - println!("Horray"); - - // Remove this when the test is ready - std::thread::sleep(std::time::Duration::from_secs(180)); + // And this is the last one + participation_with_distribution( + &mut virtual_overseer, + &receipts.get(1).expect("There is more than one candidate").hash(), + receipts.first().expect("There is more than one candidate").commitments_hash, + ) + .await; // Wrap up virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; From b53035caa55a4c9a59fe35cbc9d9b5cfa68aeaaf Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 22 Dec 2022 09:25:48 -0800 Subject: [PATCH 29/30] Fixing final nits --- node/core/dispute-coordinator/src/participation/queues/mod.rs | 3 --- node/core/dispute-coordinator/src/tests.rs | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 00f40cae5806..3452470efcb5 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -161,9 +161,6 @@ impl Queues { /// Reprioritizes any participation requests pertaining to the /// passed candidates from best effort to priority. - /// - /// Returns: Either a bool telling the caller whether the priority queue is now full - /// or an error resulting from the failed creation of a comparator. pub async fn prioritize_if_present( &mut self, sender: &mut impl overseer::DisputeCoordinatorSenderTrait, diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 4215773071ed..15b332767d5e 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -3013,7 +3013,7 @@ fn participation_for_included_candidates() { assert_eq!(rx.await.unwrap().len(), 1); - // check if we have participated (casted a vote) + // check if we have participated (cast a vote) let (tx, rx) = oneshot::channel(); virtual_overseer .send(FromOrchestra::Communication { From d7f025026d3dddf1d2ce0f1324299500504b977b Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 22 Dec 2022 12:55:03 -0800 Subject: [PATCH 30/30] Tweaks to test Tsveto figured out --- node/core/dispute-coordinator/src/participation/mod.rs | 1 + node/core/dispute-coordinator/src/tests.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 61f6d41f4fdd..7167bc7e26e8 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -51,6 +51,7 @@ pub use queues::{ParticipationPriority, ParticipationRequest, QueueError}; /// This should be a relatively low value, while we might have a speedup once we fetched the data, /// due to multi-core architectures, but the fetching itself can not be improved by parallel /// requests. This means that higher numbers make it harder for a single dispute to resolve fast. +#[cfg(not(test))] const MAX_PARALLEL_PARTICIPATIONS: usize = 3; #[cfg(test)] pub(crate) const MAX_PARALLEL_PARTICIPATIONS: usize = 1; diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 9d6429a24170..023c95d5e23c 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -3257,7 +3257,6 @@ fn participation_requests_reprioritized_for_newly_included() { for (idx, candidate_receipt) in receipts.iter().enumerate() { let candidate_hash = candidate_receipt.hash(); - println!("Import of candidate with hash: {}", candidate_hash); // Create votes for candidates let (valid_vote, invalid_vote) = generate_opposing_votes_pair(