Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
if self.check_caches == CheckCaches::Yes {
match self.beacon_chain.get_block_process_status(&root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
BlockProcessStatus::NotValidated(block, _)
| BlockProcessStatus::ExecutionValidated(block) => {
metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
Some(block)
Expand Down
9 changes: 6 additions & 3 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ pub enum BlockProcessStatus<E: EthSpec> {
/// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice.
Unknown,
/// Block is currently processing but not yet validated.
NotValidated(Arc<SignedBeaconBlock<E>>),
NotValidated(Arc<SignedBeaconBlock<E>>, BlockImportSource),
/// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting
/// missing block components.
ExecutionValidated(Arc<SignedBeaconBlock<E>>),
Expand Down Expand Up @@ -3351,8 +3351,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}

self.data_availability_checker
.put_pre_execution_block(block_root, unverified_block.block_cloned())?;
self.data_availability_checker.put_pre_execution_block(
block_root,
unverified_block.block_cloned(),
block_source,
)?;

// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use task_executor::TaskExecutor;
use tracing::{debug, error, instrument};
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, Slot,
BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch,
EthSpec, Hash256, SignedBeaconBlock, Slot,
};

mod error;
Expand Down Expand Up @@ -354,9 +354,10 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
source: BlockImportSource,
) -> Result<(), Error> {
self.availability_cache
.put_pre_execution_block(block_root, block)
.put_pre_execution_block(block_root, block, source)
}

/// Removes a pre-execution block from the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use tracing::{Span, debug, debug_span};
use types::beacon_block_body::KzgCommitments;
use types::blob_sidecar::BlobIdentifier;
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
Hash256, RuntimeFixedVector, RuntimeVariableList, SignedBeaconBlock,
BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar,
DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeFixedVector, RuntimeVariableList,
SignedBeaconBlock,
};

#[derive(Clone)]
pub enum CachedBlock<E: EthSpec> {
PreExecution(Arc<SignedBeaconBlock<E>>),
PreExecution(Arc<SignedBeaconBlock<E>>, BlockImportSource),
Executed(Box<DietAvailabilityPendingExecutedBlock<E>>),
}

