diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 074ae93a790..806ae2f0071 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -636,7 +636,7 @@ pub fn signature_verify_chain_segment( let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment.into_iter().unzip(); let maybe_available_blocks = chain .data_availability_checker - .verify_kzg_for_rpc_blocks(blocks)?; + .verify_kzg_for_rpc_blocks(&blocks)?; // zip it back up let mut signature_verified_blocks = roots .into_iter() diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index dab54dc823e..d6e78ffa0f5 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -9,8 +9,8 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec, - Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, ColumnIndex, Epoch, + EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; /// A block that has been received over RPC. It has 2 internal variants: @@ -80,6 +80,70 @@ impl RpcBlock { RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => Some(data_columns), } } + + /// Returns indices of blobs or data columns that have conflicting signature with block's + /// signature. 
+ pub fn validate_non_matching_signed_headers(&self) -> Result<(), AvailabilityCheckError> { + match &self.block { + RpcBlockInner::Block(_) => Ok(()), + RpcBlockInner::BlockAndBlobs(block, blobs) => { + let indices: Vec<_> = blobs + .iter() + .filter(|blob| &blob.signed_block_header.signature != block.signature()) + .map(|blob| blob.index) + .collect(); + if !indices.is_empty() { + return Err(AvailabilityCheckError::InvalidBlobsSignature(indices)); + } + Ok(()) + } + RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => { + let indices: Vec<_> = data_columns + .iter() + .filter(|column| { + &column.as_data_column().signed_block_header.signature != block.signature() + }) + .map(|column| column.index()) + .collect(); + if !indices.is_empty() { + return Err(AvailabilityCheckError::InvalidDataColumnsSignature(indices)); + } + Ok(()) + } + } + } + + /// Returns indices of blobs that have conflicting signature with block's signature. + pub fn non_matching_blobs_signed_headers(&self) -> Option> { + match &self.block { + RpcBlockInner::Block(_) => None, + RpcBlockInner::BlockAndBlobs(block, blobs) => Some( + blobs + .iter() + .filter(|blob| &blob.signed_block_header.signature != block.signature()) + .map(|blob| blob.index) + .collect(), + ), + RpcBlockInner::BlockAndCustodyColumns(..) => None, + } + } + + /// Returns indices of custody columns that have conflicting signature with block's signature. + pub fn non_matching_custody_columns_signed_headers(&self) -> Option> { + match &self.block { + RpcBlockInner::Block(_) => None, + RpcBlockInner::BlockAndBlobs(..) 
=> None, + RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => Some( + data_columns + .iter() + .filter(|column| { + &column.as_data_column().signed_block_header.signature != block.signature() + }) + .map(|column| column.index()) + .collect(), + ), + } + } } /// Note: This variant is intentionally private because we want to safely construct the @@ -186,6 +250,28 @@ impl RpcBlock { }) } + /// Only used for testing + pub fn __new_for_testing( + block_root: Hash256, + block: Arc>, + blobs: Option>, + custody_columns: Option>, + custody_columns_count: usize, + ) -> Self { + let inner = if let Some(blobs) = blobs { + RpcBlockInner::BlockAndBlobs(block, blobs) + } else if let Some(columns) = custody_columns { + RpcBlockInner::BlockAndCustodyColumns(block, columns) + } else { + RpcBlockInner::Block(block) + }; + Self { + block_root, + block: inner, + custody_columns_count, + } + } + #[allow(clippy::type_complexity)] pub fn deconstruct( self, diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 033b472da0c..d73ad4f6bba 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -323,6 +323,9 @@ impl DataAvailabilityChecker { &self, block: RpcBlock, ) -> Result, AvailabilityCheckError> { + // Verify that blobs or data columns signatures match + block.validate_non_matching_signed_headers()?; + let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); if self.blobs_required_for_block(&block) { @@ -391,8 +394,14 @@ impl DataAvailabilityChecker { /// check if there are any missing blobs. 
pub fn verify_kzg_for_rpc_blocks( &self, - blocks: Vec>, + blocks: &[RpcBlock], ) -> Result>, AvailabilityCheckError> { + // Verify that blobs or data columns signatures match + blocks + .iter() + .map(|block| block.validate_non_matching_signed_headers()) + .collect::, AvailabilityCheckError>>()?; + let mut results = Vec::with_capacity(blocks.len()); let all_blobs = blocks .iter() @@ -428,7 +437,7 @@ impl DataAvailabilityChecker { for block in blocks { let custody_columns_count = block.custody_columns_count(); - let (block_root, block, blobs, data_columns) = block.deconstruct(); + let (block_root, block, blobs, data_columns) = block.clone().deconstruct(); let maybe_available_block = if self.blobs_required_for_block(&block) { if let Some(blobs) = blobs { @@ -783,23 +792,6 @@ impl AvailableBlock { } = self; (block_root, block, blob_data) } - - /// Only used for testing - pub fn __clone_without_recv(&self) -> Result { - Ok(Self { - block_root: self.block_root, - block: self.block.clone(), - blob_data: match &self.blob_data { - AvailableBlockData::NoData => AvailableBlockData::NoData, - AvailableBlockData::Blobs(blobs) => AvailableBlockData::Blobs(blobs.clone()), - AvailableBlockData::DataColumns(data_columns) => { - AvailableBlockData::DataColumns(data_columns.clone()) - } - }, - blobs_available_timestamp: self.blobs_available_timestamp, - spec: self.spec.clone(), - }) - } } #[derive(Debug)] diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index d091d6fefb5..aa93e6085ba 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -22,6 +22,10 @@ pub enum Error { BlockReplayError(state_processing::BlockReplayError), RebuildingStateCaches(BeaconStateError), SlotClockError, + /// One or more signatures in a BlobSidecar of an RpcBlock are invalid + InvalidBlobsSignature(Vec), + /// One or 
more signatures in a DataColumnSidecar of an RpcBlock are invalid + InvalidDataColumnsSignature(Vec), } #[derive(PartialEq, Eq)] @@ -44,7 +48,9 @@ impl Error { | Error::ParentStateMissing(_) | Error::BlockReplayError(_) | Error::RebuildingStateCaches(_) - | Error::SlotClockError => ErrorCategory::Internal, + | Error::SlotClockError + | Error::InvalidBlobsSignature(_) + | Error::InvalidDataColumnsSignature(_) => ErrorCategory::Internal, Error::InvalidBlobs { .. } | Error::InvalidColumn { .. } | Error::ReconstructColumnsError { .. } diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 348e6d52a64..ccefc8048c9 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -1,4 +1,5 @@ -use crate::data_availability_checker::{AvailableBlock, AvailableBlockData}; +use crate::block_verification_types::{AvailableBlock, MaybeAvailableBlock, RpcBlock}; +use crate::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use crate::{metrics, BeaconChain, BeaconChainTypes}; use itertools::Itertools; use state_processing::{ @@ -19,6 +20,9 @@ use types::{FixedBytesExtended, Hash256, Slot}; /// It's ok if historical sync is stalled due to writes from forwards block processing. const PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(30); +type RpcBlockBatchResult = Result<(usize, Vec>), HistoricalBlockError>; +type AvailableBlockBatchResult = Result<(usize, Vec>), HistoricalBlockError>; + #[derive(Debug, IntoStaticStr)] pub enum HistoricalBlockError { /// Block root mismatch, caller should retry with different blocks. @@ -27,15 +31,17 @@ pub enum HistoricalBlockError { expected_block_root: Hash256, }, /// Bad signature, caller should retry with different blocks. - SignatureSet(SignatureSetError), - /// Bad signature, caller should retry with different blocks. 
- InvalidSignature, + InvalidSignature(String), + /// Unexpected error + Unexpected(String), /// Transitory error, caller should retry with the same blocks. ValidatorPubkeyCacheTimeout, /// Logic error: should never occur. IndexOutOfBounds, /// Internal store error StoreError(StoreError), + /// Faulty and internal AvailabilityCheckError + AvailabilityCheckError(AvailabilityCheckError), } impl From for HistoricalBlockError { @@ -44,41 +50,49 @@ impl From for HistoricalBlockError { } } +impl From for HistoricalBlockError { + fn from(err: SignatureSetError) -> Self { + match err { + // The encoding of the signature is invalid, peer fault + e + @ (SignatureSetError::SignatureInvalid(_) | SignatureSetError::BadBlsBytes { .. }) => { + Self::InvalidSignature(format!("{e:?}")) + } + // All these variants are internal errors or unreachable for historical block paths, + // which only check the proposer signature. + // BadBlsBytes = Unreachable + e @ (SignatureSetError::BeaconStateError(_) + | SignatureSetError::ValidatorUnknown(_) + | SignatureSetError::ValidatorPubkeyUnknown(_) + | SignatureSetError::IncorrectBlockProposer { .. } + | SignatureSetError::MismatchedPublicKeyLen { .. } + | SignatureSetError::PublicKeyDecompressionFailed + | SignatureSetError::InconsistentBlockFork(_)) => Self::Unexpected(format!("{e:?}")), + } + } +} + +impl From for HistoricalBlockError { + fn from(e: AvailabilityCheckError) -> Self { + Self::AvailabilityCheckError(e) + } +} + impl BeaconChain { - /// Store a batch of historical blocks in the database. - /// - /// The `blocks` should be given in slot-ascending order. One of the blocks should have a block - /// root corresponding to the `oldest_block_parent` from the store's `AnchorInfo`. - /// - /// The block roots and proposer signatures are verified. If any block doesn't match the parent - /// root listed in its successor, then the whole batch will be discarded and - /// `MismatchedBlockRoot` will be returned. 
If any proposer signature is invalid then - /// `SignatureSetError` or `InvalidSignature` will be returned. - /// - /// To align with sync we allow some excess blocks with slots greater than or equal to - /// `oldest_block_slot` to be provided. They will be ignored without being checked. - /// - /// This function should not be called concurrently with any other function that mutates - /// the anchor info (including this function itself). If a concurrent mutation occurs that - /// would violate consistency then an `AnchorInfoConcurrentMutation` error will be returned. - /// - /// Return the number of blocks successfully imported. - pub fn import_historical_block_batch( + fn get_correct_historical_block_chain( &self, - mut blocks: Vec>, - ) -> Result { + blocks: &[RpcBlock], + ) -> RpcBlockBatchResult { let anchor_info = self.store.get_anchor_info(); - let blob_info = self.store.get_blob_info(); - let data_column_info = self.store.get_data_column_info(); // Take all blocks with slots less than the oldest block slot. let num_relevant = blocks.partition_point(|available_block| { - available_block.block().slot() < anchor_info.oldest_block_slot + available_block.as_block().slot() < anchor_info.oldest_block_slot }); let total_blocks = blocks.len(); - blocks.truncate(num_relevant); - let blocks_to_import = blocks; + let mut blocks_to_import = blocks.to_vec(); + blocks_to_import.truncate(num_relevant); if blocks_to_import.len() != total_blocks { debug!( @@ -89,30 +103,145 @@ impl BeaconChain { ); } - if blocks_to_import.is_empty() { - return Ok(0); + // Validate that the blocks are chaining ascending order. 
+        let mut expected_block_root = anchor_info.oldest_block_parent;
+        let mut num_to_include = blocks_to_import.len();
+
+        for (i, block) in blocks_to_import.iter().enumerate().rev() {
+            if block.block_root() != expected_block_root {
+                return Err(HistoricalBlockError::MismatchedBlockRoot {
+                    block_root: block.block_root(),
+                    expected_block_root,
+                });
+            }
+
+            expected_block_root = block.as_block().message().parent_root();
+
+            if expected_block_root == self.genesis_block_root {
+                num_to_include = i;
+                break;
+            }
         }
