Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 64 additions & 43 deletions substrate/client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,10 @@ where
self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
.await?;

if block.origin == BlockOrigin::ConsensusBroadcast {
return Ok(());
}

// Check for equivocation and report it to the runtime if needed.
let author = {
let viable_epoch = query_epoch_changes(
Expand Down Expand Up @@ -1454,6 +1458,7 @@ where
mut block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
let hash = block.post_hash();
let parent_hash = *block.header.parent_hash();
let number = *block.header.number();
let info = self.client.info();

Expand All @@ -1470,11 +1475,18 @@ where
if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
block_status == BlockStatus::InChain
{
// When re-importing existing block strip away intermediates.
// In case of initial sync intermediates should not be present...
let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
return self.inner.import_block(block).await.map_err(Into::into)
// Calculate the weight of the block in case it is missing.
let stored_weight = aux_schema::load_block_weight(&*self.client, hash)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
// If the stored weight is missing, it means it was skipped when the block was first
// imported. It needs to happen again, along with epoch change tracking.
if stored_weight.is_some() {
// When re-importing existing block strip away intermediates.
// In case of initial sync intermediates should not be present...
let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
return self.inner.import_block(block).await.map_err(Into::into)
}
Comment on lines +1478 to +1489
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the really ugly part - first warp sync imports the block, but skips a bunch of stuff, including the weight calculation. Then when re-importing we check if the weight is missing, and if it is, we do everything in the re-import that normally happens during regular block import (weight calculation and epoch changes tracking).

}

if block.with_state() {
Expand All @@ -1486,36 +1498,35 @@ where
);
let slot = pre_digest.slot();

let parent_hash = *block.header.parent_hash();
let parent_header = self
.client
.header(parent_hash)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
.ok_or_else(|| {
ConsensusError::ChainLookup(
babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
)
})?;

let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
"parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
been verified; qed",
);
// If there's a pending epoch we'll save the previous epoch changes here
// this way we can revert it if there's any error.
let mut old_epoch_changes = None;

// make sure that slot number is strictly increasing
if slot <= parent_slot {
return Err(ConsensusError::ClientImport(
babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
))
}
let epoch_changes = if block.origin != BlockOrigin::ConsensusBroadcast {
let parent_header = self
.client
.header(parent_hash)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
.ok_or_else(|| {
ConsensusError::ChainLookup(
babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
)
})?;

let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
"parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
been verified; qed",
);

// if there's a pending epoch we'll save the previous epoch changes here
// this way we can revert it if there's any error
let mut old_epoch_changes = None;
// make sure that slot number is strictly increasing
if slot <= parent_slot {
return Err(ConsensusError::ClientImport(
babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
))
}

// Use an extra scope to make the compiler happy, because otherwise it complains about the
// mutex, even if we dropped it...
let mut epoch_changes = {
// Use an extra scope to make the compiler happy, because otherwise it complains about
// the mutex, even if we dropped it...
let mut epoch_changes = self.epoch_changes.shared_data_locked();

// check if there's any epoch change expected to happen at this slot.
Expand All @@ -1537,8 +1548,12 @@ where
})?
};

let intermediate =
block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;
let intermediate = block
.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)
.map_err(|e| {
log::info!("XXX no intermediate for block {}", number);
e
})?;

let epoch_descriptor = intermediate.epoch_descriptor;
let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
Expand Down Expand Up @@ -1597,13 +1612,14 @@ where
// re-use the same data for that epoch.
// Notice that we are only updating a local copy of the `Epoch`, this
// makes it so that when we insert the next epoch into `EpochChanges` below
// (after incrementing it), it will use the correct epoch index and start slot.
// We do not update the original epoch that will be re-used because there might
// be other forks (that we haven't imported) where the epoch isn't skipped, and
// to import those forks we want to keep the original epoch data. Not updating
// the original epoch works because when we search the tree for which epoch to
// use for a given slot, we will search in-depth with the predicate
// `epoch.start_slot <= slot` which will still match correctly without updating
// (after incrementing it), it will use the correct epoch index and start
// slot. We do not update the original epoch that will be re-used
// because there might be other forks (that we haven't imported) where
// the epoch isn't skipped, and to import those forks we want to keep
// the original epoch data. Not updating the original epoch works
// because when we search the tree for which epoch to use for a given
// slot, we will search in-depth with the predicate `epoch.start_slot
// <= slot` which will still match correctly without updating
// `start_slot` to the correct value as below.
let epoch = viable_epoch.as_mut();
let prev_index = epoch.epoch_index;
Expand Down Expand Up @@ -1711,15 +1727,20 @@ where
};

// Release the mutex, but it stays locked
epoch_changes.release_mutex()
Some(epoch_changes.release_mutex())
} else {
block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
None
};

let import_result = self.inner.import_block(block).await;

