diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 5ffdf951ac1..1d86ce8e342 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -7137,25 +7137,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     /// Compare columns custodied for `epoch` versus columns custodied for the head of the chain
     /// and return any column indices that are missing.
-    pub fn get_missing_columns_for_epoch(&self, epoch: Epoch) -> HashSet<ColumnIndex> {
+    pub fn get_missing_columns_for_epoch(
+        &self,
+        epoch: Epoch,
+    ) -> Result<HashSet<ColumnIndex>, String> {
         let custody_context = self.data_availability_checker.custody_context();
         let columns_required = custody_context
-            .custody_columns_for_epoch(None, &self.spec)
+            .custody_columns_for_epoch(None, &self.spec)?
             .iter()
             .cloned()
             .collect::<HashSet<_>>();
         let current_columns_at_epoch = custody_context
-            .custody_columns_for_epoch(Some(epoch), &self.spec)
+            .custody_columns_for_epoch(Some(epoch), &self.spec)?
             .iter()
             .cloned()
             .collect::<HashSet<_>>();
 
-        columns_required
+        let missing_columns = columns_required
             .difference(&current_columns_at_epoch)
             .cloned()
-            .collect::<HashSet<_>>()
+            .collect::<HashSet<_>>();
+
+        Ok(missing_columns)
     }
 
     /// The da boundary for custodying columns. It will just be the DA boundary unless we are near the Fulu fork epoch.
@@ -7442,7 +7447,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             AvailableBlockData::DataColumns(mut data_columns) => {
                 let columns_to_custody = self.custody_columns_for_epoch(Some(
                     block_slot.epoch(T::EthSpec::slots_per_epoch()),
-                ));
+                ))?;
                 // Supernodes need to persist all sampled custody columns
                 if columns_to_custody.len() != self.spec.number_of_custody_groups as usize {
                     data_columns
@@ -7485,7 +7490,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
 
     /// Returns a list of column indices that should be sampled for a given epoch.
     /// Used for data availability sampling in PeerDAS.
-    pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> &[ColumnIndex] {
+    pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> Result<&[ColumnIndex], String> {
         self.data_availability_checker
             .custody_context()
             .sampling_columns_for_epoch(epoch, &self.spec)
@@ -7496,7 +7501,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     /// serve them to peers.
     ///
     /// If epoch is `None`, this function computes the custody columns at head.
-    pub fn custody_columns_for_epoch(&self, epoch_opt: Option<Epoch>) -> &[ColumnIndex] {
+    pub fn custody_columns_for_epoch(
+        &self,
+        epoch_opt: Option<Epoch>,
+    ) -> Result<&[ColumnIndex], String> {
         self.data_availability_checker
             .custody_context()
             .custody_columns_for_epoch(epoch_opt, &self.spec)
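Note on the API change above: an uninitialised custody context previously panicked via `expect` deep inside `CustodyContext`; callers of `get_missing_columns_for_epoch` now receive an `Err(String)` they can handle. A minimal caller-side sketch (hypothetical helper, not in this diff; assumes `chain: &BeaconChain<T>` and the `tracing` macros Lighthouse already uses):

    // Hypothetical caller: degrade gracefully instead of panicking.
    fn log_missing_columns<T: BeaconChainTypes>(chain: &BeaconChain<T>, epoch: Epoch) {
        match chain.get_missing_columns_for_epoch(epoch) {
            // An empty set means custody for `epoch` already matches custody at head.
            Ok(missing) if missing.is_empty() => {}
            Ok(missing) => tracing::debug!(?missing, "columns missing for epoch"),
            // Before this diff, this case was an `expect(...)` panic during startup races.
            Err(e) => tracing::warn!(error = e, "custody context not yet initialised"),
        }
    }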
diff --git a/beacon_node/beacon_chain/src/custody_context.rs b/beacon_node/beacon_chain/src/custody_context.rs
index a5ef3ed2f65..ddb555ccf65 100644
--- a/beacon_node/beacon_chain/src/custody_context.rs
+++ b/beacon_node/beacon_chain/src/custody_context.rs
@@ -313,9 +313,7 @@ impl<E: EthSpec> CustodyContext<E> {
             let old_custody_group_count = validator_custody_at_head;
             validator_custody_at_head = cgc_from_cli;
 
-            let sampling_count = spec
-                .sampling_size_custody_groups(cgc_from_cli)
-                .expect("should compute node sampling size from valid chain spec");
+            let sampling_count = spec.sampling_size_custody_groups(cgc_from_cli);
 
             epoch_validator_custody_requirements.push((effective_epoch, cgc_from_cli));
 
@@ -469,14 +467,16 @@ impl<E: EthSpec> CustodyContext<E> {
     pub fn num_of_custody_groups_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> u64 {
         let custody_group_count = self.custody_group_count_at_epoch(epoch, spec);
         spec.sampling_size_custody_groups(custody_group_count)
-            .expect("should compute node sampling size from valid chain spec")
     }
 
     /// Returns the count of columns this node must _sample_ for a block at `epoch` to import.
-    pub fn num_of_data_columns_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> usize {
+    pub fn num_of_data_columns_to_sample(
+        &self,
+        epoch: Epoch,
+        spec: &ChainSpec,
+    ) -> Result<usize, String> {
         let custody_group_count = self.custody_group_count_at_epoch(epoch, spec);
         spec.sampling_size_columns::<E>(custody_group_count)
-            .expect("should compute node sampling size from valid chain spec")
     }
 
     /// Returns whether the node should attempt reconstruction at a given epoch.
@@ -484,7 +484,8 @@ impl<E: EthSpec> CustodyContext<E> {
         let min_columns_for_reconstruction = E::number_of_columns() / 2;
         // performing reconstruction is not necessary if sampling column count is exactly 50%,
         // because the node doesn't need the remaining columns.
-        self.num_of_data_columns_to_sample(epoch, spec) > min_columns_for_reconstruction
+        self.num_of_data_columns_to_sample(epoch, spec)
+            .is_ok_and(|sample_count| sample_count > min_columns_for_reconstruction)
     }
 
     /// Returns the ordered list of column indices that should be sampled for data availability checking at the given epoch.
@@ -495,13 +496,17 @@ impl<E: EthSpec> CustodyContext<E> {
     ///
     /// # Returns
     /// A slice of ordered column indices that should be sampled for this epoch based on the node's custody configuration
-    pub fn sampling_columns_for_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> &[ColumnIndex] {
-        let num_of_columns_to_sample = self.num_of_data_columns_to_sample(epoch, spec);
+    pub fn sampling_columns_for_epoch(
+        &self,
+        epoch: Epoch,
+        spec: &ChainSpec,
+    ) -> Result<&[ColumnIndex], String> {
+        let num_of_columns_to_sample = self.num_of_data_columns_to_sample(epoch, spec)?;
         let all_columns_ordered = self
             .all_custody_columns_ordered
             .get()
-            .expect("all_custody_columns_ordered should be initialized");
-        &all_columns_ordered[..num_of_columns_to_sample]
+            .ok_or("Custody context has not been initialised")?;
+        Ok(&all_columns_ordered[..num_of_columns_to_sample])
     }
 
     /// Returns the ordered list of column indices that the node is assigned to custody
@@ -521,7 +526,7 @@ impl<E: EthSpec> CustodyContext<E> {
         &self,
         epoch_opt: Option<Epoch>,
         spec: &ChainSpec,
-    ) -> &[ColumnIndex] {
+    ) -> Result<&[ColumnIndex], String> {
         let custody_group_count = if let Some(epoch) = epoch_opt {
             self.custody_group_count_at_epoch(epoch, spec) as usize
         } else {
@@ -531,9 +536,9 @@ impl<E: EthSpec> CustodyContext<E> {
         let all_columns_ordered = self
             .all_custody_columns_ordered
             .get()
-            .expect("all_custody_columns_ordered should be initialized");
+            .ok_or("Custody context has not been initialised")?;
 
-        &all_columns_ordered[..custody_group_count]
+        Ok(&all_columns_ordered[..custody_group_count])
     }
 
     /// The node has completed backfill for this epoch. Update the internal records so the function
@@ -699,8 +704,7 @@ mod tests {
         );
         assert_eq!(
             cgc_changed.sampling_count,
-            spec.sampling_size_custody_groups(expected_new_cgc)
-                .expect("should compute sampling size"),
+            spec.sampling_size_custody_groups(expected_new_cgc),
             "sampling_count should match expected value"
         );
 
@@ -1025,7 +1029,9 @@ mod tests {
     fn should_init_ordered_data_columns_and_return_sampling_columns() {
         let spec = E::default_spec();
         let custody_context = CustodyContext::<E>::new(NodeCustodyType::Fullnode, &spec);
-        let sampling_size = custody_context.num_of_data_columns_to_sample(Epoch::new(0), &spec);
+        let sampling_size = custody_context
+            .num_of_data_columns_to_sample(Epoch::new(0), &spec)
+            .unwrap();
 
         // initialise ordered columns
         let mut all_custody_groups_ordered = (0..spec.number_of_custody_groups).collect::<Vec<_>>();
@@ -1038,8 +1044,9 @@ mod tests {
         )
         .expect("should initialise ordered data columns");
 
-        let actual_sampling_columns =
-            custody_context.sampling_columns_for_epoch(Epoch::new(0), &spec);
+        let actual_sampling_columns = custody_context
+            .sampling_columns_for_epoch(Epoch::new(0), &spec)
+            .unwrap();
 
         let expected_sampling_columns = &all_custody_groups_ordered
             .iter()
@@ -1085,7 +1092,10 @@ mod tests {
         .expect("should initialise ordered data columns");
 
         assert_eq!(
-            custody_context.custody_columns_for_epoch(None, &spec).len(),
+            custody_context
+                .custody_columns_for_epoch(None, &spec)
+                .unwrap()
+                .len(),
             spec.custody_requirement as usize
         );
     }
@@ -1101,7 +1111,10 @@ mod tests {
         .expect("should initialise ordered data columns");
 
         assert_eq!(
-            custody_context.custody_columns_for_epoch(None, &spec).len(),
+            custody_context
+                .custody_columns_for_epoch(None, &spec)
+                .unwrap()
+                .len(),
             spec.number_of_custody_groups as usize
         );
     }
@@ -1127,7 +1140,10 @@ mod tests {
         );
 
         assert_eq!(
-            custody_context.custody_columns_for_epoch(None, &spec).len(),
+            custody_context
+                .custody_columns_for_epoch(None, &spec)
+                .unwrap()
+                .len(),
             val_custody_units as usize
         );
     }
@@ -1147,6 +1163,7 @@ mod tests {
         assert_eq!(
             custody_context
                 .custody_columns_for_epoch(Some(test_epoch), &spec)
+                .unwrap()
                 .len(),
             expected_cgc as usize
         );
@@ -1413,6 +1430,7 @@ mod tests {
         assert_eq!(
             custody_context
                 .custody_columns_for_epoch(Some(Epoch::new(15)), &spec)
+                .unwrap()
                 .len(),
             final_cgc as usize,
        );
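The `get().ok_or(...)?` calls above are the crux of this file's change: `all_custody_columns_ordered` is a lazily initialised cell, and converting its `Option` into a `Result<_, String>` is what lets every public accessor report "not initialised" instead of panicking. A self-contained sketch of the same pattern under assumed, simplified types (`OnceLock<Vec<u64>>` standing in for the real field):

    use std::sync::OnceLock;

    struct Ctx {
        // Stand-in for `all_custody_columns_ordered`.
        all_columns: OnceLock<Vec<u64>>,
    }

    impl Ctx {
        // Mirrors the new fallible accessors: the `&str` error converts into
        // `String` automatically through the `?` operator.
        fn first_n(&self, n: usize) -> Result<&[u64], String> {
            let cols = self
                .all_columns
                .get()
                .ok_or("Custody context has not been initialised")?;
            Ok(cols.get(..n).unwrap_or(cols))
        }
    }

The `cols.get(..n).unwrap_or(cols)` guard is only for the sketch; the real code can index directly because the sampling count is bounded by the length of the ordered column list.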
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 644c4716985..713ae10449a 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -258,7 +258,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
         let sampling_columns = self
             .custody_context
-            .sampling_columns_for_epoch(epoch, &self.spec);
+            .sampling_columns_for_epoch(epoch, &self.spec)
+            .map_err(AvailabilityCheckError::CustodyContextError)?;
         let verified_custody_columns = kzg_verified_columns
             .into_iter()
             .filter(|col| sampling_columns.contains(&col.index()))
@@ -315,7 +316,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
         let sampling_columns = self
             .custody_context
-            .sampling_columns_for_epoch(epoch, &self.spec);
+            .sampling_columns_for_epoch(epoch, &self.spec)
+            .map_err(AvailabilityCheckError::CustodyContextError)?;
         let custody_columns = data_columns
             .into_iter()
             .filter(|col| sampling_columns.contains(&col.index()))
@@ -624,7 +626,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
 
         let columns_to_sample = self
             .custody_context()
-            .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec);
+            .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec)
+            .map_err(AvailabilityCheckError::CustodyContextError)?;
 
         // We only need to import and publish columns that we need to sample
         // and columns that we haven't already received
@@ -904,7 +907,9 @@ mod test {
             &spec,
         );
         assert_eq!(
-            custody_context.num_of_data_columns_to_sample(epoch, &spec),
+            custody_context
+                .num_of_data_columns_to_sample(epoch, &spec)
+                .unwrap(),
             spec.validator_custody_requirement as usize,
             "sampling size should be the minimal custody requirement == 8"
         );
@@ -939,7 +944,9 @@ mod test {
            .expect("should put rpc custody columns");
 
         // THEN the sampling size for the end slot of the same epoch remains unchanged
-        let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
+        let sampling_columns = custody_context
+            .sampling_columns_for_epoch(epoch, &spec)
+            .unwrap();
         assert_eq!(
             sampling_columns.len(),
             spec.validator_custody_requirement as usize // 8
@@ -983,7 +990,9 @@ mod test {
             &spec,
         );
         assert_eq!(
-            custody_context.num_of_data_columns_to_sample(epoch, &spec),
+            custody_context
+                .num_of_data_columns_to_sample(epoch, &spec)
+                .unwrap(),
             spec.validator_custody_requirement as usize,
             "sampling size should be the minimal custody requirement == 8"
         );
@@ -1017,7 +1026,9 @@ mod test {
             .expect("should put gossip custody columns");
 
         // THEN the sampling size for the end slot of the same epoch remains unchanged
-        let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
+        let sampling_columns = custody_context
+            .sampling_columns_for_epoch(epoch, &spec)
+            .unwrap();
         assert_eq!(
             sampling_columns.len(),
             spec.validator_custody_requirement as usize // 8
@@ -1106,7 +1117,9 @@ mod test {
             Slot::new(0),
             &spec,
         );
-        let sampling_requirement = custody_context.num_of_data_columns_to_sample(epoch, &spec);
+        let sampling_requirement = custody_context
+            .num_of_data_columns_to_sample(epoch, &spec)
+            .unwrap();
         assert_eq!(
             sampling_requirement, 65,
             "sampling requirement should be 65"
@@ -1164,7 +1177,9 @@ mod test {
         );
 
         // Only the columns required for custody (65) should be imported into the cache
-        let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
+        let sampling_columns = custody_context
+            .sampling_columns_for_epoch(epoch, &spec)
+            .unwrap();
         let actual_cached: HashSet<ColumnIndex> = da_checker
             .cached_data_column_indexes(&block_root)
             .expect("should have cached data columns")
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs
index c9efb7a4149..41bf45a3919 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs
@@ -10,6 +10,7 @@ pub enum Error {
         blob_commitment: KzgCommitment,
         block_commitment: KzgCommitment,
     },
+    CustodyContextError(String),
     Unexpected(String),
     SszTypes(ssz_types::Error),
     MissingBlobs,
@@ -44,6 +45,7 @@ impl Error {
             | Error::ParentStateMissing(_)
             | Error::BlockReplayError(_)
             | Error::RebuildingStateCaches(_)
+            | Error::CustodyContextError(_)
             | Error::SlotClockError => ErrorCategory::Internal,
             Error::InvalidBlobs { .. }
             | Error::InvalidColumn { .. }
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index 5e6322ae95a..bfb2230880f 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -6,6 +6,7 @@ use crate::blob_verification::KzgVerifiedBlob;
 use crate::block_verification_types::{
     AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock,
 };
+use crate::data_availability_checker::AvailabilityCheckError::CustodyContextError;
 use crate::data_availability_checker::{Availability, AvailabilityCheckError};
 use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
 use crate::{BeaconChainTypes, BlockProcessStatus};
@@ -558,7 +559,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
 
         let num_expected_columns = self
             .custody_context
-            .num_of_data_columns_to_sample(epoch, &self.spec);
+            .num_of_data_columns_to_sample(epoch, &self.spec)
+            .map_err(AvailabilityCheckError::CustodyContextError)?;
 
         pending_components.span.in_scope(|| {
             debug!(
@@ -664,9 +666,13 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
         };
 
         let total_column_count = T::EthSpec::number_of_columns();
-        let sampling_column_count = self
+        let Ok(sampling_column_count) = self
             .custody_context
-            .num_of_data_columns_to_sample(epoch, &self.spec);
+            .num_of_data_columns_to_sample(epoch, &self.spec)
+        else {
+            return ReconstructColumnsDecision::No("custody context not initialised");
+        };
+
         let received_column_count = pending_components.verified_data_columns.len();
 
         if pending_components.reconstruction_started {
@@ -709,7 +715,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
             Ok(())
         })?;
 
-        let num_expected_columns_opt = self.get_num_expected_columns(epoch);
+        let num_expected_columns_opt = self.get_num_expected_columns(epoch)?;
 
         pending_components.span.in_scope(|| {
             debug!(
@@ -751,7 +757,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
             Ok(())
         })?;
 
-        let num_expected_columns_opt = self.get_num_expected_columns(epoch);
+        let num_expected_columns_opt = self.get_num_expected_columns(epoch)?;
 
         pending_components.span.in_scope(|| {
             debug!(
@@ -768,14 +774,18 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
         )
     }
 
-    fn get_num_expected_columns(&self, epoch: Epoch) -> Option<usize> {
+    fn get_num_expected_columns(
+        &self,
+        epoch: Epoch,
+    ) -> Result<Option<usize>, AvailabilityCheckError> {
         if self.spec.is_peer_das_enabled_for_epoch(epoch) {
             let num_of_column_samples = self
                 .custody_context
-                .num_of_data_columns_to_sample(epoch, &self.spec);
-            Some(num_of_column_samples)
+                .num_of_data_columns_to_sample(epoch, &self.spec)
+                .map_err(CustodyContextError)?;
+            Ok(Some(num_of_column_samples))
         } else {
-            None
+            Ok(None)
         }
     }
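Note the asymmetry in how the new error is absorbed in this file: block-import paths propagate it as `AvailabilityCheckError::CustodyContextError`, while the reconstruction check returns a plain decision type, so there the `let Ok(..) = .. else` simply declines to reconstruct. A reduced sketch of that control flow (simplified stand-in types, not the real `ReconstructColumnsDecision`):

    enum Decision {
        Yes,
        No(&'static str),
    }

    fn should_reconstruct(sample_count: Result<usize, String>, received: usize, total: usize) -> Decision {
        // An uninitialised custody context is benign here: just don't reconstruct.
        let Ok(needed) = sample_count else {
            return Decision::No("custody context not initialised");
        };
        // Reconstruction only pays off once more than half the columns are wanted
        // and at least half have been received (enough to rebuild the rest).
        if needed > total / 2 && received >= total / 2 {
            Decision::Yes
        } else {
            Decision::No("reconstruction not required")
        }
    }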
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
index 9526921da73..e296a65de30 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs
@@ -8,7 +8,7 @@ use mockall::automock;
 use std::collections::HashSet;
 use std::sync::Arc;
 use task_executor::TaskExecutor;
-use types::{ChainSpec, ColumnIndex, Hash256, Slot};
+use types::{ChainSpec, ColumnIndex, Epoch, Hash256, Slot};
 
 /// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
 pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
@@ -121,4 +121,13 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
             .fork_choice_read_lock()
             .contains_block(block_root)
     }
+
+    pub(crate) fn sampling_columns_for_epoch(
+        &self,
+        epoch: Epoch,
+    ) -> Result<Vec<ColumnIndex>, String> {
+        self.chain
+            .sampling_columns_for_epoch(epoch)
+            .map(|column_indices| column_indices.to_vec())
+    }
 }
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
index 4c6b2d10a95..4acd86db03a 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs
@@ -36,7 +36,7 @@ use tracing::{Span, debug, instrument, warn};
 use types::blob_sidecar::BlobSidecarError;
 use types::data_column_sidecar::DataColumnSidecarError;
 use types::{
-    BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs,
+    BeaconStateError, Blob, BlobSidecar, EthSpec, FullPayload, Hash256, KzgProofs,
     SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
 };
 
@@ -62,6 +62,7 @@ pub enum FetchEngineBlobError {
     GossipBlob(GossipBlobError),
     KzgError(kzg::Error),
     RequestFailed(ExecutionLayerError),
+    CustodyContextError(String),
     RuntimeShutdown,
     TokioJoin(tokio::task::JoinError),
 }
@@ -73,14 +74,12 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
     chain: Arc<BeaconChain<T>>,
     block_root: Hash256,
     block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
-    custody_columns: &[ColumnIndex],
     publish_fn: impl Fn(EngineGetBlobsOutput<T::EthSpec>) + Send + 'static,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     fetch_and_process_engine_blobs_inner(
         FetchBlobsBeaconAdapter::new(chain),
         block_root,
         block,
-        custody_columns,
         publish_fn,
     )
     .await
@@ -92,7 +91,6 @@ async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
     chain_adapter: FetchBlobsBeaconAdapter<T>,
     block_root: Hash256,
     block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
-    custody_columns: &[ColumnIndex],
     publish_fn: impl Fn(EngineGetBlobsOutput<T::EthSpec>) + Send + 'static,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     let versioned_hashes = if let Some(kzg_commitments) = block
@@ -125,7 +123,6 @@ async fn fetch_and_process_engine_blobs_inner<T: BeaconChainTypes>(
             block_root,
             block,
             versioned_hashes,
-            custody_columns,
             publish_fn,
         )
         .await
@@ -240,7 +237,6 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
     block_root: Hash256,
     block: Arc<SignedBeaconBlock<T::EthSpec>>,
     versioned_hashes: Vec<VersionedHash>,
-    custody_columns_indices: &[ColumnIndex],
     publish_fn: impl Fn(EngineGetBlobsOutput<T::EthSpec>) + Send + 'static,
 ) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
     let num_expected_blobs = versioned_hashes.len();
@@ -296,15 +292,9 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
     }
 
     let chain_adapter = Arc::new(chain_adapter);
-    let custody_columns_to_import = compute_custody_columns_to_import(
-        &chain_adapter,
-        block_root,
-        block.clone(),
-        blobs,
-        proofs,
-        custody_columns_indices,
-    )
-    .await?;
+    let custody_columns_to_import =
+        compute_custody_columns_to_import(&chain_adapter, block_root, block.clone(), blobs, proofs)
+            .await?;
 
     if custody_columns_to_import.is_empty() {
         debug!(
@@ -339,12 +329,10 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
     block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
     blobs: Vec<Blob<T::EthSpec>>,
     proofs: Vec<KzgProofs<T::EthSpec>>,
-    custody_columns_indices: &[ColumnIndex],
 ) -> Result<Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>, FetchEngineBlobError> {
     let kzg = chain_adapter.kzg().clone();
     let spec = chain_adapter.spec().clone();
     let chain_adapter_cloned = chain_adapter.clone();
-    let custody_columns_indices = custody_columns_indices.to_vec();
     let current_span = Span::current();
     chain_adapter
         .executor()
@@ -366,11 +354,15 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
                 // This filtering ensures we only import and publish the custody columns.
                 // `DataAvailabilityChecker` requires a strict match on custody columns count to
                 // consider a block available.
+                let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
+                let custody_column_indices = chain_adapter_cloned
+                    .sampling_columns_for_epoch(block_epoch)
+                    .map_err(FetchEngineBlobError::CustodyContextError)?;
                 let mut custody_columns = data_columns_result
                     .map(|data_columns| {
                         data_columns
                             .into_iter()
-                            .filter(|col| custody_columns_indices.contains(&col.index))
+                            .filter(|col| custody_column_indices.contains(&col.index))
                             .map(|col| {
                                 KzgVerifiedCustodyDataColumn::from_asserted_custody(
                                     KzgVerifiedDataColumn::from_execution_verified(col),
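Design note on the change above: dropping the threaded-through `custody_columns: &[ColumnIndex]` parameter removes a value captured at request time that could go stale if the custody group count changed; the indices are now resolved from the block's own epoch inside `compute_custody_columns_to_import`. The adapter hands back an owned `Vec<ColumnIndex>` rather than the chain's `&[ColumnIndex]` so the indices can move into the spawned blocking task. A minimal sketch of why the owned copy matters (hypothetical reduction; the real code goes through `FetchBlobsBeaconAdapter` and the task executor):

    fn spawn_filter_task(custody_column_indices: Vec<u64>, columns: Vec<u64>) {
        // The owned Vec can move into a 'static task; a borrowed slice could not.
        std::thread::spawn(move || {
            let kept: Vec<u64> = columns
                .into_iter()
                .filter(|c| custody_column_indices.contains(c))
                .collect();
            let _ = kept;
        });
    }

This is also what makes the method mockable in the tests below: an owned, `Clone` return type can be supplied with `mockall`'s `return_const`.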
diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
index cbe2f78fbda..4d5987d5774 100644
--- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
+++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs
@@ -12,8 +12,8 @@ use maplit::hashset;
 use std::sync::{Arc, Mutex};
 use task_executor::test_utils::TestRuntime;
 use types::{
-    BeaconBlock, BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec,
-    SignedBeaconBlock, SignedBeaconBlockFulu,
+    BeaconBlock, BeaconBlockFulu, ColumnIndex, EmptyBlock, EthSpec, ForkName, Hash256,
+    MainnetEthSpec, SignedBeaconBlock, SignedBeaconBlockFulu,
 };
 
 type E = MainnetEthSpec;
@@ -21,7 +21,6 @@ type T = EphemeralHarnessType<E>;
 
 mod get_blobs_v2 {
     use super::*;
-    use types::ColumnIndex;
 
     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
     async fn test_fetch_blobs_v2_no_blobs_in_block() {
@@ -37,12 +36,10 @@ mod get_blobs_v2 {
         mock_adapter.expect_get_blobs_v2().times(0);
         mock_adapter.expect_process_engine_blobs().times(0);
 
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
         let processing_status = fetch_and_process_engine_blobs_inner(
             mock_adapter,
             block_root,
             Arc::new(block),
-            &custody_columns,
             publish_fn,
         )
         .await
@@ -62,16 +59,10 @@ mod get_blobs_v2 {
         mock_get_blobs_v2_response(&mut mock_adapter, None);
 
         // Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         assert_eq!(processing_status, None);
     }
@@ -90,16 +81,10 @@ mod get_blobs_v2 {
         mock_adapter.expect_process_engine_blobs().times(0);
 
         // Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         assert_eq!(processing_status, None);
         assert_eq!(
@@ -123,16 +108,10 @@ mod get_blobs_v2 {
         mock_adapter.expect_process_engine_blobs().times(0);
 
         // Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         assert_eq!(processing_status, None);
         assert_eq!(
@@ -160,18 +139,13 @@ mod get_blobs_v2 {
             .returning(|_| Some(hashset![0, 1, 2]));
         // No blobs should be processed
         mock_adapter.expect_process_engine_blobs().times(0);
+        mock_sampling_columns_for_epoch(&mut mock_adapter, vec![0, 1, 2]);
 
         // **WHEN**: Trigger `fetch_blobs` on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         // **THEN**: Should NOT be processed and no columns should be published.
         assert_eq!(processing_status, None);
@@ -202,18 +176,14 @@ mod get_blobs_v2 {
             &mut mock_adapter,
             Ok(AvailabilityProcessingStatus::Imported(block_root)),
         );
+        let custody_columns = vec![0u64, 1, 2];
+        mock_sampling_columns_for_epoch(&mut mock_adapter, custody_columns.clone());
 
         // Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         assert_eq!(
             processing_status,
@@ -253,7 +223,6 @@ mod get_blobs_v1 {
     use super::*;
     use crate::block_verification_types::AsBlock;
     use std::collections::HashSet;
-    use types::ColumnIndex;
 
     const ELECTRA_FORK: ForkName = ForkName::Electra;
 
@@ -270,12 +239,10 @@ mod get_blobs_v1 {
         mock_adapter.expect_get_blobs_v1().times(0);
 
         // WHEN: Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
         let processing_status = fetch_and_process_engine_blobs_inner(
             mock_adapter,
             block_root,
             Arc::new(block_no_blobs),
-            &custody_columns,
             publish_fn,
         )
         .await
@@ -297,16 +264,10 @@ mod get_blobs_v1 {
         mock_get_blobs_v1_response(&mut mock_adapter, vec![None; expected_blob_count]);
 
         // WHEN: Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         // THEN: No blob is processed
         assert_eq!(processing_status, None);
@@ -343,16 +304,10 @@ mod get_blobs_v1 {
         );
 
         // WHEN: Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         // THEN: Returned blobs are processed and published
         assert_eq!(
@@ -383,16 +338,10 @@ mod get_blobs_v1 {
         mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]);
 
         // WHEN: Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         // THEN: Returned blobs should NOT be processed or published.
         assert_eq!(processing_status, None);
@@ -431,16 +380,10 @@ mod get_blobs_v1 {
             .returning(move |_, _| Some(all_blob_indices.clone()));
 
         // **WHEN**: Trigger `fetch_blobs` on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         // **THEN**: Should NOT be processed and no blobs should be published.
         assert_eq!(processing_status, None);
@@ -475,16 +418,10 @@ mod get_blobs_v1 {
         );
 
         // Trigger fetch blobs on the block
-        let custody_columns: [ColumnIndex; 3] = [0, 1, 2];
-        let processing_status = fetch_and_process_engine_blobs_inner(
-            mock_adapter,
-            block_root,
-            block,
-            &custody_columns,
-            publish_fn,
-        )
-        .await
-        .expect("fetch blobs should succeed");
+        let processing_status =
+            fetch_and_process_engine_blobs_inner(mock_adapter, block_root, block, publish_fn)
+                .await
+                .expect("fetch blobs should succeed");
 
         // THEN all fetched blobs are processed and published
         assert_eq!(
@@ -539,6 +476,15 @@ fn mock_process_engine_blobs_result(
         .return_once(move |_, _, _| result);
 }
 
+fn mock_sampling_columns_for_epoch(
+    mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
+    sampling_indices: Vec<ColumnIndex>,
+) {
+    mock_adapter
+        .expect_sampling_columns_for_epoch()
+        .return_const(Ok(sampling_indices));
+}
+
 fn mock_fork_choice_contains_block(
     mock_adapter: &mut MockFetchBlobsBeaconAdapter<T>,
     block_roots: Vec<Hash256>,
diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs
index 9601618e927..1827b099280 100644
--- a/beacon_node/beacon_chain/src/test_utils.rs
+++ b/beacon_node/beacon_chain/src/test_utils.rs
@@ -2455,7 +2455,7 @@ where
     ) -> Result<Option<DataSidecars<E>>, BlockError> {
         Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
             let epoch = block.slot().epoch(E::slots_per_epoch());
-            let sampling_columns = self.chain.sampling_columns_for_epoch(epoch);
+            let sampling_columns = self.chain.sampling_columns_for_epoch(epoch).unwrap();
 
             if blob_items.is_some_and(|(_, blobs)| !blobs.is_empty()) {
                 // Note: this method ignores the actual custody columns and just take the first
@@ -3196,6 +3196,7 @@ where
         let epoch = block.slot().epoch(E::slots_per_epoch());
         self.chain
             .sampling_columns_for_epoch(epoch)
+            .unwrap()
             .iter()
             .copied()
             .collect()
diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs
index 86bdb03dafd..d8901659c40 100644
--- a/beacon_node/beacon_chain/tests/events.rs
+++ b/beacon_node/beacon_chain/tests/events.rs
@@ -78,7 +78,7 @@ async fn data_column_sidecar_event_on_process_gossip_data_column() {
         let slot = Slot::new(10);
         let epoch = slot.epoch(E::slots_per_epoch());
         random_sidecar.signed_block_header.message.slot = slot;
-        random_sidecar.index = harness.chain.sampling_columns_for_epoch(epoch)[0];
+        random_sidecar.index = harness.chain.sampling_columns_for_epoch(epoch).unwrap()[0];
         random_sidecar
     };
     let gossip_verified_data_column =
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index 638c221a7fa..5ac0048ec82 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -5092,6 +5092,7 @@ async fn test_custody_column_filtering_regular_node() {
     let expected_custody_columns: HashSet<_> = harness
         .chain
         .custody_columns_for_epoch(Some(current_slot.epoch(E::slots_per_epoch())))
+        .unwrap()
         .iter()
         .copied()
         .collect();
@@ -5183,7 +5184,8 @@ async fn test_missing_columns_after_cgc_change() {
 
     let missing_columns = harness
         .chain
-        .get_missing_columns_for_epoch(epoch_before_increase);
+        .get_missing_columns_for_epoch(epoch_before_increase)
+        .unwrap();
 
     // We should have no missing columns
     assert_eq!(missing_columns.len(), 0);
@@ -5205,14 +5207,16 @@ async fn test_missing_columns_after_cgc_change() {
     // We should have missing columns from before the cgc increase
     let missing_columns = harness
         .chain
-        .get_missing_columns_for_epoch(epoch_before_increase);
+        .get_missing_columns_for_epoch(epoch_before_increase)
+        .unwrap();
 
     assert!(!missing_columns.is_empty());
 
     // We should have no missing columns after the cgc increase
     let missing_columns = harness
         .chain
-        .get_missing_columns_for_epoch(epoch_after_increase);
+        .get_missing_columns_for_epoch(epoch_after_increase)
+        .unwrap();
 
     assert!(missing_columns.is_empty());
 }
diff --git a/beacon_node/http_api/src/custody.rs b/beacon_node/http_api/src/custody.rs
index a43b55ceca4..72fd0852a65 100644
--- a/beacon_node/http_api/src/custody.rs
+++ b/beacon_node/http_api/src/custody.rs
@@ -41,6 +41,9 @@ pub fn info<T: BeaconChainTypes>(
     let custody_context = chain.data_availability_checker.custody_context();
     let custody_columns = custody_context
         .custody_columns_for_epoch(Some(earliest_custodied_data_column_epoch), &chain.spec)
+        .map_err(|e| {
+            custom_server_error(format!("custody context has not been initialised: {e:?}"))
+        })?
         .to_vec();
     let custody_group_count = custody_context
         .custody_group_count_at_epoch(earliest_custodied_data_column_epoch, &chain.spec);
diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs
index 05a4a4b7a4a..ce5bf0eebcc 100644
--- a/beacon_node/http_api/src/publish_blocks.rs
+++ b/beacon_node/http_api/src/publish_blocks.rs
@@ -237,7 +237,12 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlock<T>>(
             warp_utils::reject::custom_server_error("unable to publish data column sidecars".into())
         })?;
         let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
-        let sampling_columns_indices = chain.sampling_columns_for_epoch(epoch);
+        let sampling_columns_indices = chain.sampling_columns_for_epoch(epoch).map_err(|_| {
+            // This is unreachable as beacon chain must have been initialised before http server starts
+            warp_utils::reject::custom_server_error(
+                "beacon chain has not been initialised correctly".into(),
+            )
+        })?;
         let sampling_columns = gossip_verified_columns
             .into_iter()
             .filter(|data_column| sampling_columns_indices.contains(&data_column.index()))
diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs
index 33f462fa5e2..cb649bc2656 100644
--- a/beacon_node/http_api/tests/broadcast_validation_tests.rs
+++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs
@@ -2031,6 +2031,7 @@ fn get_custody_columns(tester: &InteractiveTester<E>, slot: Slot) -> HashSet<ColumnIndex> {
     tester
         .harness
         .chain
         .custody_columns_for_epoch(Some(slot.epoch(E::slots_per_epoch())))
+        .unwrap()
         .iter()
         .copied()
diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs
--- a/beacon_node/lighthouse_network/src/types/globals.rs
+++ b/beacon_node/lighthouse_network/src/types/globals.rs
@@ ... @@ impl<E: EthSpec> NetworkGlobals<E> {
         // The below `expect` calls will panic on start up if the chain spec config values used
         // are invalid
-        let sampling_size = spec
-            .sampling_size_custody_groups(custody_group_count)
-            .expect("should compute node sampling size from valid chain spec");
+        let sampling_size = spec.sampling_size_custody_groups(custody_group_count);
         let custody_groups = get_custody_groups(node_id, sampling_size, &spec)
             .expect("should compute node custody groups");
 
@@ -277,9 +275,7 @@ mod test {
         spec.fulu_fork_epoch = Some(Epoch::new(0));
         let custody_group_count = spec.number_of_custody_groups / 2;
-        let sampling_size_custody_groups = spec
-            .sampling_size_custody_groups(custody_group_count)
-            .unwrap();
+        let sampling_size_custody_groups = spec.sampling_size_custody_groups(custody_group_count);
         let expected_sampling_subnet_count = sampling_size_custody_groups
             * spec.data_column_sidecar_subnet_count
             / spec.number_of_custody_groups;
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs
index bebda36d71c..932b5348ed6 100644
--- a/beacon_node/network/src/network_beacon_processor/mod.rs
+++ b/beacon_node/network/src/network_beacon_processor/mod.rs
@@ -761,8 +761,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         if self.chain.config.disable_get_blobs {
             return;
         }
-        let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
-        let custody_columns = self.chain.sampling_columns_for_epoch(epoch);
         let self_cloned = self.clone();
         let publish_fn = move |blobs_or_data_column| {
             if publish_blobs {
@@ -787,7 +785,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                     self.chain.clone(),
                     block_root,
                     block.clone(),
-                    custody_columns,
                     publish_fn,
                 )
                 .await
diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
index ac24b648e05..7c8d1155595 100644
--- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -438,14 +438,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
     ) -> Result<(), (RpcErrorResponse, &'static str)> {
         let mut send_data_column_count = 0;
         // Only attempt lookups for columns the node has advertised and is responsible for maintaining custody of.
-        let available_columns = self.chain.custody_columns_for_epoch(None);
+        // Note: this may not be available during startup. This case is rare. We skip filtering to avoid getting penalised.
+        let available_columns_opt = self.chain.custody_columns_for_epoch(None).ok();
 
         for data_column_ids_by_root in request.data_column_ids.as_slice() {
             let indices_to_retrieve = data_column_ids_by_root
                 .columns
                 .iter()
                 .copied()
-                .filter(|c| available_columns.contains(c))
+                .filter(|c| available_columns_opt.is_none_or(|columns| columns.contains(c)))
                 .collect::<Vec<_>>();
             match self.chain.get_data_columns_checking_all_caches(
                 data_column_ids_by_root.block_root,
@@ -1258,15 +1259,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
 
         // Only attempt lookups for columns the node has advertised and is responsible for maintaining custody of.
         let request_start_epoch = request_start_slot.epoch(T::EthSpec::slots_per_epoch());
-        let available_columns = self
+        // Note: this may not be available during startup. This case is rare. We skip filtering to avoid getting penalised.
+        let available_columns_opt = self
             .chain
-            .custody_columns_for_epoch(Some(request_start_epoch));
+            .custody_columns_for_epoch(Some(request_start_epoch))
+            .ok();
 
         let indices_to_retrieve = req
             .columns
             .iter()
             .copied()
-            .filter(|c| available_columns.contains(c))
+            .filter(|c| available_columns_opt.is_none_or(|columns| columns.contains(c)))
             .collect::<Vec<_>>();
 
         for root in block_roots {
@@ -1374,17 +1377,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         epoch_opt: Option<Epoch>,
         span: Span,
     ) {
-        let non_custody_indices = {
-            let custody_columns = self
-                .chain
-                .data_availability_checker
-                .custody_context()
-                .custody_columns_for_epoch(epoch_opt, &self.chain.spec);
-            requested_indices
-                .iter()
-                .filter(|subnet_id| !custody_columns.contains(subnet_id))
-                .collect::<Vec<_>>()
-        };
+        let non_custody_indices = self
+            .chain
+            .data_availability_checker
+            .custody_context()
+            .custody_columns_for_epoch(epoch_opt, &self.chain.spec)
+            .map(|custody_columns| {
+                requested_indices
+                    .iter()
+                    .filter(|subnet_id| !custody_columns.contains(subnet_id))
+                    .collect::<Vec<_>>()
+            })
+            .unwrap_or_default();
 
         // This field is used to identify if peers are sending requests on columns we don't custody.
         span.record("non_custody_indices", field::debug(non_custody_indices));
diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs
index a9794cb5c42..32b3a0544ff 100644
--- a/beacon_node/network/src/network_beacon_processor/tests.rs
+++ b/beacon_node/network/src/network_beacon_processor/tests.rs
@@ -298,7 +298,7 @@ impl TestRig {
         if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
             let kzg = get_kzg(&chain.spec);
             let epoch = block.slot().epoch(E::slots_per_epoch());
-            let sampling_indices = chain.sampling_columns_for_epoch(epoch);
+            let sampling_indices = chain.sampling_columns_for_epoch(epoch).unwrap();
             let custody_columns: DataColumnSidecarList<E> = blobs_to_data_column_sidecars(
                 &blobs.iter().collect_vec(),
                 kzg_proofs.clone().into_iter().collect_vec(),
@@ -1936,7 +1936,8 @@ async fn test_data_columns_by_range_request_only_returns_requested_columns() {
 
     let all_custody_columns = rig
         .chain
-        .sampling_columns_for_epoch(rig.chain.epoch().unwrap());
+        .sampling_columns_for_epoch(rig.chain.epoch().unwrap())
+        .unwrap();
 
     let available_columns: Vec<ColumnIndex> = all_custody_columns.to_vec();
     let requested_columns = vec![available_columns[0], available_columns[2]];
diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs
index bb2c6799f1d..22e22a7de9a 100644
--- a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs
+++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs
@@ -173,9 +173,13 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
         };
 
         // Check if we have missing columns between the da boundary and `earliest_data_column_epoch`
-        let missing_columns = self
+        let Ok(missing_columns) = self
             .beacon_chain
-            .get_missing_columns_for_epoch(da_boundary_epoch);
+            .get_missing_columns_for_epoch(da_boundary_epoch)
+        else {
+            // If custody context has not been initialised, we cannot determine if backfill is required.
+            return false;
+        };
 
         if !missing_columns.is_empty() {
             let latest_finalized_epoch = self
@@ -384,7 +388,9 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
         // Skip all batches (Epochs) that don't have missing columns.
         for epoch in Epoch::range_inclusive_rev(self.to_be_downloaded, column_da_boundary) {
-            let missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch);
+            let Ok(missing_columns) = self.beacon_chain.get_missing_columns_for_epoch(epoch) else {
+                return None;
+            };
 
             if !missing_columns.is_empty() {
                 self.to_be_downloaded = epoch;
@@ -443,7 +449,11 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
                 self.include_next_batch()
             }
             Entry::Vacant(entry) => {
-                let missing_columns = self.beacon_chain.get_missing_columns_for_epoch(batch_id);
+                let Ok(missing_columns) = self.beacon_chain.get_missing_columns_for_epoch(batch_id)
+                else {
+                    // This is unreachable as we have already called this function earlier, so custody context must be initialised.
+                    return None;
+                };
                 entry.insert(BatchInfo::new(
                     &batch_id,
                     CUSTODY_BACKFILL_EPOCHS_PER_BATCH,
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 2e0c56db23f..bc0689ff398 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -583,6 +583,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             let column_indexes = self
                 .chain
                 .sampling_columns_for_epoch(epoch)
+                .map_err(RpcRequestSendError::InternalError)?
                 .iter()
                 .cloned()
                 .collect();
@@ -659,15 +660,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .transpose()?;
 
         let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
+        let sampling_columns = self
+            .chain
+            .sampling_columns_for_epoch(epoch)
+            .map_err(RpcRequestSendError::InternalError)?;
         let info = RangeBlockComponentsRequest::new(
             blocks_req_id,
             blobs_req_id,
-            data_column_requests.map(|data_column_requests| {
-                (
-                    data_column_requests,
-                    self.chain.sampling_columns_for_epoch(epoch).to_vec(),
-                )
-            }),
+            data_column_requests
+                .map(|data_column_requests| (data_column_requests, sampling_columns.to_vec())),
             range_request_span,
         );
         self.components_by_range_requests.insert(id, info);
@@ -1096,6 +1097,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         let custody_indexes_to_fetch = self
             .chain
             .sampling_columns_for_epoch(current_epoch)
+            .map_err(RpcRequestSendError::InternalError)?
             .iter()
             .copied()
             .filter(|index| !custody_indexes_imported.contains(index))
@@ -1700,6 +1702,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         let column_indexes = self
             .chain
             .sampling_columns_for_epoch(batch_id.epoch)
+            .map_err(RpcRequestSendError::InternalError)?
             .iter()
             .cloned()
             .collect();
diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs
index a66080ada6f..53c4928ec8d 100644
--- a/consensus/types/src/chain_spec.rs
+++ b/consensus/types/src/chain_spec.rs
@@ -817,7 +817,7 @@ impl ChainSpec {
         &self,
         custody_group_count: u64,
     ) -> Result<usize, String> {
-        let sampling_size_groups = self.sampling_size_custody_groups(custody_group_count)?;
+        let sampling_size_groups = self.sampling_size_custody_groups(custody_group_count);
         let columns_per_custody_group = self.data_columns_per_group::<E>();
 
         let sampling_size_columns = columns_per_custody_group
@@ -828,8 +828,8 @@ impl ChainSpec {
     }
 
     /// Returns the number of custody groups to sample per slot.
-    pub fn sampling_size_custody_groups(&self, custody_group_count: u64) -> Result<u64, String> {
-        Ok(std::cmp::max(custody_group_count, self.samples_per_slot))
+    pub fn sampling_size_custody_groups(&self, custody_group_count: u64) -> u64 {
+        std::cmp::max(custody_group_count, self.samples_per_slot)
     }
 
     /// Returns the min epoch for blob / data column sidecar requests based on the current epoch.
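With the fallible path removed, `sampling_size_custody_groups` is a total function: it is just the spec-defined `max(custody_group_count, samples_per_slot)`, so there was never a real error to return. A worked example (illustrative values: `samples_per_slot = 8` and 128 custody groups, matching the mainnet Fulu parameters):

    fn sampling_size_custody_groups(custody_group_count: u64, samples_per_slot: u64) -> u64 {
        std::cmp::max(custody_group_count, samples_per_slot)
    }

    fn main() {
        let samples_per_slot = 8;
        // A node custodying only 4 groups must still sample 8 per slot...
        assert_eq!(sampling_size_custody_groups(4, samples_per_slot), 8);
        // ...while a supernode custodying all 128 groups samples all of them.
        assert_eq!(sampling_size_custody_groups(128, samples_per_slot), 128);
    }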