diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 084a68bfeab..ef3c2f52e0f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -340,10 +340,6 @@ pub enum BlockProcessStatus { ExecutionValidated(Arc>), } -pub struct BeaconChainMetrics { - pub reqresp_pre_import_cache_len: usize, -} - pub type LightClientProducerEvent = (Hash256, Slot, SyncAggregate); pub type BeaconForkChoice = ForkChoice< @@ -363,9 +359,6 @@ pub type BeaconStore = Arc< >, >; -/// Cache gossip verified blocks to serve over ReqResp before they are imported -type ReqRespPreImportCache = HashMap>>; - /// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block /// operations and chooses a canonical head. pub struct BeaconChain { @@ -462,8 +455,6 @@ pub struct BeaconChain { pub(crate) attester_cache: Arc, /// A cache used when producing attestations whilst the head block is still being imported. pub early_attester_cache: EarlyAttesterCache, - /// Cache gossip verified blocks to serve over ReqResp before they are imported - pub reqresp_pre_import_cache: Arc>>, /// A cache used to keep track of various block timings. pub block_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. @@ -1289,18 +1280,8 @@ impl BeaconChain { /// chain. Used by sync to learn the status of a block and prevent repeated downloads / /// processing attempts. pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus { - if let Some(block) = self - .data_availability_checker - .get_execution_valid_block(block_root) - { - return BlockProcessStatus::ExecutionValidated(block); - } - - if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) { - // A block is on the `reqresp_pre_import_cache` but NOT in the - // `data_availability_checker` only if it is actively processing. We can expect a future - // event with the result of processing - return BlockProcessStatus::NotValidated(block.clone()); + if let Some(cached_block) = self.data_availability_checker.get_cached_block(block_root) { + return cached_block; } BlockProcessStatus::Unknown @@ -3054,8 +3035,7 @@ impl BeaconChain { self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob())); - let r = self.check_gossip_blob_availability_and_import(blob).await; - self.remove_notified(&block_root, r) + self.check_gossip_blob_availability_and_import(blob).await } /// Cache the data columns in the processing cache, process it, then evict it from the cache if it was @@ -3092,15 +3072,13 @@ impl BeaconChain { data_columns.iter().map(|column| column.as_data_column()), ); - let r = self - .check_gossip_data_columns_availability_and_import( - slot, - block_root, - data_columns, - publish_fn, - ) - .await; - self.remove_notified(&block_root, r) + self.check_gossip_data_columns_availability_and_import( + slot, + block_root, + data_columns, + publish_fn, + ) + .await } /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was @@ -3139,10 +3117,8 @@ impl BeaconChain { self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); - let r = self - .check_rpc_blob_availability_and_import(slot, block_root, blobs) - .await; - self.remove_notified(&block_root, r) + self.check_rpc_blob_availability_and_import(slot, block_root, blobs) + .await } /// Process blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`. @@ -3174,10 +3150,8 @@ impl BeaconChain { } } - let r = self - .check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output) - .await; - self.remove_notified(&block_root, r) + self.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output) + .await } fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc, block_root: &Hash256, blobs_iter: I) @@ -3270,10 +3244,8 @@ impl BeaconChain { custody_columns.iter().map(|column| column.as_ref()), ); - let r = self - .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) - .await; - self.remove_notified(&block_root, r) + self.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) + .await } pub async fn reconstruct_data_columns( @@ -3320,10 +3292,8 @@ impl BeaconChain { return Ok(None); }; - let r = self - .process_availability(slot, availability, || Ok(())) - .await; - self.remove_notified(&block_root, r) + self.process_availability(slot, availability, || Ok(())) + .await .map(|availability_processing_status| { Some((availability_processing_status, data_columns_to_publish)) }) @@ -3340,46 +3310,6 @@ impl BeaconChain { } } - /// Remove any block components from the *processing cache* if we no longer require them. If the - /// block was imported full or erred, we no longer require them. - fn remove_notified( - &self, - block_root: &Hash256, - r: Result, - ) -> Result { - let has_missing_components = - matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _))); - if !has_missing_components { - self.reqresp_pre_import_cache.write().remove(block_root); - } - r - } - - /// Wraps `process_block` in logic to cache the block's commitments in the processing cache - /// and evict if the block was imported or errored. - pub async fn process_block_with_early_caching>( - self: &Arc, - block_root: Hash256, - unverified_block: B, - block_source: BlockImportSource, - notify_execution_layer: NotifyExecutionLayer, - ) -> Result { - self.reqresp_pre_import_cache - .write() - .insert(block_root, unverified_block.block_cloned()); - - let r = self - .process_block( - block_root, - unverified_block, - notify_execution_layer, - block_source, - || Ok(()), - ) - .await; - self.remove_notified(&block_root, r) - } - /// Check for known and configured invalid block roots before processing. pub fn check_invalid_block_roots(&self, block_root: Hash256) -> Result<(), BlockError> { if self.config.invalid_block_roots.contains(&block_root) { @@ -3411,12 +3341,6 @@ impl BeaconChain { block_source: BlockImportSource, publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { - // Start the Prometheus timer. - let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); - - // Increment the Prometheus counter for block processing requests. - metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); - let block_slot = unverified_block.block().slot(); // Set observed time if not already set. Usually this should be set by gossip or RPC, @@ -3431,6 +3355,15 @@ impl BeaconChain { ); } + self.data_availability_checker + .put_pre_execution_block(block_root, unverified_block.block_cloned())?; + + // Start the Prometheus timer. + let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); + + // Increment the Prometheus counter for block processing requests. + metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); + // A small closure to group the verification and import errors. let chain = self.clone(); let import_block = async move { @@ -3448,7 +3381,18 @@ impl BeaconChain { .set_time_consensus_verified(block_root, block_slot, timestamp) } - let executed_block = chain.into_executed_block(execution_pending).await?; + let executed_block = chain + .into_executed_block(execution_pending) + .await + .inspect_err(|_| { + // If the block fails execution for whatever reason (e.g. engine offline), + // and we keep it in the cache, then the node will NOT perform lookup and + // reprocess this block until the block is evicted from DA checker, causing the + // chain to get stuck temporarily if the block is canonical. Therefore we remove + // it from the cache if execution fails. + self.data_availability_checker + .remove_block_on_execution_error(&block_root); + })?; // Record the *additional* time it took to wait for execution layer verification. if let Some(timestamp) = self.slot_clock.now_duration() { @@ -3574,9 +3518,7 @@ impl BeaconChain { block: AvailabilityPendingExecutedBlock, ) -> Result { let slot = block.block.slot(); - let availability = self - .data_availability_checker - .put_pending_executed_block(block)?; + let availability = self.data_availability_checker.put_executed_block(block)?; self.process_availability(slot, availability, || Ok(())) .await } @@ -7156,12 +7098,6 @@ impl BeaconChain { ) } - pub fn metrics(&self) -> BeaconChainMetrics { - BeaconChainMetrics { - reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(), - } - } - pub(crate) fn get_blobs_or_columns_store_op( &self, block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 5e7aa7d4f87..de821f5f4a7 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -997,7 +997,6 @@ where validator_pubkey_cache: RwLock::new(validator_pubkey_cache), attester_cache: <_>::default(), early_attester_cache: <_>::default(), - reqresp_pre_import_cache: <_>::default(), light_client_server_cache: LightClientServerCache::new(), light_client_server_tx: self.light_client_server_tx, shutdown_sender: self diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 307dc0e227a..f41b6e73348 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -7,7 +7,9 @@ use crate::block_verification_types::{ use crate::data_availability_checker::overflow_lru_cache::{ DataAvailabilityCheckerInner, ReconstructColumnsDecision, }; -use crate::{BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext, metrics}; +use crate::{ + BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, CustodyContext, metrics, +}; use kzg::Kzg; use slot_clock::SlotClock; use std::fmt; @@ -27,6 +29,7 @@ mod error; mod overflow_lru_cache; mod state_lru_cache; +use crate::data_availability_checker::error::Error; use crate::data_column_verification::{ CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, verify_kzg_for_data_column_list, @@ -141,14 +144,12 @@ impl DataAvailabilityChecker { &self.custody_context } - /// Checks if the block root is currenlty in the availability cache awaiting import because + /// Checks if the block root is currently in the availability cache awaiting import because /// of missing components. - pub fn get_execution_valid_block( - &self, - block_root: &Hash256, - ) -> Option>> { - self.availability_cache - .get_execution_valid_block(block_root) + /// + /// Returns the cache block wrapped in a `BlockProcessStatus` enum if it exists. + pub fn get_cached_block(&self, block_root: &Hash256) -> Option> { + self.availability_cache.get_cached_block(block_root) } /// Return the set of cached blob indexes for `block_root`. Returns None if there is no block @@ -337,12 +338,29 @@ impl DataAvailabilityChecker { /// Check if we have all the blobs for a block. Returns `Availability` which has information /// about whether all components have been received or more are required. - pub fn put_pending_executed_block( + pub fn put_executed_block( &self, executed_block: AvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { + self.availability_cache.put_executed_block(executed_block) + } + + /// Inserts a pre-execution block into the cache. + /// This does NOT override an existing executed block. + pub fn put_pre_execution_block( + &self, + block_root: Hash256, + block: Arc>, + ) -> Result<(), Error> { + self.availability_cache + .put_pre_execution_block(block_root, block) + } + + /// Removes a pre-execution block from the cache. + /// This does NOT remove an existing executed block. + pub fn remove_block_on_execution_error(&self, block_root: &Hash256) { self.availability_cache - .put_pending_executed_block(executed_block) + .remove_pre_execution_block(block_root); } /// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 6afb680ddb8..bb440096627 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -1,6 +1,5 @@ use super::AvailableBlockData; use super::state_lru_cache::{DietAvailabilityPendingExecutedBlock, StateLRUCache}; -use crate::BeaconChainTypes; use crate::CustodyContext; use crate::beacon_chain::BeaconStore; use crate::blob_verification::KzgVerifiedBlob; @@ -9,6 +8,7 @@ use crate::block_verification_types::{ }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use crate::{BeaconChainTypes, BlockProcessStatus}; use lighthouse_tracing::SPAN_PENDING_COMPONENTS; use lru::LruCache; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -16,12 +16,46 @@ use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; use tracing::{Span, debug, debug_span}; +use types::beacon_block_body::KzgCommitments; use types::blob_sidecar::BlobIdentifier; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, RuntimeFixedVector, RuntimeVariableList, SignedBeaconBlock, }; +#[derive(Clone)] +pub enum CachedBlock { + PreExecution(Arc>), + Executed(Box>), +} + +impl CachedBlock { + pub fn get_commitments(&self) -> KzgCommitments { + let block = self.as_block(); + block + .message() + .body() + .blob_kzg_commitments() + .cloned() + .unwrap_or_default() + } + + fn as_block(&self) -> &SignedBeaconBlock { + match self { + CachedBlock::PreExecution(b) => b, + CachedBlock::Executed(b) => b.as_block(), + } + } + + pub fn num_blobs_expected(&self) -> usize { + self.as_block() + .message() + .body() + .blob_kzg_commitments() + .map_or(0, |commitments| commitments.len()) + } +} + /// This represents the components of a partially available block /// /// The blobs are all gossip and kzg verified. @@ -39,22 +73,25 @@ pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: RuntimeFixedVector>>, pub verified_data_columns: Vec>, - pub executed_block: Option>, + pub block: Option>, pub reconstruction_started: bool, span: Span, } impl PendingComponents { - /// Returns an immutable reference to the cached block. - pub fn get_cached_block(&self) -> &Option> { - &self.executed_block - } - /// Returns an immutable reference to the fixed vector of cached blobs. pub fn get_cached_blobs(&self) -> &RuntimeFixedVector>> { &self.verified_blobs } + #[cfg(test)] + fn get_diet_block(&self) -> Option<&DietAvailabilityPendingExecutedBlock> { + self.block.as_ref().and_then(|block| match block { + CachedBlock::Executed(block) => Some(block.as_ref()), + _ => None, + }) + } + /// Returns an immutable reference to the cached data column. pub fn get_cached_data_column( &self, @@ -66,11 +103,6 @@ impl PendingComponents { .map(|d| d.clone_arc()) } - /// Returns a mutable reference to the cached block. - pub fn get_cached_block_mut(&mut self) -> &mut Option> { - &mut self.executed_block - } - /// Returns a mutable reference to the fixed vector of cached blobs. pub fn get_cached_blobs_mut(&mut self) -> &mut RuntimeFixedVector>> { &mut self.verified_blobs @@ -96,9 +128,17 @@ impl PendingComponents { .collect() } - /// Inserts a block into the cache. - pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock) { - *self.get_cached_block_mut() = Some(block) + /// Inserts an executed block into the cache. + pub fn insert_executed_block(&mut self, block: DietAvailabilityPendingExecutedBlock) { + self.block = Some(CachedBlock::Executed(Box::new(block))) + } + + /// Inserts a pre-execution block into the cache. + /// This does NOT override an existing executed block. + pub fn insert_pre_execution_block(&mut self, block: Arc>) { + if self.block.is_none() { + self.block = Some(CachedBlock::PreExecution(block)) + } } /// Inserts a blob at a specific index in the cache. @@ -128,7 +168,7 @@ impl PendingComponents { /// 1. The blob entry at the index is empty and no block exists, or /// 2. The block exists and its commitment matches the blob's commitment. pub fn merge_single_blob(&mut self, index: usize, blob: KzgVerifiedBlob) { - if let Some(cached_block) = self.get_cached_block() { + if let Some(cached_block) = &self.block { let block_commitment_opt = cached_block.get_commitments().get(index).copied(); if let Some(block_commitment) = block_commitment_opt && block_commitment == *blob.get_commitment() @@ -158,7 +198,7 @@ impl PendingComponents { /// /// Blobs that don't match the new block's commitments are evicted. pub fn merge_block(&mut self, block: DietAvailabilityPendingExecutedBlock) { - self.insert_block(block); + self.insert_executed_block(block); let reinsert = self.get_cached_blobs_mut().take(); self.merge_blobs(reinsert); } @@ -180,7 +220,7 @@ impl PendingComponents { &Span, ) -> Result, AvailabilityCheckError>, { - let Some(block) = &self.executed_block else { + let Some(CachedBlock::Executed(block)) = &self.block else { // Block not available yet return Ok(None); }; @@ -267,7 +307,7 @@ impl PendingComponents { block, import_data, payload_verification_outcome, - } = recover(block.clone(), &self.span)?; + } = recover(*block.clone(), &self.span)?; let available_block = AvailableBlock { block_root: self.block_root, @@ -295,7 +335,7 @@ impl PendingComponents { block_root, verified_blobs: RuntimeFixedVector::new(vec![None; max_len]), verified_data_columns: vec![], - executed_block: None, + block: None, reconstruction_started: false, span, } @@ -307,9 +347,9 @@ impl PendingComponents { /// - The first data column /// Otherwise, returns None pub fn epoch(&self) -> Option { - // Get epoch from cached executed block - if let Some(executed_block) = &self.executed_block { - return Some(executed_block.as_block().epoch()); + // Get epoch from cached block + if let Some(block) = &self.block { + return Some(block.as_block().epoch()); } // Or, get epoch from first available blob @@ -326,7 +366,7 @@ impl PendingComponents { } pub fn status_str(&self, num_expected_columns_opt: Option) -> String { - let block_count = if self.executed_block.is_some() { 1 } else { 0 }; + let block_count = if self.block.is_some() { 1 } else { 0 }; if let Some(num_expected_columns) = num_expected_columns_opt { format!( "block {} data_columns {}/{}", @@ -335,7 +375,7 @@ impl PendingComponents { num_expected_columns ) } else { - let num_expected_blobs = if let Some(block) = self.get_cached_block() { + let num_expected_blobs = if let Some(block) = &self.block { &block.num_blobs_expected().to_string() } else { "?" @@ -387,18 +427,17 @@ impl DataAvailabilityCheckerInner { } /// Returns true if the block root is known, without altering the LRU ordering - pub fn get_execution_valid_block( - &self, - block_root: &Hash256, - ) -> Option>> { + pub fn get_cached_block(&self, block_root: &Hash256) -> Option> { self.critical .read() .peek(block_root) .and_then(|pending_components| { - pending_components - .executed_block - .as_ref() - .map(|block| block.block_cloned()) + pending_components.block.as_ref().map(|block| match block { + CachedBlock::PreExecution(b) => BlockProcessStatus::NotValidated(b.clone()), + CachedBlock::Executed(b) => { + BlockProcessStatus::ExecutionValidated(b.block_cloned()) + } + }) }) } @@ -647,9 +686,46 @@ impl DataAvailabilityCheckerInner { } } + /// Inserts a pre executed block into the cache. + /// - This does NOT trigger the availability check as the block still needs to be executed. + /// - This does NOT override an existing cached block to avoid overwriting an executed block. + pub fn put_pre_execution_block( + &self, + block_root: Hash256, + block: Arc>, + ) -> Result<(), AvailabilityCheckError> { + let epoch = block.epoch(); + let pending_components = + self.update_or_insert_pending_components(block_root, epoch, |pending_components| { + pending_components.insert_pre_execution_block(block); + Ok(()) + })?; + + let num_expected_columns_opt = self.get_num_expected_columns(epoch); + + pending_components.span.in_scope(|| { + debug!( + component = "pre execution block", + status = pending_components.status_str(num_expected_columns_opt), + "Component added to data availability checker" + ); + }); + + Ok(()) + } + + /// Removes a pre-execution block from the cache. + /// This does NOT remove an existing executed block. + pub fn remove_pre_execution_block(&self, block_root: &Hash256) { + // The read lock is immediately dropped so we can safely remove the block from the cache. + if let Some(BlockProcessStatus::NotValidated(_)) = self.get_cached_block(block_root) { + self.critical.write().pop(block_root); + } + } + /// Check if we have all the blobs for a block. If we do, return the Availability variant that /// triggers import of the block. - pub fn put_pending_executed_block( + pub fn put_executed_block( &self, executed_block: AvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { @@ -667,14 +743,7 @@ impl DataAvailabilityCheckerInner { Ok(()) })?; - let num_expected_columns_opt = if self.spec.is_peer_das_enabled_for_epoch(epoch) { - let num_of_column_samples = self - .custody_context - .num_of_data_columns_to_sample(epoch, &self.spec); - Some(num_of_column_samples) - } else { - None - }; + let num_expected_columns_opt = self.get_num_expected_columns(epoch); pending_components.span.in_scope(|| { debug!( @@ -691,6 +760,17 @@ impl DataAvailabilityCheckerInner { ) } + fn get_num_expected_columns(&self, epoch: Epoch) -> Option { + if self.spec.is_peer_das_enabled_for_epoch(epoch) { + let num_of_column_samples = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); + Some(num_of_column_samples) + } else { + None + } + } + /// maintain the cache pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { // clean up any lingering states in the state cache @@ -964,7 +1044,7 @@ mod test { ); assert!(cache.critical.read().is_empty(), "cache should be empty"); let availability = cache - .put_pending_executed_block(pending_block) + .put_executed_block(pending_block) .expect("should put block"); if blobs_expected == 0 { assert!( @@ -1031,7 +1111,7 @@ mod test { ); } let availability = cache - .put_pending_executed_block(pending_block) + .put_executed_block(pending_block) .expect("should put block"); assert!( matches!(availability, Availability::Available(_)), @@ -1093,7 +1173,7 @@ mod test { // put the block in the cache let availability = cache - .put_pending_executed_block(pending_block) + .put_executed_block(pending_block) .expect("should put block"); // grab the diet block from the cache for later testing @@ -1101,12 +1181,7 @@ mod test { .critical .read() .peek(&block_root) - .map(|pending_components| { - pending_components - .executed_block - .clone() - .expect("should exist") - }) + .and_then(|pending_components| pending_components.get_diet_block().cloned()) .expect("should exist"); pushed_diet_blocks.push_back(diet_block); @@ -1267,7 +1342,7 @@ mod pending_components_tests { } pub fn assert_cache_consistent(cache: PendingComponents, max_len: usize) { - if let Some(cached_block) = cache.get_cached_block() { + if let Some(cached_block) = &cache.block { let cached_block_commitments = cached_block.get_commitments(); for index in 0..max_len { let block_commitment = cached_block_commitments.get(index).copied(); @@ -1373,4 +1448,33 @@ mod pending_components_tests { assert_cache_consistent(cache, max_len); } + + #[test] + fn should_not_insert_pre_execution_block_if_executed_block_exists() { + let (pre_execution_block, blobs, random_blobs, max_len) = pre_setup(); + let (executed_block, _blobs, _random_blobs) = + setup_pending_components(pre_execution_block.clone(), blobs, random_blobs); + + let block_root = pre_execution_block.canonical_root(); + let mut pending_component = >::empty(block_root, max_len); + + let pre_execution_block = Arc::new(pre_execution_block); + pending_component.insert_pre_execution_block(pre_execution_block.clone()); + assert!( + matches!(pending_component.block, Some(CachedBlock::PreExecution(_))), + "pre execution block inserted" + ); + + pending_component.insert_executed_block(executed_block); + assert!( + matches!(pending_component.block, Some(CachedBlock::Executed(_))), + "executed block inserted" + ); + + pending_component.insert_pre_execution_block(pre_execution_block); + assert!( + matches!(pending_component.block, Some(CachedBlock::Executed(_))), + "executed block should remain" + ); + } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 57c236efcf6..24f9237e3c9 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -10,7 +10,6 @@ use state_processing::BlockReplayer; use std::sync::Arc; use store::OnDiskConsensusContext; use tracing::{Span, debug_span, instrument}; -use types::beacon_block_body::KzgCommitments; use types::{BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; /// This mirrors everything in the `AvailabilityPendingExecutedBlock`, except @@ -43,15 +42,6 @@ impl DietAvailabilityPendingExecutedBlock { .map_or(0, |commitments| commitments.len()) } - pub fn get_commitments(&self) -> KzgCommitments { - self.as_block() - .message() - .body() - .blob_kzg_commitments() - .cloned() - .unwrap_or_default() - } - /// Returns the epoch corresponding to `self.slot()`. pub fn epoch(&self) -> Epoch { self.block.slot().epoch(E::slots_per_epoch()) diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 3da3cf163a4..0d34ffdcd15 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -458,12 +458,6 @@ pub static BEACON_EARLY_ATTESTER_CACHE_HITS: LazyLock> = Lazy ) }); -pub static BEACON_REQRESP_PRE_IMPORT_CACHE_SIZE: LazyLock> = LazyLock::new(|| { - try_create_int_gauge( - "beacon_reqresp_pre_import_cache_size", - "Current count of items of the reqresp pre import cache", - ) -}); pub static BEACON_REQRESP_PRE_IMPORT_CACHE_HITS: LazyLock> = LazyLock::new(|| { try_create_int_counter( @@ -1965,7 +1959,6 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { } let attestation_stats = beacon_chain.op_pool.attestation_stats(); - let chain_metrics = beacon_chain.metrics(); // Kept duplicated for backwards compatibility set_gauge_by_usize( @@ -1973,11 +1966,6 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { beacon_chain.store.state_cache_len(), ); - set_gauge_by_usize( - &BEACON_REQRESP_PRE_IMPORT_CACHE_SIZE, - chain_metrics.reqresp_pre_import_cache_len, - ); - let da_checker_metrics = beacon_chain.data_availability_checker.metrics(); set_gauge_by_usize( &DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index b3d717142f5..5fc94c29587 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1500,11 +1500,12 @@ impl NetworkBeaconProcessor { let result = self .chain - .process_block_with_early_caching( + .process_block( block_root, verified_block, - BlockImportSource::Gossip, NotifyExecutionLayer::Yes, + BlockImportSource::Gossip, + || Ok(()), ) .await; register_process_result_metrics(&result, metrics::BlockSource::Gossip, "block"); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index edeed7e98cf..f39b1b57d49 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -167,11 +167,12 @@ impl NetworkBeaconProcessor { let signed_beacon_block = block.block_cloned(); let result = self .chain - .process_block_with_early_caching( + .process_block( block_root, block, - BlockImportSource::Lookup, NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + || Ok(()), ) .await; register_process_result_metrics(&result, metrics::BlockSource::Rpc, "block"); diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 2edcd12f019..27968a06351 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1079,7 +1079,7 @@ impl TestRig { .harness .chain .data_availability_checker - .put_pending_executed_block(executed_block) + .put_executed_block(executed_block) .unwrap() { Availability::Available(_) => panic!("block removed from da_checker, available"), @@ -1109,20 +1109,19 @@ impl TestRig { }; } - fn insert_block_to_processing_cache(&mut self, block: Arc>) { + fn insert_block_to_availability_cache(&mut self, block: Arc>) { self.harness .chain - .reqresp_pre_import_cache - .write() - .insert(block.canonical_root(), block); + .data_availability_checker + .put_pre_execution_block(block.canonical_root(), block) + .unwrap(); } fn simulate_block_gossip_processing_becomes_invalid(&mut self, block_root: Hash256) { self.harness .chain - .reqresp_pre_import_cache - .write() - .remove(&block_root); + .data_availability_checker + .remove_block_on_execution_error(&block_root); self.send_sync_message(SyncMessage::GossipBlockProcessResult { block_root, @@ -1135,11 +1134,6 @@ impl TestRig { block: Arc>, ) { let block_root = block.canonical_root(); - self.harness - .chain - .reqresp_pre_import_cache - .write() - .remove(&block_root); self.insert_block_to_da_checker(block); @@ -1841,7 +1835,7 @@ fn block_in_processing_cache_becomes_invalid() { let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); let block_root = block.canonical_root(); let peer_id = r.new_connected_peer(); - r.insert_block_to_processing_cache(block.clone().into()); + r.insert_block_to_availability_cache(block.clone().into()); r.trigger_unknown_block_from_attestation(block_root, peer_id); // Should trigger blob request let id = r.expect_blob_lookup_request(block_root); @@ -1867,7 +1861,7 @@ fn block_in_processing_cache_becomes_valid_imported() { let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); let block_root = block.canonical_root(); let peer_id = r.new_connected_peer(); - r.insert_block_to_processing_cache(block.clone().into()); + r.insert_block_to_availability_cache(block.clone().into()); r.trigger_unknown_block_from_attestation(block_root, peer_id); // Should trigger blob request let id = r.expect_blob_lookup_request(block_root);