From c5289f03f3cae49418a3ab2ed8b45c295f6aa172 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 19 Jan 2022 20:04:23 +0000 Subject: [PATCH 01/26] Dont error in finality_target_with_longest_chain Signed-off-by: Andrei Sandu --- node/service/src/relay_chain_selection.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 56789e2aa60a..ff8096135d88 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -522,10 +522,25 @@ where std::any::type_name::(), ) .await; - let (subchain_number, subchain_head) = rx - .await - .map_err(Error::DetermineUndisputedChainCanceled) - .map_err(|e| ConsensusError::Other(Box::new(e)))?; + + // Try to fetch response from `dispute-coordinator`. If an error occurs we just log it + // and return `target_hash` as maximal vote. It is safer to contain this error here + // and not push it up the stack to cause additional issues in GRANDPA/BABE. + let (subchain_number, subchain_head) = + match rx.await.map_err(Error::DetermineUndisputedChainCanceled) { + Ok((lag, subchain_head)) => (lag, subchain_head), + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + "Call to `DetermineUndisputedChain` failed: {}", + e + ); + // We need to return a sane finality target. But, we are unable to ensure we are not + // finalizing something that is being disputed or has been concluded as invalid. We will be + // conservative here and not vote for finality above the ancestor passed in. + return Ok(target_hash) + }, + }; // The the total lag accounting for disputes. let lag_disputes = initial_leaf_number.saturating_sub(subchain_number); From badb22e137b0eebfe5a58a20e098f474dfb1952d Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 19 Jan 2022 20:19:36 +0000 Subject: [PATCH 02/26] fix Signed-off-by: Andrei Sandu --- node/service/src/relay_chain_selection.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index ff8096135d88..959e1c67fbc5 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -526,9 +526,15 @@ where // Try to fetch response from `dispute-coordinator`. If an error occurs we just log it // and return `target_hash` as maximal vote. It is safer to contain this error here // and not push it up the stack to cause additional issues in GRANDPA/BABE. - let (subchain_number, subchain_head) = + let (lag, subchain_head) = match rx.await.map_err(Error::DetermineUndisputedChainCanceled) { - Ok((lag, subchain_head)) => (lag, subchain_head), + // If request succeded we will receive (block number, block hash). + Ok((subchain_number, subchain_head)) => { + // The the total lag accounting for disputes. + let lag_disputes = initial_leaf_number.saturating_sub(subchain_number); + self.metrics.note_disputes_finality_lag(lag_disputes); + (lag_disputes, subchain_head) + }, Err(e) => { tracing::warn!( target: LOG_TARGET, @@ -541,11 +547,7 @@ where return Ok(target_hash) }, }; - - // The the total lag accounting for disputes. - let lag_disputes = initial_leaf_number.saturating_sub(subchain_number); - self.metrics.note_disputes_finality_lag(lag_disputes); - (lag_disputes, subchain_head) + (lag, subchain_head) } else { (lag, subchain_head) }; From f64d52c851d6013ed42a642245455461eb327cb6 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 19 Jan 2022 20:37:03 +0000 Subject: [PATCH 03/26] Add error flag Signed-off-by: Andrei Sandu --- .../dispute-coordinator/src/real/initialized.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 5b9f6d9f06c1..8af4d706929b 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -80,6 +80,7 @@ pub struct Initialized { ordering_provider: OrderingProvider, participation_receiver: WorkerMessageReceiver, metrics: Metrics, + error: bool, } impl Initialized { @@ -105,6 +106,7 @@ impl Initialized { participation, participation_receiver, metrics, + error: false, } } @@ -245,22 +247,23 @@ impl Initialized { .await?; self.participation.process_active_leaves_update(ctx, &update).await?; - let new_activations = update.activated.into_iter().map(|a| a.hash); - for new_leaf in new_activations { + if let Some(new_leaf) = update.activated { + let new_leaf = new_leaf.hash; match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await { Err(e) => { tracing::warn!( - target: LOG_TARGET, - err = ?e, - "Failed to update session cache for disputes", + target: LOG_TARGET, + err = ?e, + "Failed to update session cache for disputes", ); - continue + self.error = true; }, Ok(SessionWindowUpdate::Advanced { new_window_end: window_end, new_window_start, .. }) => { + self.error = false; let session = window_end; if self.highest_session < session { tracing::trace!( From 7e54dfe31abef96c7ece324abb452ca06eacd46f Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 20 Jan 2022 12:19:14 +0000 Subject: [PATCH 04/26] Add error flag in dispute-coordinator Make sure to send errors to subsystems requesting data depending on missing session info Signed-off-by: Andrei Sandu --- .../src/real/initialized.rs | 90 ++++++++++++------- .../src/rolling_session_window.rs | 6 +- 2 files changed, 59 insertions(+), 37 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 8af4d706929b..587f82c249e6 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -37,7 +37,7 @@ use polkadot_node_subsystem::{ overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SubsystemContext, }; use polkadot_node_subsystem_util::rolling_session_window::{ - RollingSessionWindow, SessionWindowUpdate, + RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable, }; use polkadot_primitives::{ v1::{ @@ -48,11 +48,12 @@ use polkadot_primitives::{ v2::SessionInfo, }; -use crate::{metrics::Metrics, real::DisputeCoordinatorSubsystem, LOG_TARGET}; - use crate::{ - error::{log_error, Fatal, FatalResult, NonFatal, NonFatalResult, Result}, + error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result}, + metrics::Metrics, + real::DisputeCoordinatorSubsystem, status::{get_active_with_status, Clock, DisputeStatus, Timestamp}, + LOG_TARGET, }; use super::{ @@ -80,7 +81,9 @@ pub struct Initialized { ordering_provider: OrderingProvider, participation_receiver: WorkerMessageReceiver, metrics: Metrics, - error: bool, + // This tracks only rolling session window failures. + // It can be a `Vec` if the need to track more arises. + error: Option, } impl Initialized { @@ -106,7 +109,7 @@ impl Initialized { participation, participation_receiver, metrics, - error: false, + error: None, } } @@ -256,14 +259,14 @@ impl Initialized { err = ?e, "Failed to update session cache for disputes", ); - self.error = true; + self.error = Some(e); }, Ok(SessionWindowUpdate::Advanced { new_window_end: window_end, new_window_start, .. }) => { - self.error = false; + self.error = None; let session = window_end; if self.highest_session < session { tracing::trace!( @@ -536,31 +539,45 @@ impl Initialized { } }, DisputeCoordinatorMessage::RecentDisputes(tx) => { - let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); - let _ = tx.send(recent_disputes.keys().cloned().collect()); + // Return error if session information is missing. + if let Some(subsystem_error) = self.error.clone() { + return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) + } else { + let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); + let _ = tx.send(recent_disputes.keys().cloned().collect()); + } }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { - let recent_disputes = - overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter(); - let _ = - tx.send(get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect()); + if let Some(subsystem_error) = self.error.clone() { + return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) + } else { + let recent_disputes = + overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter(); + let _ = tx.send( + get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect(), + ); + } }, DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => { - let mut query_output = Vec::new(); - for (session_index, candidate_hash) in query.into_iter() { - if let Some(v) = - overlay_db.load_candidate_votes(session_index, &candidate_hash)? - { - query_output.push((session_index, candidate_hash, v.into())); - } else { - tracing::debug!( - target: LOG_TARGET, - session_index, - "No votes found for candidate", - ); + if let Some(subsystem_error) = self.error.clone() { + return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) + } else { + let mut query_output = Vec::new(); + for (session_index, candidate_hash) in query.into_iter() { + if let Some(v) = + overlay_db.load_candidate_votes(session_index, &candidate_hash)? + { + query_output.push((session_index, candidate_hash, v.into())); + } else { + tracing::debug!( + target: LOG_TARGET, + session_index, + "No votes found for candidate", + ); + } } + let _ = tx.send(query_output); } - let _ = tx.send(query_output); }, DisputeCoordinatorMessage::IssueLocalStatement( session, @@ -584,14 +601,19 @@ impl Initialized { block_descriptions, tx, } => { - let undisputed_chain = determine_undisputed_chain( - overlay_db, - base_number, - base_hash, - block_descriptions, - )?; + // Return error if session information is missing. + if let Some(subsystem_error) = self.error.clone() { + return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) + } else { + let undisputed_chain = determine_undisputed_chain( + overlay_db, + base_number, + base_hash, + block_descriptions, + )?; - let _ = tx.send(undisputed_chain); + let _ = tx.send(undisputed_chain); + } }, } diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index 95fb633349c1..1898555455d2 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -34,7 +34,7 @@ use polkadot_node_subsystem::{ use thiserror::Error; /// Sessions unavailable in state to cache. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum SessionsUnavailableKind { /// Runtime API subsystem was unavailable. RuntimeApiUnavailable(oneshot::Canceled), @@ -45,7 +45,7 @@ pub enum SessionsUnavailableKind { } /// Information about the sessions being fetched. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SessionsUnavailableInfo { /// The desired window start. pub window_start: SessionIndex, @@ -56,7 +56,7 @@ pub struct SessionsUnavailableInfo { } /// Sessions were unavailable to fetch from the state for some reason. -#[derive(Debug, Error)] +#[derive(Debug, Error, Clone)] pub struct SessionsUnavailable { /// The error kind. kind: SessionsUnavailableKind, From 94ea7be8b0a0cf6f42426176a2b57b2ba3777d2d Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 20 Jan 2022 15:56:32 +0000 Subject: [PATCH 05/26] Scrape ancestors Signed-off-by: Andrei Sandu --- .../src/real/initialized.rs | 47 +++++++++++++++++-- .../src/real/ordering/mod.rs | 22 ++++----- 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 587f82c249e6..e8e7732ab3e0 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -22,6 +22,7 @@ use futures::{ channel::{mpsc, oneshot}, FutureExt, StreamExt, }; +use lru::LruCache; use sc_keystore::LocalKeystore; @@ -67,6 +68,9 @@ use super::{ OverlayedBackend, }; +const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 20; +const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 20; + /// After the first active leaves update we transition to `Initialized` state. /// /// Before the first active leaves update we can't really do much. We cannot check incoming @@ -84,6 +88,8 @@ pub struct Initialized { // This tracks only rolling session window failures. // It can be a `Vec` if the need to track more arises. error: Option, + /// Latest relay blocks that have been succesfully scraped. + last_scraped_blocks: LruCache, } impl Initialized { @@ -110,6 +116,7 @@ impl Initialized { participation_receiver, metrics, error: None, + last_scraped_blocks: LruCache::new(LRU_SCRAPED_BLOCKS_CAPACITY), } } @@ -251,8 +258,7 @@ impl Initialized { self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { - let new_leaf = new_leaf.hash; - match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf).await { + match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf.hash).await { Err(e) => { tracing::warn!( target: LOG_TARGET, @@ -283,7 +289,32 @@ impl Initialized { }, Ok(SessionWindowUpdate::Unchanged) => {}, }; - self.scrape_on_chain_votes(ctx, overlay_db, new_leaf, now).await?; + + // Scrape the head if above rolling session update went well. + if self.error.is_none() { + self.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now).await?; + } + + // Try to scrape any blocks for which we could not get the current session or did not receive an + // active leaves update. Notice the `+1`, that is because `get_block_ancestors()` doesn't + // include the head and target block in the response. + let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS + 1); + let ancestors = OrderingProvider::get_block_ancestors(ctx.sender(), new_leaf.hash, new_leaf.number, target_block, &mut self.last_scraped_blocks) + .await + .unwrap_or_else(|err| { + tracing::debug!( + target: LOG_TARGET, + "Skipping leaf ancestors scraping due to error: {}", + err + ); + Vec::new() + }); + + // Maybe run these in parallel ? + for ancestor in ancestors.iter() { + self.scrape_on_chain_votes(ctx, overlay_db, *ancestor, now).await?; + } + } Ok(()) @@ -299,6 +330,11 @@ impl Initialized { new_leaf: Hash, now: u64, ) -> Result<()> { + // Avoid scraping twice. + if self.last_scraped_blocks.get(&new_leaf).is_some() { + return Ok(()) + } + // obtain the concluded disputes as well as the candidate backing votes // from the new leaf let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = { @@ -337,6 +373,8 @@ impl Initialized { }; if backing_validators_per_candidate.is_empty() && disputes.is_empty() { + // Mark this block as scraped succesfully, still we did not get anything out of this block. + self.last_scraped_blocks.put(new_leaf, ()); return Ok(()) } @@ -419,6 +457,7 @@ impl Initialized { } if disputes.is_empty() { + self.last_scraped_blocks.put(new_leaf, ()); return Ok(()) } @@ -496,6 +535,8 @@ impl Initialized { "Attempted import of on-chain statement of concluded dispute failed"), } } + + self.last_scraped_blocks.put(new_leaf, ()); Ok(()) } diff --git a/node/core/dispute-coordinator/src/real/ordering/mod.rs b/node/core/dispute-coordinator/src/real/ordering/mod.rs index c6f1a89904af..fa1763e7dd41 100644 --- a/node/core/dispute-coordinator/src/real/ordering/mod.rs +++ b/node/core/dispute-coordinator/src/real/ordering/mod.rs @@ -185,8 +185,9 @@ impl OrderingProvider { ) -> Result<()> { if let Some(activated) = update.activated.as_ref() { // Fetch ancestors of the activated leaf. - let ancestors = self - .get_block_ancestors(sender, activated.hash, activated.number) + let finalized_block_number = get_finalized_block_number(sender).await?; + + let ancestors = Self::get_block_ancestors(sender, activated.hash, activated.number, finalized_block_number, &mut self.last_observed_blocks) .await .unwrap_or_else(|err| { tracing::debug!( @@ -242,23 +243,22 @@ impl OrderingProvider { } /// Returns ancestors of `head` in the descending order, stopping - /// either at the block present in cache or the latest finalized block. + /// either at the block present in cache or at `target_ancestor`. /// /// Suited specifically for querying non-finalized chains, thus /// doesn't rely on block numbers. /// /// Both `head` and last are **not** included in the result. - async fn get_block_ancestors( - &mut self, + pub async fn get_block_ancestors( sender: &mut Sender, mut head: Hash, mut head_number: BlockNumber, + target_ancestor: BlockNumber, + lookup_cache: &mut LruCache ) -> Result> { let mut ancestors = Vec::new(); - let finalized_block_number = get_finalized_block_number(sender).await?; - - if self.last_observed_blocks.get(&head).is_some() || head_number <= finalized_block_number { + if lookup_cache.get(&head).is_some() || head_number <= target_ancestor { return Ok(ancestors) } @@ -297,10 +297,10 @@ impl OrderingProvider { let block_numbers = (earliest_block_number..head_number).rev(); for (block_number, hash) in block_numbers.zip(&hashes) { - // Return if we either met finalized/cached block or + // Return if we either met target/cached block or // hit the size limit for the returned ancestry of head. - if self.last_observed_blocks.get(hash).is_some() || - block_number <= finalized_block_number || + if lookup_cache.get(hash).is_some() || + block_number <= target_ancestor || ancestors.len() >= Self::ANCESTRY_SIZE_LIMIT { return Ok(ancestors) From b27a5d88fdeaf43dc7e57c209595516983fe482f Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 20 Jan 2022 15:57:04 +0000 Subject: [PATCH 06/26] fmt Signed-off-by: Andrei Sandu --- .../src/real/initialized.rs | 35 ++++++++++++------- .../src/real/ordering/mod.rs | 32 ++++++++++------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index e8e7732ab3e0..779fbda18edd 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -258,7 +258,11 @@ impl Initialized { self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { - match self.rolling_session_window.cache_session_info_for_head(ctx, new_leaf.hash).await { + match self + .rolling_session_window + .cache_session_info_for_head(ctx, new_leaf.hash) + .await + { Err(e) => { tracing::warn!( target: LOG_TARGET, @@ -294,27 +298,32 @@ impl Initialized { if self.error.is_none() { self.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now).await?; } - + // Try to scrape any blocks for which we could not get the current session or did not receive an // active leaves update. Notice the `+1`, that is because `get_block_ancestors()` doesn't // include the head and target block in the response. let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS + 1); - let ancestors = OrderingProvider::get_block_ancestors(ctx.sender(), new_leaf.hash, new_leaf.number, target_block, &mut self.last_scraped_blocks) - .await - .unwrap_or_else(|err| { - tracing::debug!( - target: LOG_TARGET, - "Skipping leaf ancestors scraping due to error: {}", - err - ); - Vec::new() - }); + let ancestors = OrderingProvider::get_block_ancestors( + ctx.sender(), + new_leaf.hash, + new_leaf.number, + target_block, + &mut self.last_scraped_blocks, + ) + .await + .unwrap_or_else(|err| { + tracing::debug!( + target: LOG_TARGET, + "Skipping leaf ancestors scraping due to error: {}", + err + ); + Vec::new() + }); // Maybe run these in parallel ? for ancestor in ancestors.iter() { self.scrape_on_chain_votes(ctx, overlay_db, *ancestor, now).await?; } - } Ok(()) diff --git a/node/core/dispute-coordinator/src/real/ordering/mod.rs b/node/core/dispute-coordinator/src/real/ordering/mod.rs index fa1763e7dd41..f668c745832f 100644 --- a/node/core/dispute-coordinator/src/real/ordering/mod.rs +++ b/node/core/dispute-coordinator/src/real/ordering/mod.rs @@ -186,18 +186,24 @@ impl OrderingProvider { if let Some(activated) = update.activated.as_ref() { // Fetch ancestors of the activated leaf. let finalized_block_number = get_finalized_block_number(sender).await?; - - let ancestors = Self::get_block_ancestors(sender, activated.hash, activated.number, finalized_block_number, &mut self.last_observed_blocks) - .await - .unwrap_or_else(|err| { - tracing::debug!( - target: LOG_TARGET, - activated_leaf = ?activated, - "Skipping leaf ancestors due to an error: {}", - err - ); - Vec::new() - }); + + let ancestors = Self::get_block_ancestors( + sender, + activated.hash, + activated.number, + finalized_block_number, + &mut self.last_observed_blocks, + ) + .await + .unwrap_or_else(|err| { + tracing::debug!( + target: LOG_TARGET, + activated_leaf = ?activated, + "Skipping leaf ancestors due to an error: {}", + err + ); + Vec::new() + }); // Ancestors block numbers are consecutive in the descending order. let earliest_block_number = activated.number - ancestors.len() as u32; let block_numbers = (earliest_block_number..=activated.number).rev(); @@ -254,7 +260,7 @@ impl OrderingProvider { mut head: Hash, mut head_number: BlockNumber, target_ancestor: BlockNumber, - lookup_cache: &mut LruCache + lookup_cache: &mut LruCache, ) -> Result> { let mut ancestors = Vec::new(); From b79a345f4408c3ba337e0d1097b845364c88cebe Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 20 Jan 2022 15:59:47 +0000 Subject: [PATCH 07/26] fix Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/initialized.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 779fbda18edd..74fa64b7ae72 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -300,9 +300,8 @@ impl Initialized { } // Try to scrape any blocks for which we could not get the current session or did not receive an - // active leaves update. Notice the `+1`, that is because `get_block_ancestors()` doesn't - // include the head and target block in the response. - let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS + 1); + // active leaves update. + let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS); let ancestors = OrderingProvider::get_block_ancestors( ctx.sender(), new_leaf.hash, From f8dd1882e088ef412df899dd68d55da0419c85e2 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 20 Jan 2022 19:40:13 +0000 Subject: [PATCH 08/26] Fix naming Signed-off-by: Andrei Sandu --- .../subsystem-util/src/rolling_session_window.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index 1898555455d2..5e11d2fe5446 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -35,7 +35,7 @@ use thiserror::Error; /// Sessions unavailable in state to cache. #[derive(Debug, Clone)] -pub enum SessionsUnavailableKind { +pub enum SessionsUnavailableReason { /// Runtime API subsystem was unavailable. RuntimeApiUnavailable(oneshot::Canceled), /// The runtime API itself returned an error. @@ -59,7 +59,7 @@ pub struct SessionsUnavailableInfo { #[derive(Debug, Error, Clone)] pub struct SessionsUnavailable { /// The error kind. - kind: SessionsUnavailableKind, + kind: SessionsUnavailableReason, /// The info about the session window, if any. info: Option, } @@ -229,12 +229,12 @@ async fn get_session_index_for_head( Ok(Ok(s)) => Ok(s), Ok(Err(e)) => return Err(SessionsUnavailable { - kind: SessionsUnavailableKind::RuntimeApi(e), + kind: SessionsUnavailableReason::RuntimeApi(e), info: None, }), Err(e) => return Err(SessionsUnavailable { - kind: SessionsUnavailableKind::RuntimeApiUnavailable(e), + kind: SessionsUnavailableReason::RuntimeApiUnavailable(e), info: None, }), } @@ -245,7 +245,7 @@ async fn load_all_sessions( block_hash: Hash, start: SessionIndex, end_inclusive: SessionIndex, -) -> Result, SessionsUnavailableKind> { +) -> Result, SessionsUnavailableReason> { let mut v = Vec::new(); for i in start..=end_inclusive { let (tx, rx) = oneshot::channel(); @@ -257,9 +257,9 @@ async fn load_all_sessions( let session_info = match rx.await { Ok(Ok(Some(s))) => s, - Ok(Ok(None)) => return Err(SessionsUnavailableKind::Missing(i)), - Ok(Err(e)) => return Err(SessionsUnavailableKind::RuntimeApi(e)), - Err(canceled) => return Err(SessionsUnavailableKind::RuntimeApiUnavailable(canceled)), + Ok(Ok(None)) => return Err(SessionsUnavailableReason::Missing(i)), + Ok(Err(e)) => return Err(SessionsUnavailableReason::RuntimeApi(e)), + Err(canceled) => return Err(SessionsUnavailableReason::RuntimeApiUnavailable(canceled)), }; v.push(session_info); From ffccad7aa1eb4af8c81dd6b8b43467058fa0a48a Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 20 Jan 2022 20:15:59 +0000 Subject: [PATCH 09/26] review feedback Signed-off-by: Andrei Sandu --- .../src/real/initialized.rs | 118 +++++++++++------- node/service/src/relay_chain_selection.rs | 2 +- 2 files changed, 73 insertions(+), 47 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 74fa64b7ae72..d958c5dc9ea2 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -68,7 +68,7 @@ use super::{ OverlayedBackend, }; -const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 20; +const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 40; const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 20; /// After the first active leaves update we transition to `Initialized` state. @@ -296,7 +296,13 @@ impl Initialized { // Scrape the head if above rolling session update went well. if self.error.is_none() { - self.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now).await?; + let _ = self.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now).await.map_err(|err| { + tracing::warn!( + target: LOG_TARGET, + "Skipping scraping block #{}({}) due to error: {}", + new_leaf.number, new_leaf.hash, err + ); + }); } // Try to scrape any blocks for which we could not get the current session or did not receive an @@ -319,9 +325,16 @@ impl Initialized { Vec::new() }); - // Maybe run these in parallel ? + // We could do this in parallel, but we don't want to overindex on the wasm instances + // usage. for ancestor in ancestors.iter() { - self.scrape_on_chain_votes(ctx, overlay_db, *ancestor, now).await?; + let _ = self.scrape_on_chain_votes(ctx, overlay_db, *ancestor, now).await.map_err(|err| { + tracing::warn!( + target: LOG_TARGET, + "Skipping scraping block {} due to error: {}", + *ancestor, err + ); + }); } } @@ -381,7 +394,8 @@ impl Initialized { }; if backing_validators_per_candidate.is_empty() && disputes.is_empty() { - // Mark this block as scraped succesfully, still we did not get anything out of this block. + // This block is not interesting as it doesnt contain any backing votes or disputes. We'll + // mark it here as scraped to prevent further processing. self.last_scraped_blocks.put(new_leaf, ()); return Ok(()) } @@ -589,44 +603,49 @@ impl Initialized { }, DisputeCoordinatorMessage::RecentDisputes(tx) => { // Return error if session information is missing. - if let Some(subsystem_error) = self.error.clone() { - return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) + self.ensure_no_errors()?; + + let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { + disputes } else { - let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); - let _ = tx.send(recent_disputes.keys().cloned().collect()); - } + std::collections::BTreeMap::new() + }; + + let _ = tx.send(recent_disputes.keys().cloned().collect()); }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { - if let Some(subsystem_error) = self.error.clone() { - return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) + // Return error if session information is missing. + self.ensure_no_errors()?; + + let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { + disputes } else { - let recent_disputes = - overlay_db.load_recent_disputes()?.unwrap_or_default().into_iter(); - let _ = tx.send( - get_active_with_status(recent_disputes, now).map(|(k, _)| k).collect(), - ); - } + std::collections::BTreeMap::new() + }; + + let _ = tx.send( + get_active_with_status(recent_disputes.into_iter(), now).map(|(k, _)| k).collect(), + ); }, DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => { - if let Some(subsystem_error) = self.error.clone() { - return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) - } else { - let mut query_output = Vec::new(); - for (session_index, candidate_hash) in query.into_iter() { - if let Some(v) = - overlay_db.load_candidate_votes(session_index, &candidate_hash)? - { - query_output.push((session_index, candidate_hash, v.into())); - } else { - tracing::debug!( - target: LOG_TARGET, - session_index, - "No votes found for candidate", - ); - } + // Return error if session information is missing. + self.ensure_no_errors()?; + + let mut query_output = Vec::new(); + for (session_index, candidate_hash) in query { + if let Some(v) = + overlay_db.load_candidate_votes(session_index, &candidate_hash)? + { + query_output.push((session_index, candidate_hash, v.into())); + } else { + tracing::debug!( + target: LOG_TARGET, + session_index, + "No votes found for candidate", + ); } - let _ = tx.send(query_output); } + let _ = tx.send(query_output); }, DisputeCoordinatorMessage::IssueLocalStatement( session, @@ -651,24 +670,31 @@ impl Initialized { tx, } => { // Return error if session information is missing. - if let Some(subsystem_error) = self.error.clone() { - return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) - } else { - let undisputed_chain = determine_undisputed_chain( - overlay_db, - base_number, - base_hash, - block_descriptions, - )?; + self.ensure_no_errors()?; + + let undisputed_chain = determine_undisputed_chain( + overlay_db, + base_number, + base_hash, + block_descriptions, + )?; - let _ = tx.send(undisputed_chain); - } + let _ = tx.send(undisputed_chain); }, } Ok(Box::new(|| Ok(()))) } + // Helper function for checking subsystem errors in mesasge processing. + fn ensure_no_errors(&self) -> Result<()> { + if let Some(subsystem_error) = self.error.clone() { + return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) + } + + Ok(()) + } + async fn handle_import_statements( &mut self, ctx: &mut impl SubsystemContext, diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 959e1c67fbc5..c5a407cd76ae 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -536,7 +536,7 @@ where (lag_disputes, subchain_head) }, Err(e) => { - tracing::warn!( + tracing::error!( target: LOG_TARGET, "Call to `DetermineUndisputedChain` failed: {}", e From 5c75c41a75ca6c670f3f0d6933ef4691ef51434c Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 20 Jan 2022 20:16:25 +0000 Subject: [PATCH 10/26] fmt Signed-off-by: Andrei Sandu --- .../src/real/initialized.rs | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index d958c5dc9ea2..12929b8bf17a 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -296,13 +296,18 @@ impl Initialized { // Scrape the head if above rolling session update went well. if self.error.is_none() { - let _ = self.scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now).await.map_err(|err| { - tracing::warn!( - target: LOG_TARGET, - "Skipping scraping block #{}({}) due to error: {}", - new_leaf.number, new_leaf.hash, err - ); - }); + let _ = self + .scrape_on_chain_votes(ctx, overlay_db, new_leaf.hash, now) + .await + .map_err(|err| { + tracing::warn!( + target: LOG_TARGET, + "Skipping scraping block #{}({}) due to error: {}", + new_leaf.number, + new_leaf.hash, + err + ); + }); } // Try to scrape any blocks for which we could not get the current session or did not receive an @@ -328,13 +333,16 @@ impl Initialized { // We could do this in parallel, but we don't want to overindex on the wasm instances // usage. for ancestor in ancestors.iter() { - let _ = self.scrape_on_chain_votes(ctx, overlay_db, *ancestor, now).await.map_err(|err| { - tracing::warn!( - target: LOG_TARGET, - "Skipping scraping block {} due to error: {}", - *ancestor, err - ); - }); + let _ = self.scrape_on_chain_votes(ctx, overlay_db, *ancestor, now).await.map_err( + |err| { + tracing::warn!( + target: LOG_TARGET, + "Skipping scraping block {} due to error: {}", + *ancestor, + err + ); + }, + ); } } @@ -610,7 +618,7 @@ impl Initialized { } else { std::collections::BTreeMap::new() }; - + let _ = tx.send(recent_disputes.keys().cloned().collect()); }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { @@ -624,7 +632,9 @@ impl Initialized { }; let _ = tx.send( - get_active_with_status(recent_disputes.into_iter(), now).map(|(k, _)| k).collect(), + get_active_with_status(recent_disputes.into_iter(), now) + .map(|(k, _)| k) + .collect(), ); }, DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => { @@ -671,7 +681,7 @@ impl Initialized { } => { // Return error if session information is missing. self.ensure_no_errors()?; - + let undisputed_chain = determine_undisputed_chain( overlay_db, base_number, From c7997a2318e363a55d86ddb79b6e5889ffc4a525 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 20 Jan 2022 20:27:50 +0000 Subject: [PATCH 11/26] =?UTF-8?q?=F0=9F=92=AC=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/initialized.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 12929b8bf17a..045821d0e69b 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -88,7 +88,7 @@ pub struct Initialized { // This tracks only rolling session window failures. // It can be a `Vec` if the need to track more arises. error: Option, - /// Latest relay blocks that have been succesfully scraped. + /// Latest relay blocks that have been successfully scraped. last_scraped_blocks: LruCache, } From 762024cf7316d8560dcc2f39523e3128aa37cf30 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 21 Jan 2022 10:53:12 +0000 Subject: [PATCH 12/26] consume Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/initialized.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 045821d0e69b..42c920ccf920 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -332,13 +332,13 @@ impl Initialized { // We could do this in parallel, but we don't want to overindex on the wasm instances // usage. - for ancestor in ancestors.iter() { - let _ = self.scrape_on_chain_votes(ctx, overlay_db, *ancestor, now).await.map_err( + for ancestor in ancestors { + let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err( |err| { tracing::warn!( target: LOG_TARGET, "Skipping scraping block {} due to error: {}", - *ancestor, + ancestor, err ); }, From f765694aec70b490154a3135fc26d8b592eb5765 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 24 Jan 2022 11:50:05 +0000 Subject: [PATCH 13/26] fix tests Signed-off-by: Andrei Sandu --- .../src/real/ordering/mod.rs | 57 ++++++++++++------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/ordering/mod.rs b/node/core/dispute-coordinator/src/real/ordering/mod.rs index f668c745832f..93518ed5c988 100644 --- a/node/core/dispute-coordinator/src/real/ordering/mod.rs +++ b/node/core/dispute-coordinator/src/real/ordering/mod.rs @@ -184,26 +184,43 @@ impl OrderingProvider { update: &ActiveLeavesUpdate, ) -> Result<()> { if let Some(activated) = update.activated.as_ref() { - // Fetch ancestors of the activated leaf. - let finalized_block_number = get_finalized_block_number(sender).await?; - - let ancestors = Self::get_block_ancestors( - sender, - activated.hash, - activated.number, - finalized_block_number, - &mut self.last_observed_blocks, - ) - .await - .unwrap_or_else(|err| { - tracing::debug!( - target: LOG_TARGET, - activated_leaf = ?activated, - "Skipping leaf ancestors due to an error: {}", - err - ); - Vec::new() - }); + // Fetch last finalized block. + let ancestors = match get_finalized_block_number(sender).await { + Ok(block_number) => { + // Fetch ancestry up to last finalized block. + Self::get_block_ancestors( + sender, + activated.hash, + activated.number, + block_number, + &mut self.last_observed_blocks, + ) + .await + .unwrap_or_else(|err| { + tracing::debug!( + target: LOG_TARGET, + activated_leaf = ?activated, + "Skipping leaf ancestors due to an error: {}", + err + ); + // We assume this is a spurious error so we'll move forward with an + // empty ancestry. + Vec::new() + }) + }, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + activated_leaf = ?activated, + "Failed to retrieve last finalized block number: {}", + err + ); + // We assume this is a spurious error so we'll move forward with an + // empty ancestry. + Vec::new() + }, + }; + // Ancestors block numbers are consecutive in the descending order. let earliest_block_number = activated.number - ancestors.len() as u32; let block_numbers = (earliest_block_number..=activated.number).rev(); From f7276df4a9a0c180ac4a7d4382decad9a1088a37 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 24 Jan 2022 11:51:39 +0000 Subject: [PATCH 14/26] typo Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/initialized.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 42c920ccf920..d6f1d4f158f5 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -696,7 +696,7 @@ impl Initialized { Ok(Box::new(|| Ok(()))) } - // Helper function for checking subsystem errors in mesasge processing. + // Helper function for checking subsystem errors in message processing. fn ensure_no_errors(&self) -> Result<()> { if let Some(subsystem_error) = self.error.clone() { return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) From b16ce9cc357603207de432258c09d9cd845c46fd Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 24 Jan 2022 14:01:38 +0000 Subject: [PATCH 15/26] review fixes Signed-off-by: Andrei Sandu --- .../src/real/initialized.rs | 19 +++++++++++-------- node/core/dispute-coordinator/src/real/mod.rs | 13 ++++++++++++- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index d6f1d4f158f5..24671ffe321e 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -16,7 +16,10 @@ //! Dispute coordinator subsystem in initialized state (after first active leaf is received). -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; use futures::{ channel::{mpsc, oneshot}, @@ -611,24 +614,24 @@ impl Initialized { }, DisputeCoordinatorMessage::RecentDisputes(tx) => { // Return error if session information is missing. - self.ensure_no_errors()?; + self.ensure_available_session_info()?; let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { disputes } else { - std::collections::BTreeMap::new() + BTreeMap::new() }; let _ = tx.send(recent_disputes.keys().cloned().collect()); }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { // Return error if session information is missing. - self.ensure_no_errors()?; + self.ensure_available_session_info()?; let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { disputes } else { - std::collections::BTreeMap::new() + BTreeMap::new() }; let _ = tx.send( @@ -639,7 +642,7 @@ impl Initialized { }, DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => { // Return error if session information is missing. - self.ensure_no_errors()?; + self.ensure_available_session_info()?; let mut query_output = Vec::new(); for (session_index, candidate_hash) in query { @@ -680,7 +683,7 @@ impl Initialized { tx, } => { // Return error if session information is missing. - self.ensure_no_errors()?; + self.ensure_available_session_info()?; let undisputed_chain = determine_undisputed_chain( overlay_db, @@ -697,7 +700,7 @@ impl Initialized { } // Helper function for checking subsystem errors in message processing. - fn ensure_no_errors(&self) -> Result<()> { + fn ensure_available_session_info(&self) -> Result<()> { if let Some(subsystem_error) = self.error.clone() { return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) } diff --git a/node/core/dispute-coordinator/src/real/mod.rs b/node/core/dispute-coordinator/src/real/mod.rs index 5446d91ddc45..733b36c645a1 100644 --- a/node/core/dispute-coordinator/src/real/mod.rs +++ b/node/core/dispute-coordinator/src/real/mod.rs @@ -124,6 +124,7 @@ where Context: overseer::SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { + println!("|-------------------------|"); let future = async { let backend = DbBackend::new(self.store.clone(), self.config.column_config()); self.run(ctx, backend, Box::new(SystemClock)) @@ -144,6 +145,8 @@ impl DisputeCoordinatorSubsystem { keystore: Arc, metrics: Metrics, ) -> Self { + println!("*** |-------------------------|"); + Self { store, config, keystore, metrics } } @@ -159,7 +162,15 @@ impl DisputeCoordinatorSubsystem { Context: SubsystemContext, B: Backend + 'static, { - let res = self.initialize(&mut ctx, backend, &*clock).await?; + println!("|-----------run-----------|"); + let res = match self.initialize(&mut ctx, backend, &*clock).await { + Ok(res) => res, + Err(err) => { + println!("|-----------{:?}-----------|", err); + return Err(err) + }, + }; + println!("|-------initialized-------|"); let (participations, first_leaf, initialized, backend) = match res { // Concluded: From 6b437d8a263682b094519fb4cfc816cac6e90aff Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 24 Jan 2022 14:02:25 +0000 Subject: [PATCH 16/26] Bump scraped blocks LRU capacity Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/initialized.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 24671ffe321e..b3a484e2eb2f 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -71,7 +71,7 @@ use super::{ OverlayedBackend, }; -const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 40; +const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 100; const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 20; /// After the first active leaves update we transition to `Initialized` state. From 7dc4ec48f460bceb5fa874345fe954e2003a4e05 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 24 Jan 2022 19:59:34 +0000 Subject: [PATCH 17/26] =?UTF-8?q?=F0=9F=A7=AF=20=F0=9F=94=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/mod.rs b/node/core/dispute-coordinator/src/real/mod.rs index 733b36c645a1..a9adc93d8624 100644 --- a/node/core/dispute-coordinator/src/real/mod.rs +++ b/node/core/dispute-coordinator/src/real/mod.rs @@ -145,8 +145,6 @@ impl DisputeCoordinatorSubsystem { keystore: Arc, metrics: Metrics, ) -> Self { - println!("*** |-------------------------|"); - Self { store, config, keystore, metrics } } From ed6664a04e3a9951fd3f9fbca4b88254f7d41ea5 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 25 Jan 2022 16:17:18 +0000 Subject: [PATCH 18/26] remove prints Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/mod.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/mod.rs b/node/core/dispute-coordinator/src/real/mod.rs index a9adc93d8624..2bed63262bc7 100644 --- a/node/core/dispute-coordinator/src/real/mod.rs +++ b/node/core/dispute-coordinator/src/real/mod.rs @@ -124,7 +124,6 @@ where Context: overseer::SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { - println!("|-------------------------|"); let future = async { let backend = DbBackend::new(self.store.clone(), self.config.column_config()); self.run(ctx, backend, Box::new(SystemClock)) @@ -160,15 +159,10 @@ impl DisputeCoordinatorSubsystem { Context: SubsystemContext, B: Backend + 'static, { - println!("|-----------run-----------|"); let res = match self.initialize(&mut ctx, backend, &*clock).await { Ok(res) => res, - Err(err) => { - println!("|-----------{:?}-----------|", err); - return Err(err) - }, + Err(err) => return Err(err), }; - println!("|-------initialized-------|"); let (participations, first_leaf, initialized, backend) = match res { // Concluded: From 978d280c5349e315820eff03b46e7fb28cbf2c92 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 25 Jan 2022 16:19:45 +0000 Subject: [PATCH 19/26] Increase scraped blocks cache size Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/initialized.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index b3a484e2eb2f..6a6133d3de97 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -71,7 +71,8 @@ use super::{ OverlayedBackend, }; -const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 100; +// The capacity here should be max unfinalized depth x active heads. +const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500 * 5; const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 20; /// After the first active leaves update we transition to `Initialized` state. From 718d34326d36c6a04670e3b7ad727037e9a560ff Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 25 Jan 2022 17:14:53 +0000 Subject: [PATCH 20/26] more review fixes Signed-off-by: Andrei Sandu --- .../dispute-coordinator/src/real/initialized.rs | 16 ++++++++-------- .../dispute-coordinator/src/real/ordering/mod.rs | 8 ++++---- node/service/src/relay_chain_selection.rs | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 6a6133d3de97..d5c961389e20 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -71,9 +71,9 @@ use super::{ OverlayedBackend, }; -// The capacity here should be max unfinalized depth x active heads. -const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500 * 5; -const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 20; +// The capacity and scrape depth are equal to the maximum allowed unfinalized depth. +const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500; +const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500; /// After the first active leaves update we transition to `Initialized` state. /// @@ -334,16 +334,16 @@ impl Initialized { Vec::new() }); - // We could do this in parallel, but we don't want to overindex on the wasm instances - // usage. + // The `runtime-api` subsystem has an internal queue which serializes the execution, + // so there is no point in running these in parallel. for ancestor in ancestors { let _ = self.scrape_on_chain_votes(ctx, overlay_db, ancestor, now).await.map_err( |err| { tracing::warn!( target: LOG_TARGET, - "Skipping scraping block {} due to error: {}", - ancestor, - err + hash = ?ancestor, + error = ?err, + "Skipping scraping block due to error", ); }, ); diff --git a/node/core/dispute-coordinator/src/real/ordering/mod.rs b/node/core/dispute-coordinator/src/real/ordering/mod.rs index 93518ed5c988..a153166337c1 100644 --- a/node/core/dispute-coordinator/src/real/ordering/mod.rs +++ b/node/core/dispute-coordinator/src/real/ordering/mod.rs @@ -200,8 +200,8 @@ impl OrderingProvider { tracing::debug!( target: LOG_TARGET, activated_leaf = ?activated, - "Skipping leaf ancestors due to an error: {}", - err + error = ?err, + "Skipping leaf ancestors due to an error", ); // We assume this is a spurious error so we'll move forward with an // empty ancestry. @@ -212,8 +212,8 @@ impl OrderingProvider { tracing::debug!( target: LOG_TARGET, activated_leaf = ?activated, - "Failed to retrieve last finalized block number: {}", - err + error = ?err, + "Failed to retrieve last finalized block number", ); // We assume this is a spurious error so we'll move forward with an // empty ancestry. diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index c5a407cd76ae..7c1d3b36c04c 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -54,7 +54,7 @@ use std::sync::Arc; /// /// This is a safety net that should be removed at some point in the future. // Until it's not, make sure to also update `MAX_HEADS_LOOK_BACK` in `approval-voting` -// when changing its value. +// and `MAX_BATCH_SCRAPE_ANCESTORS` in `dispute-coordinator` when changing its value. const MAX_FINALITY_LAG: polkadot_primitives::v1::BlockNumber = 500; const LOG_TARGET: &str = "parachain::chain-selection"; From 876d209e23fb6eae5f629905b5644683d729491b Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 25 Jan 2022 17:16:02 +0000 Subject: [PATCH 21/26] another fix Signed-off-by: Andrei Sandu --- node/service/src/relay_chain_selection.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 7c1d3b36c04c..59f97ab7e2a3 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -538,8 +538,8 @@ where Err(e) => { tracing::error!( target: LOG_TARGET, - "Call to `DetermineUndisputedChain` failed: {}", - e + error = ?e, + "Call to `DetermineUndisputedChain` failed", ); // We need to return a sane finality target. But, we are unable to ensure we are not // finalizing something that is being disputed or has been concluded as invalid. We will be From ef544aaff06a9d8cac0da0c628e7c4972bddaaf5 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 25 Jan 2022 20:18:39 +0000 Subject: [PATCH 22/26] fix target_ancestor Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/initialized.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index d5c961389e20..d7fa9fc70056 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -316,7 +316,9 @@ impl Initialized { // Try to scrape any blocks for which we could not get the current session or did not receive an // active leaves update. - let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS); + // `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to + // pass in the parent number. + let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS + 1); let ancestors = OrderingProvider::get_block_ancestors( ctx.sender(), new_leaf.hash, From a47920f8c790368f5e63a764a9ecbc74e2f19198 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 25 Jan 2022 20:31:34 +0000 Subject: [PATCH 23/26] Scrape up to max finalized block Signed-off-by: Andrei Sandu --- .../src/real/initialized.rs | 59 ++++++++++++------- .../src/real/ordering/mod.rs | 4 +- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index d7fa9fc70056..3ffa4cf21186 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -55,7 +55,7 @@ use polkadot_primitives::{ use crate::{ error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result}, metrics::Metrics, - real::DisputeCoordinatorSubsystem, + real::{ordering::get_finalized_block_number, DisputeCoordinatorSubsystem}, status::{get_active_with_status, Clock, DisputeStatus, Timestamp}, LOG_TARGET, }; @@ -73,7 +73,6 @@ use super::{ // The capacity and scrape depth are equal to the maximum allowed unfinalized depth. const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500; -const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500; /// After the first active leaves update we transition to `Initialized` state. /// @@ -316,25 +315,43 @@ impl Initialized { // Try to scrape any blocks for which we could not get the current session or did not receive an // active leaves update. - // `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to - // pass in the parent number. - let target_block = new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS + 1); - let ancestors = OrderingProvider::get_block_ancestors( - ctx.sender(), - new_leaf.hash, - new_leaf.number, - target_block, - &mut self.last_scraped_blocks, - ) - .await - .unwrap_or_else(|err| { - tracing::debug!( - target: LOG_TARGET, - "Skipping leaf ancestors scraping due to error: {}", - err - ); - Vec::new() - }); + let ancestors = match get_finalized_block_number(ctx.sender()).await { + Ok(block_number) => { + // Fetch ancestry up to and including the last finalized block. + // `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to + // pass in it's parent. + OrderingProvider::get_block_ancestors( + ctx.sender(), + new_leaf.hash, + new_leaf.number, + block_number.saturating_sub(1), + &mut self.last_scraped_blocks, + ) + .await + .unwrap_or_else(|err| { + tracing::debug!( + target: LOG_TARGET, + activated_leaf = ?new_leaf, + error = ?err, + "Skipping leaf ancestors due to an error", + ); + // We assume this is a spurious error so we'll move forward with an + // empty ancestry. + Vec::new() + }) + }, + Err(err) => { + tracing::debug!( + target: LOG_TARGET, + activated_leaf = ?new_leaf, + error = ?err, + "Skipping leaf ancestors scraping", + ); + // We assume this is a spurious error so we'll move forward with an + // empty ancestry. + Vec::new() + }, + }; // The `runtime-api` subsystem has an internal queue which serializes the execution, // so there is no point in running these in parallel. diff --git a/node/core/dispute-coordinator/src/real/ordering/mod.rs b/node/core/dispute-coordinator/src/real/ordering/mod.rs index a153166337c1..52650a9cd252 100644 --- a/node/core/dispute-coordinator/src/real/ordering/mod.rs +++ b/node/core/dispute-coordinator/src/real/ordering/mod.rs @@ -368,7 +368,9 @@ async fn get_block_number( send_message_fatal(sender, ChainApiMessage::BlockNumber(relay_parent, tx), rx).await } -async fn get_finalized_block_number(sender: &mut impl SubsystemSender) -> FatalResult { +pub async fn get_finalized_block_number( + sender: &mut impl SubsystemSender, +) -> FatalResult { let (number_tx, number_rx) = oneshot::channel(); send_message_fatal(sender, ChainApiMessage::FinalizedBlockNumber(number_tx), number_rx).await } From 593d635826e7b3957894f9948a9452a6668dfba8 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 25 Jan 2022 20:32:09 +0000 Subject: [PATCH 24/26] undo comment change Signed-off-by: Andrei Sandu --- node/service/src/relay_chain_selection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 59f97ab7e2a3..e84a51bc8a07 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -54,7 +54,7 @@ use std::sync::Arc; /// /// This is a safety net that should be removed at some point in the future. // Until it's not, make sure to also update `MAX_HEADS_LOOK_BACK` in `approval-voting` -// and `MAX_BATCH_SCRAPE_ANCESTORS` in `dispute-coordinator` when changing its value. +// when changing its value. const MAX_FINALITY_LAG: polkadot_primitives::v1::BlockNumber = 500; const LOG_TARGET: &str = "parachain::chain-selection"; From 1f224b82e49caa740b5082b254f830d1da8f0a5d Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 26 Jan 2022 12:42:51 +0000 Subject: [PATCH 25/26] Limit ancestry lookup to last finalized block or max finality lag Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/initialized.rs | 7 +++++++ node/service/src/relay_chain_selection.rs | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 3ffa4cf21186..4946729467f8 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -73,6 +73,8 @@ use super::{ // The capacity and scrape depth are equal to the maximum allowed unfinalized depth. const LRU_SCRAPED_BLOCKS_CAPACITY: usize = 500; +// This is in sync with `MAX_FINALITY_LAG` in relay chain selection. +const MAX_BATCH_SCRAPE_ANCESTORS: u32 = 500; /// After the first active leaves update we transition to `Initialized` state. /// @@ -317,6 +319,11 @@ impl Initialized { // active leaves update. let ancestors = match get_finalized_block_number(ctx.sender()).await { Ok(block_number) => { + // Limit our search to last finalized block, or up to max finality lag. + let block_number = std::cmp::max( + block_number, + new_leaf.number.saturating_sub(MAX_BATCH_SCRAPE_ANCESTORS), + ); // Fetch ancestry up to and including the last finalized block. // `get_block_ancestors()` doesn't include the target block in the ancestry, so we'll need to // pass in it's parent. diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index e84a51bc8a07..59f97ab7e2a3 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -54,7 +54,7 @@ use std::sync::Arc; /// /// This is a safety net that should be removed at some point in the future. // Until it's not, make sure to also update `MAX_HEADS_LOOK_BACK` in `approval-voting` -// when changing its value. +// and `MAX_BATCH_SCRAPE_ANCESTORS` in `dispute-coordinator` when changing its value. const MAX_FINALITY_LAG: polkadot_primitives::v1::BlockNumber = 500; const LOG_TARGET: &str = "parachain::chain-selection"; From 3ab8f6016587e2287d53db19b617b6007158fe0f Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 26 Jan 2022 13:06:03 +0000 Subject: [PATCH 26/26] debug damage Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/real/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/node/core/dispute-coordinator/src/real/mod.rs b/node/core/dispute-coordinator/src/real/mod.rs index 2bed63262bc7..6d6d7be85abc 100644 --- a/node/core/dispute-coordinator/src/real/mod.rs +++ b/node/core/dispute-coordinator/src/real/mod.rs @@ -159,10 +159,7 @@ impl DisputeCoordinatorSubsystem { Context: SubsystemContext, B: Backend + 'static, { - let res = match self.initialize(&mut ctx, backend, &*clock).await { - Ok(res) => res, - Err(err) => return Err(err), - }; + let res = self.initialize(&mut ctx, backend, &*clock).await?; let (participations, first_leaf, initialized, backend) = match res { // Concluded: @@ -206,6 +203,10 @@ impl DisputeCoordinatorSubsystem { }, }; + // Before we move to the initialized state we need to check if we got at + // least on finality notification to prevent large ancestry block scraping, + // when the node is syncing. + let mut overlay_db = OverlayedBackend::new(&mut backend); let (participations, spam_slots, ordering_provider) = match self .handle_startup(