Expand All @@ -42,7 +43,7 @@ impl<E: EthSpec> CachedBlock<E> {

fn as_block(&self) -> &SignedBeaconBlock<E> {
match self {
CachedBlock::PreExecution(b) => b,
CachedBlock::PreExecution(b, _) => b,
CachedBlock::Executed(b) => b.as_block(),
}
}
Expand Down Expand Up @@ -135,9 +136,13 @@ impl<E: EthSpec> PendingComponents<E> {

/// Inserts a pre-execution block into the cache.
/// This does NOT override an existing executed block.
pub fn insert_pre_execution_block(&mut self, block: Arc<SignedBeaconBlock<E>>) {
pub fn insert_pre_execution_block(
&mut self,
block: Arc<SignedBeaconBlock<E>>,
source: BlockImportSource,
) {
if self.block.is_none() {
self.block = Some(CachedBlock::PreExecution(block))
self.block = Some(CachedBlock::PreExecution(block, source))
}
}

Expand Down Expand Up @@ -433,7 +438,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
.peek(block_root)
.and_then(|pending_components| {
pending_components.block.as_ref().map(|block| match block {
CachedBlock::PreExecution(b) => BlockProcessStatus::NotValidated(b.clone()),
CachedBlock::PreExecution(b, source) => {
BlockProcessStatus::NotValidated(b.clone(), *source)
}
CachedBlock::Executed(b) => {
BlockProcessStatus::ExecutionValidated(b.block_cloned())
}
Expand Down Expand Up @@ -693,11 +700,12 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
&self,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
source: BlockImportSource,
) -> Result<(), AvailabilityCheckError> {
let epoch = block.epoch();
let pending_components =
self.update_or_insert_pending_components(block_root, epoch, |pending_components| {
pending_components.insert_pre_execution_block(block);
pending_components.insert_pre_execution_block(block, source);
Ok(())
})?;

Expand All @@ -718,7 +726,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
/// This does NOT remove an existing executed block.
pub fn remove_pre_execution_block(&self, block_root: &Hash256) {
// The read lock is immediately dropped so we can safely remove the block from the cache.
if let Some(BlockProcessStatus::NotValidated(_)) = self.get_cached_block(block_root) {
if let Some(BlockProcessStatus::NotValidated(_, _)) = self.get_cached_block(block_root) {
self.critical.write().pop(block_root);
}
}
Expand Down Expand Up @@ -1459,9 +1467,13 @@ mod pending_components_tests {
let mut pending_component = <PendingComponents<E>>::empty(block_root, max_len);

let pre_execution_block = Arc::new(pre_execution_block);
pending_component.insert_pre_execution_block(pre_execution_block.clone());
pending_component
.insert_pre_execution_block(pre_execution_block.clone(), BlockImportSource::Gossip);
assert!(
matches!(pending_component.block, Some(CachedBlock::PreExecution(_))),
matches!(
pending_component.block,
Some(CachedBlock::PreExecution(_, _))
),
"pre execution block inserted"
);

Expand All @@ -1471,7 +1483,8 @@ mod pending_components_tests {
"executed block inserted"
);

pending_component.insert_pre_execution_block(pre_execution_block);
pending_component
.insert_pre_execution_block(pre_execution_block, BlockImportSource::Gossip);
assert!(
matches!(pending_component.block, Some(CachedBlock::Executed(_))),
"executed block should remain"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match cx.chain.get_block_process_status(&self.block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
BlockProcessStatus::NotValidated(block, _)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
}
}) {
Expand Down
32 changes: 22 additions & 10 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ use tokio::sync::mpsc;
use tracing::{Span, debug, debug_span, error, warn};
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext,
Hash256, SignedBeaconBlock, Slot,
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
ForkContext, Hash256, SignedBeaconBlock, Slot,
};

pub mod custody;
Expand Down Expand Up @@ -835,14 +835,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
// Block is known are currently processing, expect a future event with the result of
// processing.
BlockProcessStatus::NotValidated { .. } => {
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
// Block is known and currently processing. Imports from gossip and HTTP API insert the
// block in the da_cache. However, HTTP API is unable to notify sync when it completes
// block import. Returning `Pending` here will result in stuck lookups if the block is
// importing from sync.
BlockProcessStatus::NotValidated(_, source) => match source {
BlockImportSource::Gossip => {
// Lookup sync event safety: If the block is currently in the processing cache, we
// are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
// make progress on this lookup
return Ok(LookupRequestResult::Pending("block in processing cache"));
}
BlockImportSource::Lookup
| BlockImportSource::RangeSync
| BlockImportSource::HttpApi => {
// Lookup, RangeSync or HttpApi block import don't emit the GossipBlockProcessResult
// event. If a lookup happens to be created during block import from one of
// those sources just import the block twice. Otherwise the lookup will get
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a comment on the doc here:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the impact of duplicate imports is not as bad since michael made the fix in #8050
As described in the PR - longer term we'll have @ethDreamer's block status tracker to prevent block re-imports properly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

longer term we'll have @ethDreamer's block status tracker to prevent block re-imports properly.

I noticed in this PR that we'll need to re-engineer sync a bit to for that to work. But working with Mark about it

// stuck. Double imports are fine, they just waste resources.
}
},
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
BlockProcessStatus::ExecutionValidated { .. } => {
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/network/src/sync/tests/lookups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use slot_clock::{SlotClock, TestingSlotClock};
use tokio::sync::mpsc;
use tracing::info;
use types::{
BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName,
Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
BeaconState, BeaconStateBase, BlobSidecar, BlockImportSource, DataColumnSidecar, EthSpec,
ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
data_column_sidecar::ColumnIndex,
test_utils::{SeedableRng, TestRandom, XorShiftRng},
};
Expand Down Expand Up @@ -1113,7 +1113,7 @@ impl TestRig {
self.harness
.chain
.data_availability_checker
.put_pre_execution_block(block.canonical_root(), block)
.put_pre_execution_block(block.canonical_root(), block, BlockImportSource::Gossip)
.unwrap();
}

Expand Down
1 change: 1 addition & 0 deletions consensus/types/src/beacon_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,7 @@ impl<'de, E: EthSpec, Payload: AbstractExecPayload<E>> ContextDeserialize<'de, F
}
}

#[derive(Clone, Copy)]
pub enum BlockImportSource {
Gossip,
Lookup,
Expand Down
Loading