diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs
index d4ce38927b2..c816a0b29f3 100644
--- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs
+++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs
@@ -404,7 +404,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
         if self.check_caches == CheckCaches::Yes {
             match self.beacon_chain.get_block_process_status(&root) {
                 BlockProcessStatus::Unknown => None,
-                BlockProcessStatus::NotValidated(block)
+                BlockProcessStatus::NotValidated(block, _)
                 | BlockProcessStatus::ExecutionValidated(block) => {
                     metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
                     Some(block)
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 4f0c6aada0a..08e0d1c6745 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -334,7 +334,7 @@ pub enum BlockProcessStatus<E: EthSpec> {
     /// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice.
     Unknown,
     /// Block is currently processing but not yet validated.
-    NotValidated(Arc<SignedBeaconBlock<E>>),
+    NotValidated(Arc<SignedBeaconBlock<E>>, BlockImportSource),
     /// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting
     /// missing block components.
     ExecutionValidated(Arc<SignedBeaconBlock<E>>),
@@ -3351,8 +3351,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             );
         }
 
-        self.data_availability_checker
-            .put_pre_execution_block(block_root, unverified_block.block_cloned())?;
+        self.data_availability_checker.put_pre_execution_block(
+            block_root,
+            unverified_block.block_cloned(),
+            block_source,
+        )?;
 
         // Start the Prometheus timer.
         let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index a0ad1c2112d..43b7d8f7ea3 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -21,8 +21,8 @@ use task_executor::TaskExecutor;
 use tracing::{debug, error, instrument};
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
 use types::{
-    BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
-    SignedBeaconBlock, Slot,
+    BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch,
+    EthSpec, Hash256, SignedBeaconBlock, Slot,
 };
 
 mod error;
@@ -354,9 +354,10 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         &self,
         block_root: Hash256,
         block: Arc<SignedBeaconBlock<T::EthSpec>>,
+        source: BlockImportSource,
     ) -> Result<(), Error> {
         self.availability_cache
-            .put_pre_execution_block(block_root, block)
+            .put_pre_execution_block(block_root, block, source)
     }
 
     /// Removes a pre-execution block from the cache.
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index bb440096627..42f6dbd8567 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -19,13 +19,14 @@ use tracing::{Span, debug, debug_span};
 use types::beacon_block_body::KzgCommitments;
 use types::blob_sidecar::BlobIdentifier;
 use types::{
-    BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec,
-    Hash256, RuntimeFixedVector, RuntimeVariableList, SignedBeaconBlock,
+    BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar,
+    DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeFixedVector, RuntimeVariableList,
+    SignedBeaconBlock,
 };
 
 #[derive(Clone)]
 pub enum CachedBlock<E: EthSpec> {
-    PreExecution(Arc<SignedBeaconBlock<E>>),
+    PreExecution(Arc<SignedBeaconBlock<E>>, BlockImportSource),
     Executed(Box<AvailabilityPendingExecutedBlock<E>>),
 }
 
@@ -42,7 +43,7 @@ impl<E: EthSpec> CachedBlock<E> {
     fn as_block(&self) -> &SignedBeaconBlock<E> {
         match self {
-            CachedBlock::PreExecution(b) => b,
+            CachedBlock::PreExecution(b, _) => b,
             CachedBlock::Executed(b) => b.as_block(),
         }
     }
@@ -135,9 +136,13 @@ impl<E: EthSpec> PendingComponents<E> {
     /// Inserts a pre-execution block into the cache.
     /// This does NOT override an existing executed block.
-    pub fn insert_pre_execution_block(&mut self, block: Arc<SignedBeaconBlock<E>>) {
+    pub fn insert_pre_execution_block(
+        &mut self,
+        block: Arc<SignedBeaconBlock<E>>,
+        source: BlockImportSource,
+    ) {
         if self.block.is_none() {
-            self.block = Some(CachedBlock::PreExecution(block))
+            self.block = Some(CachedBlock::PreExecution(block, source))
         }
     }
 
@@ -433,7 +438,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
             .peek(block_root)
             .and_then(|pending_components| {
                 pending_components.block.as_ref().map(|block| match block {
-                    CachedBlock::PreExecution(b) => BlockProcessStatus::NotValidated(b.clone()),
+                    CachedBlock::PreExecution(b, source) => {
+                        BlockProcessStatus::NotValidated(b.clone(), *source)
+                    }
                     CachedBlock::Executed(b) => {
                         BlockProcessStatus::ExecutionValidated(b.block_cloned())
                     }
@@ -693,11 +700,12 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
         &self,
         block_root: Hash256,
         block: Arc<SignedBeaconBlock<T::EthSpec>>,
+        source: BlockImportSource,
     ) -> Result<(), AvailabilityCheckError> {
         let epoch = block.epoch();
         let pending_components =
             self.update_or_insert_pending_components(block_root, epoch, |pending_components| {
-                pending_components.insert_pre_execution_block(block);
+                pending_components.insert_pre_execution_block(block, source);
                 Ok(())
             })?;
 
@@ -718,7 +726,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
     /// This does NOT remove an existing executed block.
     pub fn remove_pre_execution_block(&self, block_root: &Hash256) {
         // The read lock is immediately dropped so we can safely remove the block from the cache.
-        if let Some(BlockProcessStatus::NotValidated(_)) = self.get_cached_block(block_root) {
+        if let Some(BlockProcessStatus::NotValidated(_, _)) = self.get_cached_block(block_root) {
             self.critical.write().pop(block_root);
         }
     }
@@ -1459,9 +1467,13 @@ mod pending_components_tests {
         let mut pending_component = <PendingComponents<E>>::empty(block_root, max_len);
 
         let pre_execution_block = Arc::new(pre_execution_block);
-        pending_component.insert_pre_execution_block(pre_execution_block.clone());
+        pending_component
+            .insert_pre_execution_block(pre_execution_block.clone(), BlockImportSource::Gossip);
         assert!(
-            matches!(pending_component.block, Some(CachedBlock::PreExecution(_))),
+            matches!(
+                pending_component.block,
+                Some(CachedBlock::PreExecution(_, _))
+            ),
             "pre execution block inserted"
         );
 
@@ -1471,7 +1483,8 @@ mod pending_components_tests {
         assert!(
             matches!(pending_component.block, Some(CachedBlock::Executed(_))),
             "executed block inserted"
         );
 
-        pending_component.insert_pre_execution_block(pre_execution_block);
+        pending_component
+            .insert_pre_execution_block(pre_execution_block, BlockImportSource::Gossip);
         assert!(
             matches!(pending_component.block, Some(CachedBlock::Executed(_))),
             "executed block should remain"
diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
index 36509d2563e..8fb3248a871 100644
--- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
+++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
@@ -219,7 +219,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
                 // can assert that this is the correct value of `blob_kzg_commitments_count`.
                 match cx.chain.get_block_process_status(&self.block_root) {
                     BlockProcessStatus::Unknown => None,
-                    BlockProcessStatus::NotValidated(block)
+                    BlockProcessStatus::NotValidated(block, _)
                     | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
                 }
             }) {
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 17a42957009..ac2991c1474 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -49,8 +49,8 @@ use tokio::sync::mpsc;
 use tracing::{Span, debug, debug_span, error, warn};
 use types::blob_sidecar::FixedBlobSidecarList;
 use types::{
-    BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkContext,
-    Hash256, SignedBeaconBlock, Slot,
+    BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
+    ForkContext, Hash256, SignedBeaconBlock, Slot,
 };
 
 pub mod custody;
@@ -835,14 +835,26 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         match self.chain.get_block_process_status(&block_root) {
             // Unknown block, continue request to download
             BlockProcessStatus::Unknown => {}
-            // Block is known are currently processing, expect a future event with the result of
-            // processing.
-            BlockProcessStatus::NotValidated { .. } => {
-                // Lookup sync event safety: If the block is currently in the processing cache, we
-                // are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
-                // make progress on this lookup
-                return Ok(LookupRequestResult::Pending("block in processing cache"));
-            }
+            // Block is known and currently processing. Imports from gossip and HTTP API insert the
+            // block in the da_cache. However, HTTP API is unable to notify sync when it completes
+            // block import. Returning `Pending` here will result in stuck lookups if the block is
+            // importing from sync.
+            BlockProcessStatus::NotValidated(_, source) => match source {
+                BlockImportSource::Gossip => {
+                    // Lookup sync event safety: If the block is currently in the processing cache, we
+                    // are guaranteed to receive a `SyncMessage::GossipBlockProcessResult` that will
+                    // make progress on this lookup
+                    return Ok(LookupRequestResult::Pending("block in processing cache"));
+                }
+                BlockImportSource::Lookup
+                | BlockImportSource::RangeSync
+                | BlockImportSource::HttpApi => {
+                    // Lookup, RangeSync or HttpApi block import don't emit the GossipBlockProcessResult
+                    // event. If a lookup happens to be created during block import from one of
+                    // those sources just import the block twice. Otherwise the lookup will get
+                    // stuck. Double imports are fine, they just waste resources.
+                }
+            },
             // Block is fully validated. If it's not yet imported it's waiting for missing block
             // components. Consider this request completed and do nothing.
             BlockProcessStatus::ExecutionValidated { .. } => {
diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs
index 27968a06351..fc641861754 100644
--- a/beacon_node/network/src/sync/tests/lookups.rs
+++ b/beacon_node/network/src/sync/tests/lookups.rs
@@ -41,8 +41,8 @@ use slot_clock::{SlotClock, TestingSlotClock};
 use tokio::sync::mpsc;
 use tracing::info;
 use types::{
-    BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, ForkName,
-    Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
+    BeaconState, BeaconStateBase, BlobSidecar, BlockImportSource, DataColumnSidecar, EthSpec,
+    ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
     data_column_sidecar::ColumnIndex,
     test_utils::{SeedableRng, TestRandom, XorShiftRng},
 };
@@ -1113,7 +1113,7 @@ impl TestRig {
         self.harness
             .chain
             .data_availability_checker
-            .put_pre_execution_block(block.canonical_root(), block)
+            .put_pre_execution_block(block.canonical_root(), block, BlockImportSource::Gossip)
             .unwrap();
     }
 
diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs
index f4e4e369661..61c32dd4ac9 100644
--- a/consensus/types/src/beacon_block.rs
+++ b/consensus/types/src/beacon_block.rs
@@ -843,6 +843,7 @@ impl<'de, E: EthSpec, Payload: AbstractExecPayload<E>> ContextDeserialize<'de, F
     }
 }
 
+#[derive(Clone, Copy)]
 pub enum BlockImportSource {
     Gossip,
     Lookup,
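
Editor's note, not part of the patch: the following is a minimal, self-contained Rust sketch of the decision the new `NotValidated(_, source)` arm in `SyncNetworkContext` encodes. `LookupAction` and `action_for_pending_block` are hypothetical names used only for illustration; just the `BlockImportSource` variants and the gossip-versus-other split mirror the diff.

// Editor's sketch only; hypothetical names, not Lighthouse APIs.
// Rule from the patch: only gossip imports are guaranteed to emit a
// processing-result event back to sync, so only they may safely park a
// lookup as "pending".

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BlockImportSource {
    Gossip,
    Lookup,
    RangeSync,
    HttpApi,
}

#[derive(Debug, PartialEq, Eq)]
enum LookupAction {
    // Wait for the gossip processing result; it will resume the lookup.
    WaitForGossipResult,
    // Re-request and re-import the block: a duplicate import only wastes
    // resources, while waiting for an event that never fires strands the lookup.
    ProceedWithDuplicateImport,
}

fn action_for_pending_block(source: BlockImportSource) -> LookupAction {
    match source {
        BlockImportSource::Gossip => LookupAction::WaitForGossipResult,
        BlockImportSource::Lookup
        | BlockImportSource::RangeSync
        | BlockImportSource::HttpApi => LookupAction::ProceedWithDuplicateImport,
    }
}

fn main() {
    assert_eq!(
        action_for_pending_block(BlockImportSource::Gossip),
        LookupAction::WaitForGossipResult
    );
    for source in [
        BlockImportSource::Lookup,
        BlockImportSource::RangeSync,
        BlockImportSource::HttpApi,
    ] {
        assert_eq!(
            action_for_pending_block(source),
            LookupAction::ProceedWithDuplicateImport
        );
    }
    println!("lookup decisions match the intent of the patch");
}

As in the sketch, the source is matched by value: the `#[derive(Clone, Copy)]` added to `BlockImportSource` in the last hunk is what lets the cached source be copied out of `CachedBlock::PreExecution` with `*source` and passed around without cloning.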