diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 34fad5d42f559..5b8501f1963db 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -1257,6 +1257,10 @@ where self.check_inherents(block, parent_hash, slot, create_inherent_data_providers) .await?; + if block.origin == BlockOrigin::ConsensusBroadcast { + return Ok(()); + } + // Check for equivocation and report it to the runtime if needed. let author = { let viable_epoch = query_epoch_changes( @@ -1454,6 +1458,7 @@ where mut block: BlockImportParams, ) -> Result { let hash = block.post_hash(); + let parent_hash = *block.header.parent_hash(); let number = *block.header.number(); let info = self.client.info(); @@ -1470,11 +1475,18 @@ where if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) || block_status == BlockStatus::InChain { - // When re-importing existing block strip away intermediates. - // In case of initial sync intermediates should not be present... - let _ = block.remove_intermediate::>(INTERMEDIATE_KEY); - block.fork_choice = Some(ForkChoiceStrategy::Custom(false)); - return self.inner.import_block(block).await.map_err(Into::into) + // Calculate the weight of the block in case it is missing. + let stored_weight = aux_schema::load_block_weight(&*self.client, hash) + .map_err(|e| ConsensusError::ClientImport(e.to_string()))?; + // If the stored weight is missing, it means it was skipped when the block was first + // imported. It needs to happen again, along with epoch change tracking. + if stored_weight.is_some() { + // When re-importing existing block strip away intermediates. + // In case of initial sync intermediates should not be present... + let _ = block.remove_intermediate::>(INTERMEDIATE_KEY); + block.fork_choice = Some(ForkChoiceStrategy::Custom(false)); + return self.inner.import_block(block).await.map_err(Into::into) + } } if block.with_state() { @@ -1486,36 +1498,35 @@ where ); let slot = pre_digest.slot(); - let parent_hash = *block.header.parent_hash(); - let parent_header = self - .client - .header(parent_hash) - .map_err(|e| ConsensusError::ChainLookup(e.to_string()))? - .ok_or_else(|| { - ConsensusError::ChainLookup( - babe_err(Error::::ParentUnavailable(parent_hash, hash)).into(), - ) - })?; - - let parent_slot = find_pre_digest::(&parent_header).map(|d| d.slot()).expect( - "parent is non-genesis; valid BABE headers contain a pre-digest; header has already \ - been verified; qed", - ); + // If there's a pending epoch we'll save the previous epoch changes here + // this way we can revert it if there's any error. + let mut old_epoch_changes = None; - // make sure that slot number is strictly increasing - if slot <= parent_slot { - return Err(ConsensusError::ClientImport( - babe_err(Error::::SlotMustIncrease(parent_slot, slot)).into(), - )) - } + let epoch_changes = if block.origin != BlockOrigin::ConsensusBroadcast { + let parent_header = self + .client + .header(parent_hash) + .map_err(|e| ConsensusError::ChainLookup(e.to_string()))? + .ok_or_else(|| { + ConsensusError::ChainLookup( + babe_err(Error::::ParentUnavailable(parent_hash, hash)).into(), + ) + })?; + + let parent_slot = find_pre_digest::(&parent_header).map(|d| d.slot()).expect( + "parent is non-genesis; valid BABE headers contain a pre-digest; header has already \ + been verified; qed", + ); - // if there's a pending epoch we'll save the previous epoch changes here - // this way we can revert it if there's any error - let mut old_epoch_changes = None; + // make sure that slot number is strictly increasing + if slot <= parent_slot { + return Err(ConsensusError::ClientImport( + babe_err(Error::::SlotMustIncrease(parent_slot, slot)).into(), + )) + } - // Use an extra scope to make the compiler happy, because otherwise it complains about the - // mutex, even if we dropped it... - let mut epoch_changes = { + // Use an extra scope to make the compiler happy, because otherwise it complains about + // the mutex, even if we dropped it... let mut epoch_changes = self.epoch_changes.shared_data_locked(); // check if there's any epoch change expected to happen at this slot. @@ -1537,8 +1548,12 @@ where })? }; - let intermediate = - block.remove_intermediate::>(INTERMEDIATE_KEY)?; + let intermediate = block + .remove_intermediate::>(INTERMEDIATE_KEY) + .map_err(|e| { + log::info!("XXX no intermediate for block {}", number); + e + })?; let epoch_descriptor = intermediate.epoch_descriptor; let first_in_epoch = parent_slot < epoch_descriptor.start_slot(); @@ -1597,13 +1612,14 @@ where // re-use the same data for that epoch. // Notice that we are only updating a local copy of the `Epoch`, this // makes it so that when we insert the next epoch into `EpochChanges` below - // (after incrementing it), it will use the correct epoch index and start slot. - // We do not update the original epoch that will be re-used because there might - // be other forks (that we haven't imported) where the epoch isn't skipped, and - // to import those forks we want to keep the original epoch data. Not updating - // the original epoch works because when we search the tree for which epoch to - // use for a given slot, we will search in-depth with the predicate - // `epoch.start_slot <= slot` which will still match correctly without updating + // (after incrementing it), it will use the correct epoch index and start + // slot. We do not update the original epoch that will be re-used + // because there might be other forks (that we haven't imported) where + // the epoch isn't skipped, and to import those forks we want to keep + // the original epoch data. Not updating the original epoch works + // because when we search the tree for which epoch to use for a given + // slot, we will search in-depth with the predicate `epoch.start_slot + // <= slot` which will still match correctly without updating // `start_slot` to the correct value as below. let epoch = viable_epoch.as_mut(); let prev_index = epoch.epoch_index; @@ -1711,7 +1727,10 @@ where }; // Release the mutex, but it stays locked - epoch_changes.release_mutex() + Some(epoch_changes.release_mutex()) + } else { + block.fork_choice = Some(ForkChoiceStrategy::Custom(false)); + None }; let import_result = self.inner.import_block(block).await; @@ -1719,7 +1738,9 @@ where // revert to the original epoch changes in case there's an error // importing the block if import_result.is_err() { - if let Some(old_epoch_changes) = old_epoch_changes { + if let (Some(mut epoch_changes), Some(old_epoch_changes)) = + (epoch_changes, old_epoch_changes) + { *epoch_changes.upgrade() = old_epoch_changes; } } diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index be14d780b2e06..6b59ced666400 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -91,6 +91,8 @@ pub struct IncomingBlock { pub skip_execution: bool, /// Re-validate existing block. pub import_existing: bool, + /// Re-validate existing block. + pub allow_missing_parent: bool, /// Do not compute new state, but rather set it to the given set. pub state: Option>, } @@ -329,6 +331,15 @@ pub(crate) async fn verify_single_block_metered>( let hash = block.hash; let parent_hash = *header.parent_hash(); + if matches!(block_origin, BlockOrigin::ConsensusBroadcast) { + return Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters { + import_block: BlockImportParams::new(block_origin, header), + hash: block.hash, + block_origin: peer, + verification_time: Duration::ZERO, + })); + } + match import_handler::( number, hash, @@ -341,7 +352,7 @@ pub(crate) async fn verify_single_block_metered>( parent_hash, allow_missing_state: block.allow_missing_state, import_existing: block.import_existing, - allow_missing_parent: block.state.is_some(), + allow_missing_parent: block.state.is_some() || block.allow_missing_parent, }) .await, )? { diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index d8879731654c1..f902378bc9163 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -244,6 +244,12 @@ async fn block_import_process( let res = import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await; + for (res, hash) in &res.results { + if let Err(e) = res { + log::info!("XXX Block import failed for hash = {:?} with error: {}", hash, e,); + } + } + result_sender.blocks_processed(res.imported, res.block_count, res.results); } } diff --git a/substrate/client/consensus/grandpa/src/import.rs b/substrate/client/consensus/grandpa/src/import.rs index e2343409f3ea2..a27fd4829e51d 100644 --- a/substrate/client/consensus/grandpa/src/import.rs +++ b/substrate/client/consensus/grandpa/src/import.rs @@ -276,6 +276,14 @@ where hash: Block::Hash, initial_sync: bool, ) -> Result, ConsensusError> { + if block.origin == BlockOrigin::ConsensusBroadcast { + return Ok(PendingSetChanges { + just_in_case: None, + applied_changes: AppliedChanges::None, + do_pause: false, + }) + } + // when we update the authorities, we need to hold the lock // until the block is written to prevent a race if we need to restore // the old authority set on error or panic. diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 05696bdc74d56..367aa1eace11a 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -337,6 +337,14 @@ impl BlocksPruning { BlocksPruning::Some(_) => false, } } + + /// True if pruning is enabled ([`BlocksPruning::Some`]). + pub fn is_pruned(&self) -> bool { + match *self { + BlocksPruning::KeepAll | BlocksPruning::KeepFinalized => false, + BlocksPruning::Some(_) => true, + } + } } /// Where to find the database.. diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 7eed775e03bfb..4a86b8d9177d0 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -337,6 +337,9 @@ pub struct ChainSync { import_existing: bool, /// Block downloader block_downloader: Arc>, + /// Is block pruning enabled? This indicates that the user is not + /// interested in historical blocks, so gap sync will be skipped. + block_pruning_enabled: bool, /// Gap download process. gap_sync: Option>, /// Pending actions. @@ -889,7 +892,7 @@ where let state_request = self.state_request().into_iter().map(|(peer_id, request)| { trace!( target: LOG_TARGET, - "Created `StrategyRequest` to {peer_id}.", + "Created `StateRequest` to {peer_id}.", ); let (tx, rx) = oneshot::channel(); @@ -944,6 +947,7 @@ where block_downloader: Arc>, metrics_registry: Option<&Registry>, initial_peers: impl Iterator)>, + block_pruning_enabled: bool, ) -> Result { let mut sync = Self { client, @@ -965,6 +969,7 @@ where state_sync: None, import_existing: false, block_downloader, + block_pruning_enabled, gap_sync: None, actions: Vec::new(), metrics: metrics_registry.and_then(|r| match Metrics::register(r) { @@ -1205,6 +1210,7 @@ where import_existing: self.import_existing, skip_execution: true, state: None, + allow_missing_parent: true, } }) .collect(); @@ -1244,6 +1250,7 @@ where import_existing: self.import_existing, skip_execution: self.skip_execution(), state: None, + allow_missing_parent: false, } }) .collect() @@ -1385,6 +1392,7 @@ where allow_missing_state: true, import_existing: false, skip_execution: true, + allow_missing_parent: false, state: None, } }) @@ -1523,6 +1531,8 @@ where fn required_block_attributes(&self) -> BlockAttributes { match self.mode { + // TODO Not sure if commenting/uncommenting this actually affects the behavior + _ if self.block_pruning_enabled /*&& self.gap_sync.is_some()*/ => BlockAttributes::HEADER, ChainSyncMode::Full => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, ChainSyncMode::LightState { storage_chain_mode: false, .. } => @@ -1771,6 +1781,7 @@ where allow_missing_state: true, import_existing: self.import_existing, skip_execution: self.skip_execution(), + allow_missing_parent: false, state: None, } }) @@ -2019,6 +2030,7 @@ where allow_missing_state: true, import_existing: true, skip_execution: self.skip_execution(), + allow_missing_parent: false, state: Some(state), }; debug!(target: LOG_TARGET, "State download is complete. Import is queued"); diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index f5b21c67fbbd4..20da3b9e2138f 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -95,6 +95,7 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); @@ -160,6 +161,7 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); @@ -366,6 +368,7 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); @@ -524,6 +527,7 @@ fn can_sync_huge_fork() { proxy_block_downloader.clone(), None, std::iter::empty(), + false, ) .unwrap(); @@ -672,6 +676,7 @@ fn syncs_fork_without_duplicate_requests() { proxy_block_downloader.clone(), None, std::iter::empty(), + false, ) .unwrap(); @@ -820,6 +825,7 @@ fn removes_target_fork_on_disconnect() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); @@ -855,6 +861,7 @@ fn can_import_response_with_missing_blocks() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); @@ -896,6 +903,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); @@ -1048,6 +1056,7 @@ fn request_across_forks() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); @@ -1156,6 +1165,7 @@ fn sync_verification_failed_with_gap_filled() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); @@ -1293,6 +1303,7 @@ fn sync_gap_filled_regardless_of_blocks_origin() { Arc::new(MockBlockDownloader::new()), None, std::iter::empty(), + false, ) .unwrap(); diff --git a/substrate/client/network/sync/src/strategy/polkadot.rs b/substrate/client/network/sync/src/strategy/polkadot.rs index 03de11de7e9e9..98beb5b334c92 100644 --- a/substrate/client/network/sync/src/strategy/polkadot.rs +++ b/substrate/client/network/sync/src/strategy/polkadot.rs @@ -73,6 +73,8 @@ where pub state_request_protocol_name: ProtocolName, /// Block downloader pub block_downloader: Arc>, + /// Is block pruning enabled? + pub block_pruning_enabled: bool, } /// Proxy to specific syncing strategies used in Polkadot. @@ -382,6 +384,7 @@ where config.block_downloader.clone(), config.metrics_registry.as_ref(), std::iter::empty(), + config.block_pruning_enabled, )?; Ok(Self { config, @@ -436,6 +439,7 @@ where self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { (*peer_id, *best_hash, *best_number) }), + self.config.block_pruning_enabled, ) { Ok(chain_sync) => chain_sync, Err(e) => { @@ -466,6 +470,7 @@ where self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { (*peer_id, *best_hash, *best_number) }), + self.config.block_pruning_enabled, ) { Ok(chain_sync) => chain_sync, Err(e) => { diff --git a/substrate/client/network/sync/src/strategy/state.rs b/substrate/client/network/sync/src/strategy/state.rs index 1abbb96ccd907..79841d241dab5 100644 --- a/substrate/client/network/sync/src/strategy/state.rs +++ b/substrate/client/network/sync/src/strategy/state.rs @@ -229,6 +229,7 @@ impl StateStrategy { allow_missing_state: true, import_existing: true, skip_execution: true, + allow_missing_parent: true, state: Some(state), }; debug!(target: LOG_TARGET, "State download is complete. Import is queued"); diff --git a/substrate/client/network/sync/src/strategy/warp.rs b/substrate/client/network/sync/src/strategy/warp.rs index c815ca1b86d6a..29bc0da0c3854 100644 --- a/substrate/client/network/sync/src/strategy/warp.rs +++ b/substrate/client/network/sync/src/strategy/warp.rs @@ -407,6 +407,7 @@ where let proof_to_incoming_block = |(header, justifications): (B::Header, Justifications)| -> IncomingBlock { + log::info!("XXX warp sync importing header {}", header.number()); IncomingBlock { hash: header.hash(), header: Some(header), @@ -421,6 +422,7 @@ where // Shouldn't already exist in the database. import_existing: false, state: None, + allow_missing_parent: false, } }; @@ -437,7 +439,7 @@ where *last_hash = new_last_hash; self.total_proof_bytes += response.0.len() as u64; self.actions.push(SyncingAction::ImportBlocks { - origin: BlockOrigin::NetworkInitialSync, + origin: BlockOrigin::ConsensusBroadcast, blocks: proofs.into_iter().map(proof_to_incoming_block).collect(), }); }, @@ -452,7 +454,7 @@ where self.total_proof_bytes += response.0.len() as u64; self.phase = Phase::TargetBlock(header); self.actions.push(SyncingAction::ImportBlocks { - origin: BlockOrigin::NetworkInitialSync, + origin: BlockOrigin::ConsensusBroadcast, blocks: proofs.into_iter().map(proof_to_incoming_block).collect(), }); }, diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 79a0a42138bf7..954afe0e3f396 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -918,6 +918,7 @@ pub trait TestNetFactory: Default + Sized + Send { state_request_protocol_name: state_request_protocol_config.name.clone(), block_downloader: block_relay_params.downloader, min_peers_to_start_warp_sync: None, + block_pruning_enabled: false, }; // Initialize syncing strategy. let syncing_strategy = Box::new( diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 74d94c30cd69b..25899704a6a48 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -1007,6 +1007,7 @@ where fork_id, &mut net_config, warp_sync_config, + config.blocks_pruning.is_pruned(), block_downloader, client.clone(), &spawn_handle, @@ -1280,6 +1281,8 @@ where pub metrics_registry: Option<&'a Registry>, /// Metrics. pub metrics: NotificationMetrics, + /// Is block pruning enabled? + pub block_pruning_enabled: bool, } /// Build default syncing engine using [`build_default_block_downloader`] and @@ -1312,6 +1315,7 @@ where spawn_handle, metrics_registry, metrics, + block_pruning_enabled, } = config; let block_downloader = build_default_block_downloader( @@ -1328,6 +1332,7 @@ where fork_id, net_config, warp_sync_config, + block_pruning_enabled, block_downloader, client.clone(), spawn_handle, @@ -1395,6 +1400,7 @@ pub fn build_polkadot_syncing_strategy( fork_id: Option<&str>, net_config: &mut FullNetworkConfiguration::Hash, Net>, warp_sync_config: Option>, + block_pruning_enabled: bool, block_downloader: Arc>, client: Arc, spawn_handle: &SpawnTaskHandle, @@ -1467,6 +1473,7 @@ where metrics_registry: metrics_registry.cloned(), state_request_protocol_name, block_downloader, + block_pruning_enabled, }; Ok(Box::new(PolkadotSyncingStrategy::new( syncing_config, diff --git a/substrate/client/service/src/chain_ops/import_blocks.rs b/substrate/client/service/src/chain_ops/import_blocks.rs index 8e759faa0775d..f93ccffae7370 100644 --- a/substrate/client/service/src/chain_ops/import_blocks.rs +++ b/substrate/client/service/src/chain_ops/import_blocks.rs @@ -172,6 +172,7 @@ fn import_block_to_queue( import_existing: force, state: None, skip_execution: false, + allow_missing_parent: false, }], ); } diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 5f55499b32f06..6ffebdac0e910 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -502,6 +502,13 @@ where *self.importing_block.write() = Some(hash); + let create_gap = if origin == BlockOrigin::ConsensusBroadcast { + // Never create gaps for consensus imported blocks, because this import origin is used + // during warp sync, and the following gap sync needs to import all blocks. + false + } else { + create_gap + }; operation.op.set_create_gap(create_gap); let result = self.execute_and_import_block( @@ -801,6 +808,10 @@ where Self: ProvideRuntimeApi, >::Api: CoreApi + ApiExt, { + if import_block.origin == BlockOrigin::ConsensusBroadcast { + return Ok(PrepareStorageChangesResult::Import(None)) + } + let parent_hash = import_block.header.parent_hash(); let state_action = std::mem::replace(&mut import_block.state_action, StateAction::Skip); let (enact_state, storage_changes) = match (self.block_status(*parent_hash)?, state_action)