+        // Drop the leading blocks once we reach the genesis block.
+        blocks_to_import.drain(..num_to_include);
+        Ok((num_relevant, blocks_to_import))
+    }
+
+    fn verify_rpc_block_signatures(
+        &self,
+        signed_blocks: &[RpcBlock<T::EthSpec>],
+    ) -> Result<(), HistoricalBlockError> {
+        // Verify signatures in one batch, holding the pubkey cache lock for the shortest duration
+        // possible. For each block fetch the parent root from its successor. Slicing from index 1
+        // is safe because an empty `signed_blocks` is rejected below with `IndexOutOfBounds`.
+        let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES);
+        let setup_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_SETUP_TIMES);
+        let anchor_info = self.store.get_anchor_info();
+        let pubkey_cache = self
+            .validator_pubkey_cache
+            .try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT)
+            .ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?;
+        let block_roots = signed_blocks
+            .get(1..)
+            .ok_or(HistoricalBlockError::IndexOutOfBounds)?
+ .iter() + .map(|block| block.as_block().parent_root()) + .chain(iter::once(anchor_info.oldest_block_parent)); + let signature_set = signed_blocks + .iter() + .zip_eq(block_roots) + .filter(|&(_block, block_root)| (block_root != self.genesis_block_root)) + .map(|(block, block_root)| { + let block = block.as_block(); + block_proposal_signature_set_from_parts( + block, + Some(block_root), + block.message().proposer_index(), + &self.spec.fork_at_epoch(block.message().epoch()), + self.genesis_validators_root, + |validator_index| pubkey_cache.get(validator_index).cloned().map(Cow::Owned), + &self.spec, + ) + }) + .collect::, _>>() + .map(ParallelSignatureSets::from)?; + drop(pubkey_cache); + drop(setup_timer); + + let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); + if !signature_set.verify() { + return Err(HistoricalBlockError::InvalidSignature("invalid".to_owned())); + } + drop(verify_timer); + drop(sig_timer); + + Ok(()) + } + + fn verify_blobs_and_data_columns( + &self, + blocks: &[RpcBlock], + ) -> Result>, HistoricalBlockError> { + // Check that all data columns are present <- faulty failure if missing because we have + // checked the block root is correct first. + let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES); + let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); + let available_blocks = self + .data_availability_checker + .verify_kzg_for_rpc_blocks(blocks) + .map_err(HistoricalBlockError::AvailabilityCheckError)? // Map outer error + .into_iter() + .map(|maybe_block| match maybe_block { + MaybeAvailableBlock::Available(block) => Ok(block), + MaybeAvailableBlock::AvailabilityPending { .. 
} => { + Err(HistoricalBlockError::AvailabilityCheckError( + AvailabilityCheckError::Unexpected("block not available".to_string()), + )) + } + }) + .collect::, _>>()?; // Map inner results + + drop(verify_timer); + drop(sig_timer); + + Ok(available_blocks) + } + + fn verify_historical_block_batch( + &self, + blocks: &[RpcBlock], + ) -> AvailableBlockBatchResult { + let (num_relevant, importing_blocks) = self.get_correct_historical_block_chain(blocks)?; + + if num_relevant == 0 { + return Ok((num_relevant, vec![])); + } + self.verify_rpc_block_signatures(&importing_blocks)?; + let verified_blocks = self.verify_blobs_and_data_columns(&importing_blocks)?; + Ok((num_relevant, verified_blocks)) + } + + fn store_historical_block_batch( + &self, + blocks: Vec>, + ) -> Result<(), HistoricalBlockError> { + let anchor_info = self.store.get_anchor_info(); + let blob_info = self.store.get_blob_info(); + let data_column_info = self.store.get_data_column_info(); + let mut expected_block_root = anchor_info.oldest_block_parent; let mut prev_block_slot = anchor_info.oldest_block_slot; let mut new_oldest_blob_slot = blob_info.oldest_blob_slot; let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot; let mut blob_batch = Vec::::new(); - let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); - let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); - let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); + let mut cold_batch = Vec::with_capacity(blocks.len()); + let mut hot_batch = Vec::with_capacity(blocks.len()); - for available_block in blocks_to_import.into_iter().rev() { + for available_block in blocks.into_iter().rev() { let (block_root, block, block_data) = available_block.deconstruct(); - if block_root != expected_block_root { - return Err(HistoricalBlockError::MismatchedBlockRoot { - block_root, - expected_block_root, - }); - } - if !self.store.get_config().prune_payloads { // If prune-payloads is set to false, store the 
block which includes the execution payload self.store @@ -127,6 +256,9 @@ impl BeaconChain { ); } + // Check that block headers in the block and its blob or data column sidecars are the same. + // Store blob and data column sidecars all together to verify KZG as a batch at the end. + // Also, update the oldest blob and data column slots. match &block_data { AvailableBlockData::NoData => {} AvailableBlockData::Blobs(..) => { @@ -160,7 +292,6 @@ impl BeaconChain { prev_block_slot = block.slot(); expected_block_root = block.message().parent_root(); - signed_blocks.push(block); // If we've reached genesis, add the genesis block root to the batch for all slots // between 0 and the first block slot, and set the anchor slot to 0 to indicate @@ -179,51 +310,6 @@ impl BeaconChain { break; } } - // these were pushed in reverse order so we reverse again - signed_blocks.reverse(); - - // Verify signatures in one batch, holding the pubkey cache lock for the shortest duration - // possible. For each block fetch the parent root from its successor. Slicing from index 1 - // is safe because we've already checked that `blocks_to_import` is non-empty. - let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES); - let setup_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_SETUP_TIMES); - let pubkey_cache = self - .validator_pubkey_cache - .try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?; - let block_roots = signed_blocks - .get(1..) - .ok_or(HistoricalBlockError::IndexOutOfBounds)? 
- .iter() - .map(|block| block.parent_root()) - .chain(iter::once(anchor_info.oldest_block_parent)); - let signature_set = signed_blocks - .iter() - .zip_eq(block_roots) - .filter(|&(_block, block_root)| (block_root != self.genesis_block_root)) - .map(|(block, block_root)| { - block_proposal_signature_set_from_parts( - block, - Some(block_root), - block.message().proposer_index(), - &self.spec.fork_at_epoch(block.message().epoch()), - self.genesis_validators_root, - |validator_index| pubkey_cache.get(validator_index).cloned().map(Cow::Owned), - &self.spec, - ) - }) - .collect::, _>>() - .map_err(HistoricalBlockError::SignatureSet) - .map(ParallelSignatureSets::from)?; - drop(pubkey_cache); - drop(setup_timer); - - let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); - if !signature_set.verify() { - return Err(HistoricalBlockError::InvalidSignature); - } - drop(verify_timer); - drop(sig_timer); // Write the I/O batches to disk, writing the blocks themselves first, as it's better // for the hot DB to contain extra blocks than for the cold DB to point to blocks that @@ -284,6 +370,38 @@ impl BeaconChain { self.store_migrator.process_reconstruction(); } + Ok(()) + } + + /// Verify and store a batch of historical blocks in the database. + /// + /// The `blocks` should be given in slot-ascending order. One of the blocks should have a block + /// root corresponding to the `oldest_block_parent` from the store's `AnchorInfo`. + /// + /// The block roots and proposer signatures are verified. If any block doesn't match the parent + /// root listed in its successor, then the whole batch will be discarded and + /// `MismatchedBlockRoot` will be returned. If any proposer signature is invalid then + /// `SignatureSetError` or `InvalidSignature` will be returned. + /// + /// The block's blob and data column sidecars are verified. The function checks their signed + /// block headers equal to their block's block header. 
Then, it verifies their KZG commitments + /// with proofs. KZG inclusion proof verification is not held here as it is already done by the + /// network crate. + /// + /// To align with sync we allow some excess blocks with slots greater than or equal to + /// `oldest_block_slot` to be provided. They will be ignored without being checked. + /// + /// This function should not be called concurrently with any other function that mutates + /// the anchor info (including this function itself). If a concurrent mutation occurs that + /// would violate consistency then an `AnchorInfoConcurrentMutation` error will be returned. + /// + /// Return the number of blocks successfully imported. + pub fn import_historical_block_batch( + &self, + blocks: &[RpcBlock], + ) -> Result { + let (num_relevant, verified_blocks) = self.verify_historical_block_batch(blocks)?; + self.store_historical_block_batch(verified_blocks)?; Ok(num_relevant) } } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 3343dc101b5..e73de11d3c7 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1,9 +1,9 @@ #![cfg(not(debug_assertions))] use beacon_chain::attestation_verification::Error as AttnError; -use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::builder::BeaconChainBuilder; -use beacon_chain::data_availability_checker::AvailableBlock; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::schema_change::migrate_schema; use beacon_chain::test_utils::SyncCommitteeStrategy; use beacon_chain::test_utils::{ @@ -2279,6 +2279,9 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { let temp1 = tempdir().unwrap(); let full_store = get_store(&temp1); + let is_deneb = full_store.get_chain_spec().deneb_fork_epoch.is_some(); + let is_fulu = 
full_store.get_chain_spec().is_peer_das_scheduled(); + // TODO(das): Run a supernode so the node has full blobs stored. // This may not be required in the future if we end up implementing downloading checkpoint // blobs from p2p peers: @@ -2465,7 +2468,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .map(|s| s.beacon_block.clone()) .collect::>(); - let mut available_blocks = vec![]; + let mut rpc_blocks = vec![]; for blinded in historical_blocks { let block_root = blinded.canonical_root(); let full_block = harness @@ -2474,48 +2477,173 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .await .expect("should get block") .expect("should get block"); + let rpc_block = + harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)); - if let MaybeAvailableBlock::Available(block) = harness + if let MaybeAvailableBlock::Available(_) = harness .chain .data_availability_checker - .verify_kzg_for_rpc_block( - harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)), - ) + .verify_kzg_for_rpc_block(rpc_block.clone()) .expect("should verify kzg") { - available_blocks.push(block); + rpc_blocks.push(rpc_block); } } // Corrupt the signature on the 1st block to ensure that the backfill processor is checking // signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120. 
- let mut batch_with_invalid_first_block = - available_blocks.iter().map(clone_block).collect::>(); + let mut batch_with_invalid_first_block = rpc_blocks + .iter() + .map(|block| block.clone()) + .collect::>(); batch_with_invalid_first_block[0] = { - let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct(); - let mut corrupt_block = (*block).clone(); - *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec)) + let (block_root, mut block, blobs, columns) = rpc_blocks[0].clone().deconstruct(); + let custody_columns_count = rpc_blocks[0].custody_columns_count(); + let block_mut = Arc::make_mut(&mut block); + *block_mut.signature_mut() = Signature::empty(); + RpcBlock::__new_for_testing(block_root, block, blobs, columns, custody_columns_count) }; + // Corrupt the signature and KZG commitments on the 1st blocks with blob sidecar and data + // column sidecar to ensure that the backfill processor is checking signatures correctly. 
+ if is_deneb { + let first_blob_or_col_index = rpc_blocks + .iter() + .position(|block| { + let (_, _, blobs, columns) = block.clone().deconstruct(); + if blobs.is_some() { + is_deneb && !is_fulu + } else if columns.is_some() { + is_fulu + } else { + false + } + }) + .unwrap(); + + let mut batch_with_invalid_header = rpc_blocks + .iter() + .map(|block| block.clone()) + .collect::>(); + batch_with_invalid_header[first_blob_or_col_index] = { + let (block_root, block, blobs, cols) = batch_with_invalid_header + [first_blob_or_col_index] + .clone() + .deconstruct(); + let custody_columns_count = + batch_with_invalid_header[first_blob_or_col_index].custody_columns_count(); + if is_fulu { + let mut sidecars = cols.unwrap(); + assert!( + !sidecars.is_empty(), + "data column sidecars shouldn't be empty" + ); + let mut sidecar = sidecars[0].clone_arc(); + let mut_sidecar = Arc::make_mut(&mut sidecar); + mut_sidecar.signed_block_header.signature = Signature::empty(); + sidecars[0] = CustodyDataColumn::from_asserted_custody(sidecar); + RpcBlock::__new_for_testing( + block_root, + block, + blobs, + Some(sidecars), + custody_columns_count, + ) + } else { + let mut sidecars = blobs.unwrap(); + assert!(!sidecars.is_empty(), "blob sidecars shouldn't be empty"); + let mut_sidecar = Arc::make_mut(&mut sidecars[0]); + mut_sidecar.signed_block_header.signature = Signature::empty(); + RpcBlock::__new_for_testing( + block_root, + block, + Some(sidecars), + cols, + custody_columns_count, + ) + } + }; + + let mut batch_with_invalid_kzg = rpc_blocks + .iter() + .map(|block| block.clone()) + .collect::>(); + batch_with_invalid_kzg[first_blob_or_col_index] = { + let (block_root, block, blobs, cols) = batch_with_invalid_kzg[first_blob_or_col_index] + .clone() + .deconstruct(); + let custody_columns_count = + batch_with_invalid_kzg[first_blob_or_col_index].custody_columns_count(); + if is_fulu { + let mut sidecars = cols.unwrap(); + assert!( + !sidecars.is_empty(), + "data column sidecars 
shouldn't be empty" + ); + let mut sidecar = sidecars[0].clone_arc(); + let mut_sidecar = Arc::make_mut(&mut sidecar); + mut_sidecar.kzg_commitments[0] = KzgCommitment::empty_for_testing(); + sidecars[0] = CustodyDataColumn::from_asserted_custody(sidecar); + RpcBlock::__new_for_testing( + block_root, + block, + blobs, + Some(sidecars), + custody_columns_count, + ) + } else { + let mut sidecars = blobs.unwrap(); + assert!(!sidecars.is_empty(), "blob sidecars shouldn't be empty"); + let mut_sidecar = Arc::make_mut(&mut sidecars[0]); + mut_sidecar.kzg_commitment = KzgCommitment::empty_for_testing(); + RpcBlock::__new_for_testing( + block_root, + block, + Some(sidecars), + cols, + custody_columns_count, + ) + } + }; + + // Importing the batch with an invalid blob/column signature block header should error. + assert!(matches!( + beacon_chain + .import_historical_block_batch(&batch_with_invalid_header) + .unwrap_err(), + HistoricalBlockError::AvailabilityCheckError(_) + )); + + // Importing the batch with an invalid blob/column KZG commitment should error. + assert!(matches!( + beacon_chain + .import_historical_block_batch(&batch_with_invalid_kzg) + .unwrap_err(), + HistoricalBlockError::AvailabilityCheckError(_) + )); + } + // Importing the invalid batch should error. assert!(matches!( beacon_chain - .import_historical_block_batch(batch_with_invalid_first_block) + .import_historical_block_batch(&batch_with_invalid_first_block) .unwrap_err(), - HistoricalBlockError::InvalidSignature + HistoricalBlockError::InvalidSignature(_) | HistoricalBlockError::AvailabilityCheckError(_) )); // Importing the batch with valid signatures should succeed. 
- let available_blocks_dup = available_blocks.iter().map(clone_block).collect::>(); + let rpc_blocks_dup = rpc_blocks + .iter() + .map(|block| block.clone()) + .collect::>(); beacon_chain - .import_historical_block_batch(available_blocks_dup) + .import_historical_block_batch(&rpc_blocks_dup) .unwrap(); assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0); // Resupplying the blocks should not fail, they can be safely ignored. beacon_chain - .import_historical_block_batch(available_blocks) + .import_historical_block_batch(&rpc_blocks) .unwrap(); // The forwards iterator should now match the original chain @@ -3676,7 +3804,3 @@ fn get_blocks( .map(|checkpoint| checkpoint.beacon_block_root.into()) .collect() } - -fn clone_block(block: &AvailableBlock) -> AvailableBlock { - block.__clone_without_recv().unwrap() -} diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 48ae26c8265..3869d462546 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -590,7 +590,7 @@ impl NetworkBeaconProcessor { let available_blocks = match self .chain .data_availability_checker - .verify_kzg_for_rpc_blocks(downloaded_blocks) + .verify_kzg_for_rpc_blocks(&downloaded_blocks) { Ok(blocks) => blocks .into_iter() @@ -635,7 +635,7 @@ impl NetworkBeaconProcessor { ); } - match self.chain.import_historical_block_batch(available_blocks) { + match self.chain.import_historical_block_batch(&downloaded_blocks) { Ok(imported_blocks) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL, @@ -660,13 +660,21 @@ impl NetworkBeaconProcessor { // The peer is faulty if they send blocks with bad roots. 
                    Some(PeerAction::LowToleranceError)
                }
-                HistoricalBlockError::InvalidSignature
-                | HistoricalBlockError::SignatureSet(_) => {
+                HistoricalBlockError::InvalidSignature(e) => {
                    warn!(
                        error = ?e,
                        "Backfill batch processing error"
                    );
-                    // The peer is faulty if they bad signatures.
+                    // The peer is faulty if they send bad signatures.
+                    Some(PeerAction::LowToleranceError)
+                }
+                HistoricalBlockError::Unexpected(_)
+                | HistoricalBlockError::AvailabilityCheckError(_) => {
+                    debug!(
+                        error = ?e,
+                        "Backfill blob or data column verification error"
+                    );
+                    // The peer is faulty if they send bad blob or data column sidecars.
                    Some(PeerAction::LowToleranceError)
                }
                HistoricalBlockError::ValidatorPubkeyCacheTimeout => {