// revert to the original epoch changes in case there's an error
// importing the block
if import_result.is_err() {
if let Some(old_epoch_changes) = old_epoch_changes {
if let (Some(mut epoch_changes), Some(old_epoch_changes)) =
(epoch_changes, old_epoch_changes)
{
*epoch_changes.upgrade() = old_epoch_changes;
}
}
Expand Down
13 changes: 12 additions & 1 deletion substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ pub struct IncomingBlock<B: BlockT> {
pub skip_execution: bool,
/// Re-validate existing block.
pub import_existing: bool,
/// Allow importing the block even when its parent is not present in the backend.
pub allow_missing_parent: bool,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you want me to rename this - I'll get to it eventually 😄

/// Do not compute new state, but rather set it to the given set.
pub state: Option<ImportedState<B>>,
}
Expand Down Expand Up @@ -329,6 +331,15 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
let hash = block.hash;
let parent_hash = *header.parent_hash();

if matches!(block_origin, BlockOrigin::ConsensusBroadcast) {
return Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
import_block: BlockImportParams::new(block_origin, header),
hash: block.hash,
block_origin: peer,
verification_time: Duration::ZERO,
}));
}

match import_handler::<B>(
number,
hash,
Expand All @@ -341,7 +352,7 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
parent_hash,
allow_missing_state: block.allow_missing_state,
import_existing: block.import_existing,
allow_missing_parent: block.state.is_some(),
allow_missing_parent: block.state.is_some() || block.allow_missing_parent,
})
.await,
)? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ async fn block_import_process<B: BlockT>(
let res =
import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await;

for (res, hash) in &res.results {
if let Err(e) = res {
log::info!("XXX Block import failed for hash = {:?} with error: {}", hash, e,);
}
}

result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
}
Expand Down
8 changes: 8 additions & 0 deletions substrate/client/consensus/grandpa/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ where
hash: Block::Hash,
initial_sync: bool,
) -> Result<PendingSetChanges<Block>, ConsensusError> {
if block.origin == BlockOrigin::ConsensusBroadcast {
return Ok(PendingSetChanges {
just_in_case: None,
applied_changes: AppliedChanges::None,
do_pause: false,
})
}

// when we update the authorities, we need to hold the lock
// until the block is written to prevent a race if we need to restore
// the old authority set on error or panic.
Expand Down
8 changes: 8 additions & 0 deletions substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,14 @@ impl BlocksPruning {
BlocksPruning::Some(_) => false,
}
}

/// True if pruning is enabled ([`BlocksPruning::Some`]).
pub fn is_pruned(&self) -> bool {
	// Only the `Some(_)` variant prunes blocks; both keep-variants retain history.
	matches!(*self, BlocksPruning::Some(_))
}
}

/// Where to find the database..
Expand Down
14 changes: 13 additions & 1 deletion substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ pub struct ChainSync<B: BlockT, Client> {
import_existing: bool,
/// Block downloader
block_downloader: Arc<dyn BlockDownloader<B>>,
/// Is block pruning enabled? This indicates that the user is not
/// interested in historical blocks, so gap sync will be skipped.
block_pruning_enabled: bool,
/// Gap download process.
gap_sync: Option<GapSync<B>>,
/// Pending actions.
Expand Down Expand Up @@ -889,7 +892,7 @@ where
let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
trace!(
target: LOG_TARGET,
"Created `StrategyRequest` to {peer_id}.",
"Created `StateRequest` to {peer_id}.",
);

let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -944,6 +947,7 @@ where
block_downloader: Arc<dyn BlockDownloader<B>>,
metrics_registry: Option<&Registry>,
initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
block_pruning_enabled: bool,
) -> Result<Self, ClientError> {
let mut sync = Self {
client,
Expand All @@ -965,6 +969,7 @@ where
state_sync: None,
import_existing: false,
block_downloader,
block_pruning_enabled,
gap_sync: None,
actions: Vec::new(),
metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
Expand Down Expand Up @@ -1205,6 +1210,7 @@ where
import_existing: self.import_existing,
skip_execution: true,
state: None,
allow_missing_parent: true,
}
})
.collect();
Expand Down Expand Up @@ -1244,6 +1250,7 @@ where
import_existing: self.import_existing,
skip_execution: self.skip_execution(),
state: None,
allow_missing_parent: false,
}
})
.collect()
Expand Down Expand Up @@ -1385,6 +1392,7 @@ where
allow_missing_state: true,
import_existing: false,
skip_execution: true,
allow_missing_parent: false,
state: None,
}
})
Expand Down Expand Up @@ -1523,6 +1531,8 @@ where

fn required_block_attributes(&self) -> BlockAttributes {
match self.mode {
// TODO Not sure if commenting/uncommenting this actually affects the behavior
_ if self.block_pruning_enabled /*&& self.gap_sync.is_some()*/ => BlockAttributes::HEADER,
ChainSyncMode::Full =>
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
Expand Down Expand Up @@ -1771,6 +1781,7 @@ where
allow_missing_state: true,
import_existing: self.import_existing,
skip_execution: self.skip_execution(),
allow_missing_parent: false,
state: None,
}
})
Expand Down Expand Up @@ -2019,6 +2030,7 @@ where
allow_missing_state: true,
import_existing: true,
skip_execution: self.skip_execution(),
allow_missing_parent: false,
state: Some(state),
};
debug!(target: LOG_TARGET, "State download is complete. Import is queued");
Expand Down
Loading
Loading