From 3e5604d9e842e682971a1447ba9e436e74112e33 Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Thu, 24 Apr 2025 04:07:56 +0900 Subject: [PATCH 1/5] work so far --- .../beacon_chain/src/historical_blocks.rs | 38 ++++++++++++++++++- .../network_beacon_processor/sync_methods.rs | 11 +++++- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 348e6d52a64..469f63dc04a 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -1,6 +1,9 @@ +use crate::blob_verification::verify_kzg_for_blob_list; use crate::data_availability_checker::{AvailableBlock, AvailableBlockData}; +use crate::data_column_verification::verify_kzg_for_data_column_list; use crate::{metrics, BeaconChain, BeaconChainTypes}; use itertools::Itertools; +use kzg::Error as KzgError; use state_processing::{ per_block_processing::ParallelSignatureSets, signature_sets::{block_proposal_signature_set_from_parts, Error as SignatureSetError}, @@ -26,10 +29,14 @@ pub enum HistoricalBlockError { block_root: Hash256, expected_block_root: Hash256, }, + /// Signed block header mismatch with a blob or data column sidecar. + MismatchedBlockHeader, /// Bad signature, caller should retry with different blocks. SignatureSet(SignatureSetError), /// Bad signature, caller should retry with different blocks. InvalidSignature, + /// KZG verification for blobs or data columns failed. + BlobOrDataColumnKzgError(KzgError), /// Transitory error, caller should retry with the same blocks. ValidatorPubkeyCacheTimeout, /// Logic error: should never occur. @@ -47,6 +54,7 @@ impl From for HistoricalBlockError { impl BeaconChain { /// Store a batch of historical blocks in the database. /// + /// TODO: the ascending order is never checked anywhere. /// The `blocks` should be given in slot-ascending order. One of the blocks should have a block /// root corresponding to the `oldest_block_parent` from the store's `AnchorInfo`. /// @@ -54,6 +62,11 @@ impl BeaconChain { /// root listed in its successor, then the whole batch will be discarded and /// `MismatchedBlockRoot` will be returned. If any proposer signature is invalid then /// `SignatureSetError` or `InvalidSignature` will be returned. + /// + /// The block's blob and data column sidecars are verified. The function checks their signed + /// block headers equal to their block's block header. Then, it verifies their KZG commitments + /// with proofs. KZG inclusion proof verification is not held here as it is already done by the + /// network crate. /// /// To align with sync we allow some excess blocks with slots greater than or equal to /// `oldest_block_slot` to be provided. They will be ignored without being checked. @@ -102,6 +115,8 @@ impl BeaconChain { let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); + let mut all_blobs = Vec::new(); + let mut all_data_columns = Vec::new(); for available_block in blocks_to_import.into_iter().rev() { let (block_root, block, block_data) = available_block.deconstruct(); @@ -127,12 +142,27 @@ impl BeaconChain { ); } + // Check that block headers in the block and its blob or data column sidecars are the same. + // Store blob and data column sidecars all together to verify KZG as a batch at the end. + // Also, update the oldest blob and data column slots. 
match &block_data { AvailableBlockData::NoData => {} - AvailableBlockData::Blobs(..) => { + AvailableBlockData::Blobs(blobs) => { + for blob in blobs { + if blob.signed_block_header != block.signed_block_header() { + return Err(HistoricalBlockError::MismatchedBlockHeader); + } + } + all_blobs.extend(blobs.clone()); new_oldest_blob_slot = Some(block.slot()); } - AvailableBlockData::DataColumns(_) => { + AvailableBlockData::DataColumns(data_columns) => { + for data_column in data_columns { + if data_column.signed_block_header != block.signed_block_header() { + return Err(HistoricalBlockError::MismatchedBlockHeader); + } + } + all_data_columns.extend(data_columns.clone()); new_oldest_data_column_slot = Some(block.slot()); } } @@ -182,6 +212,7 @@ impl BeaconChain { // these were pushed in reverse order so we reverse again signed_blocks.reverse(); + // TODO: add metrics for blobs and column verification // Verify signatures in one batch, holding the pubkey cache lock for the shortest duration // possible. For each block fetch the parent root from its successor. Slicing from index 1 // is safe because we've already checked that `blocks_to_import` is non-empty. @@ -225,6 +256,9 @@ impl BeaconChain { drop(verify_timer); drop(sig_timer); + verify_kzg_for_blob_list(all_blobs.iter(), self.kzg.as_ref()).map_err(|e| HistoricalBlockError::BlobOrDataColumnKzgError(e))?; + verify_kzg_for_data_column_list(all_data_columns.iter(), self.kzg.as_ref()).map_err(|e| HistoricalBlockError::BlobOrDataColumnKzgError(e))?; + // Write the I/O batches to disk, writing the blocks themselves first, as it's better // for the hot DB to contain extra blocks than for the cold DB to point to blocks that // do not exist. diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 48ae26c8265..b99122da3f4 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -666,7 +666,16 @@ impl NetworkBeaconProcessor { error = ?e, "Backfill batch processing error" ); - // The peer is faulty if they bad signatures. + // The peer is faulty if they send bad signatures. + Some(PeerAction::LowToleranceError) + } + HistoricalBlockError::MismatchedBlockHeader + | HistoricalBlockError::BlobOrDataColumnKzgError(_) => { + warn!( + error = ?e, + "Backfill blob or data column verification error" + ); + // The peer is faulty if they send bad blob or data column sidecars. Some(PeerAction::LowToleranceError) } HistoricalBlockError::ValidatorPubkeyCacheTimeout => { From d15aa3f04a871c0d5a56bb214628c961320e6569 Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Mon, 28 Apr 2025 18:10:43 +0900 Subject: [PATCH 2/5] test done for deneb blobs.
fulu cols in progress --- beacon_node/beacon_chain/tests/store_tests.rs | 103 +++++++++++++++++- 1 file changed, 100 insertions(+), 3 deletions(-) diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 3343dc101b5..e3521929748 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1,9 +1,9 @@ -#![cfg(not(debug_assertions))] +//#![cfg(not(debug_assertions))] use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::builder::BeaconChainBuilder; -use beacon_chain::data_availability_checker::AvailableBlock; +use beacon_chain::data_availability_checker::{AvailableBlock, AvailableBlockData}; use beacon_chain::schema_change::migrate_schema; use beacon_chain::test_utils::SyncCommitteeStrategy; use beacon_chain::test_utils::{ @@ -21,6 +21,7 @@ use rand::rngs::StdRng; use rand::Rng; use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::{state_advance::complete_state_advance, BlockReplayer}; +use tracing::debug; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryInto; @@ -2279,6 +2280,9 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { let temp1 = tempdir().unwrap(); let full_store = get_store(&temp1); + let is_deneb = full_store.get_chain_spec().deneb_fork_epoch.is_some(); + let is_fulu = full_store.get_chain_spec().is_peer_das_scheduled(); + // TODO(das): Run a supernode so the node has full blobs stored. // This may not be required in the future if we end up implementing downloading checkpoint // blobs from p2p peers: @@ -2495,8 +2499,101 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct(); let mut corrupt_block = (*block).clone(); *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec)) + AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec.clone())) }; + assert_eq!(*batch_with_invalid_first_block[0].block().signature(), Signature::empty()); + + // Corrupt the signature on the 1st blocks with blob sidecar and data column sidecar to ensure + // that the backfill processor is checking signatures correctly. 
+ // TODO make them into one loop if possible + if is_deneb || is_fulu { + let first_blob_or_col_index = + available_blocks + .iter() + .position(|block| { + let (_, _, data) = clone_block(block).deconstruct(); + match &data { + AvailableBlockData::NoData => false, + AvailableBlockData::Blobs(_) => is_deneb && !is_fulu, + AvailableBlockData::DataColumns(_) => is_fulu, + } + }) + .unwrap(); + + let mut batch_with_invalid_header = + available_blocks.iter().map(clone_block).collect::>(); + batch_with_invalid_header[first_blob_or_col_index] = { + let (block_root, block, mut data) = clone_block(&batch_with_invalid_header[first_blob_or_col_index]).deconstruct(); + match &mut data { + AvailableBlockData::NoData => panic!("should get blobs or data columns"), + AvailableBlockData::Blobs(sidecars) => { + assert!(!sidecars.is_empty(), "sidecars shouldn't be empty"); + let mut_sidecar = Arc::make_mut(&mut sidecars[0]); + mut_sidecar.signed_block_header.signature = Signature::empty(); + AvailableBlock::__new_for_testing(block_root, block, data, Arc::new(spec.clone())) + } + AvailableBlockData::DataColumns(sidecars) => { + assert!(!sidecars.is_empty(), "sidecars shouldn't be empty"); + let mut_sidecar = Arc::make_mut(&mut sidecars[0]); + mut_sidecar.signed_block_header.signature = Signature::empty(); + AvailableBlock::__new_for_testing(block_root, block, data, Arc::new(spec.clone())) + } + } + }; + + let mut batch_with_invalid_kzg = + available_blocks.iter().map(clone_block).collect::>(); + batch_with_invalid_kzg[first_blob_or_col_index] = { + let (block_root, block, mut data) = clone_block(&batch_with_invalid_kzg[first_blob_or_col_index]).deconstruct(); + match &mut data { + AvailableBlockData::NoData => panic!("should get blobs or data columns"), + AvailableBlockData::Blobs(sidecars) => { + assert!(!sidecars.is_empty(), "sidecars shouldn't be empty"); + let mut_sidecar = Arc::make_mut(&mut sidecars[0]); + mut_sidecar.kzg_commitment = KzgCommitment::empty_for_testing(); + AvailableBlock::__new_for_testing(block_root, block, data, Arc::new(spec)) + } + AvailableBlockData::DataColumns(sidecars) => { + assert!(!sidecars.is_empty() && !sidecars[0].kzg_commitments.is_empty(), "sidecars and their commitments shouldn't be empty"); + let mut_sidecar = Arc::make_mut(&mut sidecars[0]); + mut_sidecar.kzg_commitments[0] = KzgCommitment::empty_for_testing(); + AvailableBlock::__new_for_testing(block_root, block, data, Arc::new(spec)) + } + } + }; + if let AvailableBlockData::DataColumns(blobs) = + &batch_with_invalid_kzg[first_blob_or_col_index].data() + { + assert!( + !blobs.is_empty(), + "Expected at least one blob in the corrupted block" + ); + + let blob = &blobs[0]; + assert_eq!( + blob.kzg_commitments[0], + KzgCommitment::empty_for_testing(), + "Blob signature was not correctly cleared" + ); + } else { + panic!("Expected Blobs variant at first_blob_or_col_index"); + } + + // Importing the batch with an invalid blob signature block header should error. + assert!(matches!( + beacon_chain + .import_historical_block_batch(batch_with_invalid_header) + .unwrap_err(), + HistoricalBlockError::MismatchedBlockHeader + )); + + assert!(matches!( + beacon_chain + .import_historical_block_batch(batch_with_invalid_kzg) + .unwrap_err(), + HistoricalBlockError::BlobOrDataColumnKzgError(_) + )); + } // Importing the invalid batch should error. 
assert!(matches!( From a58760a593f93998fb7a3f5961206342cf4d8258 Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Wed, 30 Apr 2025 09:38:39 +0900 Subject: [PATCH 3/5] taking changes from dapplion and refactor and clean-ups --- .../beacon_chain/src/block_verification.rs | 2 +- .../src/block_verification_types.rs | 75 +++++- .../src/data_availability_checker.rs | 21 +- .../beacon_chain/src/historical_blocks.rs | 228 +++++++++++------ beacon_node/beacon_chain/tests/store_tests.rs | 231 ++++++++++-------- .../network_beacon_processor/sync_methods.rs | 13 +- 6 files changed, 364 insertions(+), 206 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 074ae93a790..806ae2f0071 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -636,7 +636,7 @@ pub fn signature_verify_chain_segment( let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment.into_iter().unzip(); let maybe_available_blocks = chain .data_availability_checker - .verify_kzg_for_rpc_blocks(blocks)?; + .verify_kzg_for_rpc_blocks(&blocks)?; // zip it back up let mut signature_verified_blocks = roots .into_iter() diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index dab54dc823e..a769f72401a 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -9,8 +9,8 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec, - Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, ColumnIndex, Epoch, + EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; /// A block that has been received over RPC. It has 2 internal variants: @@ -80,6 +80,38 @@ impl RpcBlock { RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => Some(data_columns), } } + + /// Returns indices of blobs that have conflicting signature with block's signature. + pub fn non_matching_blobs_signed_headers(&self) -> Option> { + match &self.block { + RpcBlockInner::Block(_) => None, + RpcBlockInner::BlockAndBlobs(block, blobs) => Some( + blobs + .iter() + .filter(|blob| &blob.signed_block_header.signature != block.signature()) + .map(|blob| blob.index) + .collect(), + ), + RpcBlockInner::BlockAndCustodyColumns(..) => None, + } + } + + /// Returns indices of custody columns that have conflicting signature with block's signature. + pub fn non_matching_custody_columns_signed_headers(&self) -> Option> { + match &self.block { + RpcBlockInner::Block(_) => None, + RpcBlockInner::BlockAndBlobs(..) 
=> None, + RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => Some( + data_columns + .iter() + .filter(|column| { + &column.as_data_column().signed_block_header.signature != block.signature() + }) + .map(|column| column.index()) + .collect(), + ), + } + } } /// Note: This variant is intentionally private because we want to safely construct the @@ -186,6 +218,28 @@ impl RpcBlock { }) } + /// Only used for testing + pub fn __new_for_testing( + block_root: Hash256, + block: Arc>, + blobs: Option>, + custody_columns: Option>, + custody_columns_count: usize, + ) -> Self { + let inner = if let Some(blobs) = blobs { + RpcBlockInner::BlockAndBlobs(block, blobs) + } else if let Some(columns) = custody_columns { + RpcBlockInner::BlockAndCustodyColumns(block, columns) + } else { + RpcBlockInner::Block(block) + }; + Self { + block_root, + block: inner, + custody_columns_count, + } + } + #[allow(clippy::type_complexity)] pub fn deconstruct( self, @@ -216,6 +270,23 @@ impl RpcBlock { RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(), } } + + /// Only used for testing + pub fn __clone_without_recv(&self) -> Self { + Self { + block_root: self.block_root, + block: match &self.block { + RpcBlockInner::Block(block) => RpcBlockInner::Block(block.clone()), + RpcBlockInner::BlockAndBlobs(block, blobs) => { + RpcBlockInner::BlockAndBlobs(block.clone(), blobs.clone()) + } + RpcBlockInner::BlockAndCustodyColumns(block, cols) => { + RpcBlockInner::BlockAndCustodyColumns(block.clone(), cols.clone()) + } + }, + custody_columns_count: self.custody_columns_count, + } + } } /// A block that has gone through all pre-deneb block processing checks including block processing diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 033b472da0c..63e57e3a53e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -391,7 +391,7 @@ impl DataAvailabilityChecker { /// check if there are any missing blobs. 
pub fn verify_kzg_for_rpc_blocks( &self, - blocks: Vec>, + blocks: &Vec>, ) -> Result>, AvailabilityCheckError> { let mut results = Vec::with_capacity(blocks.len()); let all_blobs = blocks @@ -428,7 +428,7 @@ impl DataAvailabilityChecker { for block in blocks { let custody_columns_count = block.custody_columns_count(); - let (block_root, block, blobs, data_columns) = block.deconstruct(); + let (block_root, block, blobs, data_columns) = block.clone().deconstruct(); let maybe_available_block = if self.blobs_required_for_block(&block) { if let Some(blobs) = blobs { @@ -783,23 +783,6 @@ impl AvailableBlock { } = self; (block_root, block, blob_data) } - - /// Only used for testing - pub fn __clone_without_recv(&self) -> Result { - Ok(Self { - block_root: self.block_root, - block: self.block.clone(), - blob_data: match &self.blob_data { - AvailableBlockData::NoData => AvailableBlockData::NoData, - AvailableBlockData::Blobs(blobs) => AvailableBlockData::Blobs(blobs.clone()), - AvailableBlockData::DataColumns(data_columns) => { - AvailableBlockData::DataColumns(data_columns.clone()) - } - }, - blobs_available_timestamp: self.blobs_available_timestamp, - spec: self.spec.clone(), - }) - } } #[derive(Debug)] diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 469f63dc04a..c6176e0e724 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -1,21 +1,20 @@ -use crate::blob_verification::verify_kzg_for_blob_list; -use crate::data_availability_checker::{AvailableBlock, AvailableBlockData}; -use crate::data_column_verification::verify_kzg_for_data_column_list; +use crate::block_verification_types::{AvailableBlock, MaybeAvailableBlock, RpcBlock}; +use crate::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use crate::{metrics, BeaconChain, BeaconChainTypes}; use itertools::Itertools; -use kzg::Error as KzgError; use state_processing::{ per_block_processing::ParallelSignatureSets, signature_sets::{block_proposal_signature_set_from_parts, Error as SignatureSetError}, }; use std::borrow::Cow; use std::iter; +use std::sync::Arc; use std::time::Duration; use store::metadata::DataColumnInfo; use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp}; use strum::IntoStaticStr; use tracing::debug; -use types::{FixedBytesExtended, Hash256, Slot}; +use types::{ColumnIndex, FixedBytesExtended, Hash256, SignedBeaconBlock, Slot}; /// Use a longer timeout on the pubkey cache. /// @@ -29,20 +28,22 @@ pub enum HistoricalBlockError { block_root: Hash256, expected_block_root: Hash256, }, - /// Signed block header mismatch with a blob or data column sidecar. - MismatchedBlockHeader, /// Bad signature, caller should retry with different blocks. - SignatureSet(SignatureSetError), - /// Bad signature, caller should retry with different blocks. - InvalidSignature, - /// KZG verification for blobs or data columns failed. - BlobOrDataColumnKzgError(KzgError), + InvalidSignature(String), + /// One or more signatures in a BlobSidecar of an RpcBlock are invalid + InvalidBlobsSignature(Vec), + /// One or more signatures in a DataColumnSidecar of an RpcBlock are invalid + InvalidDataColumnsSignature(Vec), + /// Unexpected error + Unexpected(String), /// Transitory error, caller should retry with the same blocks. ValidatorPubkeyCacheTimeout, /// Logic error: should never occur. 
IndexOutOfBounds, /// Internal store error StoreError(StoreError), + /// Faulty and internal AvailabilityCheckError + AvailabilityCheckError(AvailabilityCheckError), } impl From for HistoricalBlockError { @@ -51,10 +52,136 @@ impl From for HistoricalBlockError { } } +impl From for HistoricalBlockError { + fn from(err: SignatureSetError) -> Self { + match err { + // The encoding of the signature is invalid, peer fault + e @ SignatureSetError::SignatureInvalid(_) => Self::InvalidSignature(format!("{e:?}")), + // All these variants are internal errors or unreachable for historical block paths, + // which only check the proposer signature. + // BadBlsBytes = Unreachable + e @ (SignatureSetError::BeaconStateError(_) + | SignatureSetError::ValidatorUnknown(_) + | SignatureSetError::ValidatorPubkeyUnknown(_) + | SignatureSetError::IncorrectBlockProposer { .. } + | SignatureSetError::MismatchedPublicKeyLen { .. } + | SignatureSetError::PublicKeyDecompressionFailed + | SignatureSetError::BadBlsBytes { .. } + | SignatureSetError::InconsistentBlockFork(_)) => Self::Unexpected(format!("{e:?}")), + } + } +} + +impl From for HistoricalBlockError { + fn from(e: AvailabilityCheckError) -> Self { + Self::AvailabilityCheckError(e) + } +} + impl BeaconChain { + fn verify_blobs_and_data_columns( + &self, + blocks: &Vec>, + ) -> Result>, HistoricalBlockError> { + // Verify that blobs or data columns signatures match + let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES); + let setup_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_SETUP_TIMES); + // TODO: this logic is redundant with one from range sync. Do we have a good place to make + // it as a common function? + blocks + .iter() + .map(|block| { + if let Some(indices) = block.non_matching_blobs_signed_headers() { + if !indices.is_empty() { + return Err(HistoricalBlockError::InvalidBlobsSignature(indices)); + } + } + if let Some(indices) = block.non_matching_custody_columns_signed_headers() { + if !indices.is_empty() { + return Err(HistoricalBlockError::InvalidDataColumnsSignature(indices)); + } + } + Ok(()) + }) + .collect::, _>>()?; + drop(setup_timer); + + // Check that all data columns are present <- faulty failure if missing because we have + // checked the block root is correct first. + let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); + let available_blocks = self + .data_availability_checker + .verify_kzg_for_rpc_blocks(blocks) + .map_err(HistoricalBlockError::AvailabilityCheckError)? // Map outer error + .into_iter() + .map(|maybe_block| match maybe_block { + MaybeAvailableBlock::Available(block) => Ok(block), + MaybeAvailableBlock::AvailabilityPending { .. } => { + Err(HistoricalBlockError::AvailabilityCheckError( + AvailabilityCheckError::Unexpected("block not available".to_string()), + )) + } + }) + .collect::, _>>()?; // Map inner results + + drop(verify_timer); + drop(sig_timer); + + Ok(available_blocks) + } + + fn verify_available_block_signatures( + &self, + signed_blocks: &[Arc>], + ) -> Result<(), HistoricalBlockError> { + // Verify signatures in one batch, holding the pubkey cache lock for the shortest duration + // possible. For each block fetch the parent root from its successor. Slicing from index 1 + // is safe because we've already checked that `blocks_to_import` is non-empty. 
+ let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES); + let setup_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_SETUP_TIMES); + let anchor_info = self.store.get_anchor_info(); + let pubkey_cache = self + .validator_pubkey_cache + .try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT) + .ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?; + let block_roots = signed_blocks + .get(1..) + .ok_or(HistoricalBlockError::IndexOutOfBounds)? + .iter() + .map(|block| block.parent_root()) + .chain(iter::once(anchor_info.oldest_block_parent)); + let signature_set = signed_blocks + .iter() + .zip_eq(block_roots) + .filter(|&(_block, block_root)| (block_root != self.genesis_block_root)) + .map(|(block, block_root)| { + block_proposal_signature_set_from_parts( + block, + Some(block_root), + block.message().proposer_index(), + &self.spec.fork_at_epoch(block.message().epoch()), + self.genesis_validators_root, + |validator_index| pubkey_cache.get(validator_index).cloned().map(Cow::Owned), + &self.spec, + ) + }) + .collect::, _>>() + .map(ParallelSignatureSets::from)?; + drop(pubkey_cache); + drop(setup_timer); + + let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); + if !signature_set.verify() { + return Err(HistoricalBlockError::InvalidSignature("invalid".to_owned())); + } + drop(verify_timer); + drop(sig_timer); + + Ok(()) + } + /// Store a batch of historical blocks in the database. /// - /// TODO: the ascending order is never checked anywhere. /// The `blocks` should be given in slot-ascending order. One of the blocks should have a block /// root corresponding to the `oldest_block_parent` from the store's `AnchorInfo`. /// @@ -62,7 +189,7 @@ impl BeaconChain { /// root listed in its successor, then the whole batch will be discarded and /// `MismatchedBlockRoot` will be returned. If any proposer signature is invalid then /// `SignatureSetError` or `InvalidSignature` will be returned. - /// + /// /// The block's blob and data column sidecars are verified. The function checks their signed /// block headers equal to their block's block header. Then, it verifies their KZG commitments /// with proofs. KZG inclusion proof verification is not held here as it is already done by the @@ -78,8 +205,11 @@ impl BeaconChain { /// Return the number of blocks successfully imported. pub fn import_historical_block_batch( &self, - mut blocks: Vec>, + blocks: &Vec>, ) -> Result { + // Verify blobs and data columns (headers and signatures) + let mut blocks = self.verify_blobs_and_data_columns(blocks)?; + let anchor_info = self.store.get_anchor_info(); let blob_info = self.store.get_blob_info(); let data_column_info = self.store.get_data_column_info(); @@ -115,8 +245,6 @@ impl BeaconChain { let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); - let mut all_blobs = Vec::new(); - let mut all_data_columns = Vec::new(); for available_block in blocks_to_import.into_iter().rev() { let (block_root, block, block_data) = available_block.deconstruct(); @@ -147,22 +275,10 @@ impl BeaconChain { // Also, update the oldest blob and data column slots. 
match &block_data { AvailableBlockData::NoData => {} - AvailableBlockData::Blobs(blobs) => { - for blob in blobs { - if blob.signed_block_header != block.signed_block_header() { - return Err(HistoricalBlockError::MismatchedBlockHeader); - } - } - all_blobs.extend(blobs.clone()); + AvailableBlockData::Blobs(..) => { new_oldest_blob_slot = Some(block.slot()); } - AvailableBlockData::DataColumns(data_columns) => { - for data_column in data_columns { - if data_column.signed_block_header != block.signed_block_header() { - return Err(HistoricalBlockError::MismatchedBlockHeader); - } - } - all_data_columns.extend(data_columns.clone()); + AvailableBlockData::DataColumns(_) => { new_oldest_data_column_slot = Some(block.slot()); } } @@ -209,55 +325,11 @@ impl BeaconChain { break; } } - // these were pushed in reverse order so we reverse again + // These were pushed in reverse order so we reverse again. signed_blocks.reverse(); - // TODO: add metrics for blobs and column verification - // Verify signatures in one batch, holding the pubkey cache lock for the shortest duration - // possible. For each block fetch the parent root from its successor. Slicing from index 1 - // is safe because we've already checked that `blocks_to_import` is non-empty. - let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES); - let setup_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_SETUP_TIMES); - let pubkey_cache = self - .validator_pubkey_cache - .try_read_for(PUBKEY_CACHE_LOCK_TIMEOUT) - .ok_or(HistoricalBlockError::ValidatorPubkeyCacheTimeout)?; - let block_roots = signed_blocks - .get(1..) - .ok_or(HistoricalBlockError::IndexOutOfBounds)? - .iter() - .map(|block| block.parent_root()) - .chain(iter::once(anchor_info.oldest_block_parent)); - let signature_set = signed_blocks - .iter() - .zip_eq(block_roots) - .filter(|&(_block, block_root)| (block_root != self.genesis_block_root)) - .map(|(block, block_root)| { - block_proposal_signature_set_from_parts( - block, - Some(block_root), - block.message().proposer_index(), - &self.spec.fork_at_epoch(block.message().epoch()), - self.genesis_validators_root, - |validator_index| pubkey_cache.get(validator_index).cloned().map(Cow::Owned), - &self.spec, - ) - }) - .collect::, _>>() - .map_err(HistoricalBlockError::SignatureSet) - .map(ParallelSignatureSets::from)?; - drop(pubkey_cache); - drop(setup_timer); - - let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); - if !signature_set.verify() { - return Err(HistoricalBlockError::InvalidSignature); - } - drop(verify_timer); - drop(sig_timer); - - verify_kzg_for_blob_list(all_blobs.iter(), self.kzg.as_ref()).map_err(|e| HistoricalBlockError::BlobOrDataColumnKzgError(e))?; - verify_kzg_for_data_column_list(all_data_columns.iter(), self.kzg.as_ref()).map_err(|e| HistoricalBlockError::BlobOrDataColumnKzgError(e))?; + // Verify signatures in a batch. 
+ self.verify_available_block_signatures(&signed_blocks)?; // Write the I/O batches to disk, writing the blocks themselves first, as it's better // for the hot DB to contain extra blocks than for the cold DB to point to blocks that diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index e3521929748..024a75ca84b 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1,9 +1,9 @@ -//#![cfg(not(debug_assertions))] +#![cfg(not(debug_assertions))] use beacon_chain::attestation_verification::Error as AttnError; -use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::builder::BeaconChainBuilder; -use beacon_chain::data_availability_checker::{AvailableBlock, AvailableBlockData}; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::schema_change::migrate_schema; use beacon_chain::test_utils::SyncCommitteeStrategy; use beacon_chain::test_utils::{ @@ -21,7 +21,6 @@ use rand::rngs::StdRng; use rand::Rng; use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::{state_advance::complete_state_advance, BlockReplayer}; -use tracing::debug; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryInto; @@ -2469,7 +2468,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .map(|s| s.beacon_block.clone()) .collect::>(); - let mut available_blocks = vec![]; + let mut rpc_blocks = vec![]; for blinded in historical_blocks { let block_root = blinded.canonical_root(); let full_block = harness @@ -2478,141 +2477,177 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .await .expect("should get block") .expect("should get block"); + let rpc_block = + harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)); - if let MaybeAvailableBlock::Available(block) = harness + if let MaybeAvailableBlock::Available(_) = harness .chain .data_availability_checker - .verify_kzg_for_rpc_block( - harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)), - ) + .verify_kzg_for_rpc_block(rpc_block.clone()) .expect("should verify kzg") { - available_blocks.push(block); + rpc_blocks.push(rpc_block); } } // Corrupt the signature on the 1st block to ensure that the backfill processor is checking // signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120. 
- let mut batch_with_invalid_first_block = - available_blocks.iter().map(clone_block).collect::>(); + let mut batch_with_invalid_first_block = rpc_blocks + .iter() + .map(|block| block.__clone_without_recv()) + .collect::>(); batch_with_invalid_first_block[0] = { - let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct(); - let mut corrupt_block = (*block).clone(); - *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec.clone())) + let (block_root, mut block, blobs, columns) = + rpc_blocks[0].__clone_without_recv().deconstruct(); + let custody_columns_count = rpc_blocks[0].custody_columns_count(); + let block_mut = Arc::make_mut(&mut block); + *block_mut.signature_mut() = Signature::empty(); + RpcBlock::__new_for_testing(block_root, block, blobs, columns, custody_columns_count) }; - assert_eq!(*batch_with_invalid_first_block[0].block().signature(), Signature::empty()); - // Corrupt the signature on the 1st blocks with blob sidecar and data column sidecar to ensure - // that the backfill processor is checking signatures correctly. - // TODO make them into one loop if possible + // Corrupt the signature and KZG commitments on the 1st blocks with blob sidecar and data + // column sidecar to ensure that the backfill processor is checking signatures correctly. if is_deneb || is_fulu { - let first_blob_or_col_index = - available_blocks - .iter() - .position(|block| { - let (_, _, data) = clone_block(block).deconstruct(); - match &data { - AvailableBlockData::NoData => false, - AvailableBlockData::Blobs(_) => is_deneb && !is_fulu, - AvailableBlockData::DataColumns(_) => is_fulu, - } - }) - .unwrap(); - - let mut batch_with_invalid_header = - available_blocks.iter().map(clone_block).collect::>(); - batch_with_invalid_header[first_blob_or_col_index] = { - let (block_root, block, mut data) = clone_block(&batch_with_invalid_header[first_blob_or_col_index]).deconstruct(); - match &mut data { - AvailableBlockData::NoData => panic!("should get blobs or data columns"), - AvailableBlockData::Blobs(sidecars) => { - assert!(!sidecars.is_empty(), "sidecars shouldn't be empty"); - let mut_sidecar = Arc::make_mut(&mut sidecars[0]); - mut_sidecar.signed_block_header.signature = Signature::empty(); - AvailableBlock::__new_for_testing(block_root, block, data, Arc::new(spec.clone())) - } - AvailableBlockData::DataColumns(sidecars) => { - assert!(!sidecars.is_empty(), "sidecars shouldn't be empty"); - let mut_sidecar = Arc::make_mut(&mut sidecars[0]); - mut_sidecar.signed_block_header.signature = Signature::empty(); - AvailableBlock::__new_for_testing(block_root, block, data, Arc::new(spec.clone())) + let first_blob_or_col_index = rpc_blocks + .iter() + .position(|block| { + let (_, _, blobs, columns) = block.__clone_without_recv().deconstruct(); + if blobs.is_some() { + is_deneb && !is_fulu + } else if columns.is_some() { + is_fulu + } else { + false } + }) + .unwrap(); + + let mut batch_with_invalid_header = rpc_blocks + .iter() + .map(|block| block.__clone_without_recv()) + .collect::>(); + batch_with_invalid_header[first_blob_or_col_index] = { + let (block_root, block, blobs, cols) = batch_with_invalid_header + [first_blob_or_col_index] + .__clone_without_recv() + .deconstruct(); + let custody_columns_count = + batch_with_invalid_header[first_blob_or_col_index].custody_columns_count(); + if let Some(mut sidecars) = blobs { + assert!(!sidecars.is_empty(), "blob sidecars shouldn't be empty"); + let 
mut_sidecar = Arc::make_mut(&mut sidecars[0]); + mut_sidecar.signed_block_header.signature = Signature::empty(); + RpcBlock::__new_for_testing( + block_root, + block, + Some(sidecars), + cols, + custody_columns_count, + ) + } else if let Some(mut sidecars) = cols { + assert!( + !sidecars.is_empty(), + "data column sidecars shouldn't be empty" + ); + let mut sidecar = sidecars[0].clone_arc(); + let mut_sidecar = Arc::make_mut(&mut sidecar); + mut_sidecar.signed_block_header.signature = Signature::empty(); + sidecars[0] = CustodyDataColumn::from_asserted_custody(sidecar); + RpcBlock::__new_for_testing( + block_root, + block, + blobs, + Some(sidecars), + custody_columns_count, + ) + } else { + panic!("should get blobs or data columns") } }; - let mut batch_with_invalid_kzg = - available_blocks.iter().map(clone_block).collect::>(); - batch_with_invalid_kzg[first_blob_or_col_index] = { - let (block_root, block, mut data) = clone_block(&batch_with_invalid_kzg[first_blob_or_col_index]).deconstruct(); - match &mut data { - AvailableBlockData::NoData => panic!("should get blobs or data columns"), - AvailableBlockData::Blobs(sidecars) => { - assert!(!sidecars.is_empty(), "sidecars shouldn't be empty"); - let mut_sidecar = Arc::make_mut(&mut sidecars[0]); - mut_sidecar.kzg_commitment = KzgCommitment::empty_for_testing(); - AvailableBlock::__new_for_testing(block_root, block, data, Arc::new(spec)) - } - AvailableBlockData::DataColumns(sidecars) => { - assert!(!sidecars.is_empty() && !sidecars[0].kzg_commitments.is_empty(), "sidecars and their commitments shouldn't be empty"); - let mut_sidecar = Arc::make_mut(&mut sidecars[0]); - mut_sidecar.kzg_commitments[0] = KzgCommitment::empty_for_testing(); - AvailableBlock::__new_for_testing(block_root, block, data, Arc::new(spec)) - } + let mut batch_with_invalid_kzg = rpc_blocks + .iter() + .map(|block| block.__clone_without_recv()) + .collect::>(); + batch_with_invalid_kzg[first_blob_or_col_index] = { + let (block_root, block, blobs, cols) = batch_with_invalid_kzg[first_blob_or_col_index] + .__clone_without_recv() + .deconstruct(); + let custody_columns_count = + batch_with_invalid_kzg[first_blob_or_col_index].custody_columns_count(); + if let Some(mut sidecars) = blobs { + assert!(!sidecars.is_empty(), "blob sidecars shouldn't be empty"); + let mut_sidecar = Arc::make_mut(&mut sidecars[0]); + mut_sidecar.kzg_commitment = KzgCommitment::empty_for_testing(); + RpcBlock::__new_for_testing( + block_root, + block, + Some(sidecars), + cols, + custody_columns_count, + ) + } else if let Some(mut sidecars) = cols { + assert!( + !sidecars.is_empty(), + "data column sidecars shouldn't be empty" + ); + let mut sidecar = sidecars[0].clone_arc(); + let mut_sidecar = Arc::make_mut(&mut sidecar); + mut_sidecar.kzg_commitments[0] = KzgCommitment::empty_for_testing(); + sidecars[0] = CustodyDataColumn::from_asserted_custody(sidecar); + RpcBlock::__new_for_testing( + block_root, + block, + blobs, + Some(sidecars), + custody_columns_count, + ) + } else { + panic!("should get blobs or data columns") } }; - if let AvailableBlockData::DataColumns(blobs) = - &batch_with_invalid_kzg[first_blob_or_col_index].data() - { - assert!( - !blobs.is_empty(), - "Expected at least one blob in the corrupted block" - ); - - let blob = &blobs[0]; - assert_eq!( - blob.kzg_commitments[0], - KzgCommitment::empty_for_testing(), - "Blob signature was not correctly cleared" - ); - } else { - panic!("Expected Blobs variant at first_blob_or_col_index"); - } - - // Importing the batch with an invalid 
blob signature block header should error. + + // Importing the batch with an invalid blob/column signature block header should error. assert!(matches!( beacon_chain - .import_historical_block_batch(batch_with_invalid_header) + .import_historical_block_batch(&batch_with_invalid_header) .unwrap_err(), - HistoricalBlockError::MismatchedBlockHeader + HistoricalBlockError::InvalidBlobsSignature(_) + | HistoricalBlockError::InvalidDataColumnsSignature(_) )); + // Importing the batch with an invalid blob/column KZG commitment should error. assert!(matches!( beacon_chain - .import_historical_block_batch(batch_with_invalid_kzg) + .import_historical_block_batch(&batch_with_invalid_kzg) .unwrap_err(), - HistoricalBlockError::BlobOrDataColumnKzgError(_) + HistoricalBlockError::AvailabilityCheckError(_) )); } // Importing the invalid batch should error. assert!(matches!( beacon_chain - .import_historical_block_batch(batch_with_invalid_first_block) + .import_historical_block_batch(&batch_with_invalid_first_block) .unwrap_err(), - HistoricalBlockError::InvalidSignature + HistoricalBlockError::InvalidSignature(_) + | HistoricalBlockError::InvalidBlobsSignature(_) + | HistoricalBlockError::InvalidDataColumnsSignature(_), )); // Importing the batch with valid signatures should succeed. - let available_blocks_dup = available_blocks.iter().map(clone_block).collect::>(); + let rpc_blocks_dup = rpc_blocks + .iter() + .map(|block| block.__clone_without_recv()) + .collect::>(); beacon_chain - .import_historical_block_batch(available_blocks_dup) + .import_historical_block_batch(&rpc_blocks_dup) .unwrap(); assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0); // Resupplying the blocks should not fail, they can be safely ignored. beacon_chain - .import_historical_block_batch(available_blocks) + .import_historical_block_batch(&rpc_blocks) .unwrap(); // The forwards iterator should now match the original chain @@ -3773,7 +3808,3 @@ fn get_blocks( .map(|checkpoint| checkpoint.beacon_block_root.into()) .collect() } - -fn clone_block(block: &AvailableBlock) -> AvailableBlock { - block.__clone_without_recv().unwrap() -} diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index b99122da3f4..b892f889773 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -590,7 +590,7 @@ impl NetworkBeaconProcessor { let available_blocks = match self .chain .data_availability_checker - .verify_kzg_for_rpc_blocks(downloaded_blocks) + .verify_kzg_for_rpc_blocks(&downloaded_blocks) { Ok(blocks) => blocks .into_iter() @@ -635,7 +635,7 @@ impl NetworkBeaconProcessor { ); } - match self.chain.import_historical_block_batch(available_blocks) { + match self.chain.import_historical_block_batch(&downloaded_blocks) { Ok(imported_blocks) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL, @@ -660,8 +660,7 @@ impl NetworkBeaconProcessor { // The peer is faulty if they send blocks with bad roots. Some(PeerAction::LowToleranceError) } - HistoricalBlockError::InvalidSignature - | HistoricalBlockError::SignatureSet(_) => { + HistoricalBlockError::InvalidSignature(e) => { warn!( error = ?e, "Backfill batch processing error" @@ -669,8 +668,10 @@ impl NetworkBeaconProcessor { // The peer is faulty if they send bad signatures. 
Some(PeerAction::LowToleranceError) } - HistoricalBlockError::MismatchedBlockHeader - | HistoricalBlockError::BlobOrDataColumnKzgError(_) => { + HistoricalBlockError::InvalidBlobsSignature(_) + | HistoricalBlockError::InvalidDataColumnsSignature(_) + | HistoricalBlockError::Unexpected(_) + | HistoricalBlockError::AvailabilityCheckError(_) => { warn!( error = ?e, "Backfill blob or data column verification error" From 6a9f3813a14148187082e85762f9d6cd8f13027b Mon Sep 17 00:00:00 2001 From: SunnysidedJ Date: Sun, 4 May 2025 21:19:49 +0900 Subject: [PATCH 4/5] restructured importing for logical code separation --- .../src/block_verification_types.rs | 17 -- .../src/data_availability_checker.rs | 2 +- .../beacon_chain/src/historical_blocks.rs | 270 ++++++++++-------- beacon_node/beacon_chain/tests/store_tests.rs | 71 +++-- .../network_beacon_processor/sync_methods.rs | 2 +- 5 files changed, 190 insertions(+), 172 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index a769f72401a..e05611b70bc 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -270,23 +270,6 @@ impl RpcBlock { RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(), } } - - /// Only used for testing - pub fn __clone_without_recv(&self) -> Self { - Self { - block_root: self.block_root, - block: match &self.block { - RpcBlockInner::Block(block) => RpcBlockInner::Block(block.clone()), - RpcBlockInner::BlockAndBlobs(block, blobs) => { - RpcBlockInner::BlockAndBlobs(block.clone(), blobs.clone()) - } - RpcBlockInner::BlockAndCustodyColumns(block, cols) => { - RpcBlockInner::BlockAndCustodyColumns(block.clone(), cols.clone()) - } - }, - custody_columns_count: self.custody_columns_count, - } - } } /// A block that has gone through all pre-deneb block processing checks including block processing diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 63e57e3a53e..31c20b6c43a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -391,7 +391,7 @@ impl DataAvailabilityChecker { /// check if there are any missing blobs. pub fn verify_kzg_for_rpc_blocks( &self, - blocks: &Vec>, + blocks: &[RpcBlock], ) -> Result>, AvailabilityCheckError> { let mut results = Vec::with_capacity(blocks.len()); let all_blobs = blocks diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index c6176e0e724..2e89239adb8 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -8,19 +8,21 @@ use state_processing::{ }; use std::borrow::Cow; use std::iter; -use std::sync::Arc; use std::time::Duration; use store::metadata::DataColumnInfo; use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp}; use strum::IntoStaticStr; use tracing::debug; -use types::{ColumnIndex, FixedBytesExtended, Hash256, SignedBeaconBlock, Slot}; +use types::{ColumnIndex, FixedBytesExtended, Hash256, Slot}; /// Use a longer timeout on the pubkey cache. /// /// It's ok if historical sync is stalled due to writes from forwards block processing. 
const PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(30); +type RpcBlockBatchResult = Result<(usize, Vec>), HistoricalBlockError>; +type AvailableBlockBatchResult = Result<(usize, Vec>), HistoricalBlockError>; + #[derive(Debug, IntoStaticStr)] pub enum HistoricalBlockError { /// Block root mismatch, caller should retry with different blocks. @@ -56,7 +58,10 @@ impl From for HistoricalBlockError { fn from(err: SignatureSetError) -> Self { match err { // The encoding of the signature is invalid, peer fault - e @ SignatureSetError::SignatureInvalid(_) => Self::InvalidSignature(format!("{e:?}")), + e + @ (SignatureSetError::SignatureInvalid(_) | SignatureSetError::BadBlsBytes { .. }) => { + Self::InvalidSignature(format!("{e:?}")) + } // All these variants are internal errors or unreachable for historical block paths, // which only check the proposer signature. // BadBlsBytes = Unreachable @@ -66,7 +71,6 @@ impl From for HistoricalBlockError { | SignatureSetError::IncorrectBlockProposer { .. } | SignatureSetError::MismatchedPublicKeyLen { .. } | SignatureSetError::PublicKeyDecompressionFailed - | SignatureSetError::BadBlsBytes { .. } | SignatureSetError::InconsistentBlockFork(_)) => Self::Unexpected(format!("{e:?}")), } } @@ -79,60 +83,58 @@ impl From for HistoricalBlockError { } impl BeaconChain { - fn verify_blobs_and_data_columns( + fn get_correct_historical_block_chain( &self, - blocks: &Vec>, - ) -> Result>, HistoricalBlockError> { - // Verify that blobs or data columns signatures match - let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES); - let setup_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_SETUP_TIMES); - // TODO: this logic is redundant with one from range sync. Do we have a good place to make - // it as a common function? - blocks - .iter() - .map(|block| { - if let Some(indices) = block.non_matching_blobs_signed_headers() { - if !indices.is_empty() { - return Err(HistoricalBlockError::InvalidBlobsSignature(indices)); - } - } - if let Some(indices) = block.non_matching_custody_columns_signed_headers() { - if !indices.is_empty() { - return Err(HistoricalBlockError::InvalidDataColumnsSignature(indices)); - } - } - Ok(()) - }) - .collect::, _>>()?; - drop(setup_timer); + blocks: &[RpcBlock], + ) -> RpcBlockBatchResult { + let anchor_info = self.store.get_anchor_info(); - // Check that all data columns are present <- faulty failure if missing because we have - // checked the block root is correct first. - let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); - let available_blocks = self - .data_availability_checker - .verify_kzg_for_rpc_blocks(blocks) - .map_err(HistoricalBlockError::AvailabilityCheckError)? // Map outer error - .into_iter() - .map(|maybe_block| match maybe_block { - MaybeAvailableBlock::Available(block) => Ok(block), - MaybeAvailableBlock::AvailabilityPending { .. } => { - Err(HistoricalBlockError::AvailabilityCheckError( - AvailabilityCheckError::Unexpected("block not available".to_string()), - )) - } - }) - .collect::, _>>()?; // Map inner results + // Take all blocks with slots less than the oldest block slot. 
+ let num_relevant = blocks.partition_point(|available_block| { + available_block.as_block().slot() < anchor_info.oldest_block_slot + }); - drop(verify_timer); - drop(sig_timer); + let total_blocks = blocks.len(); + let mut blocks_to_import = blocks.to_vec(); + blocks_to_import.truncate(num_relevant); - Ok(available_blocks) + if blocks_to_import.len() != total_blocks { + debug!( + oldest_block_slot = %anchor_info.oldest_block_slot, + total_blocks, + ignored = total_blocks.saturating_sub(blocks_to_import.len()), + "Ignoring some historic blocks" + ); + } + + // Validate that the blocks are chaining ascending order. + let mut expected_block_root = anchor_info.oldest_block_parent; + let mut num_to_include = blocks_to_import.len(); + + for (i, block) in blocks_to_import.iter().enumerate().rev() { + if block.block_root() != expected_block_root { + return Err(HistoricalBlockError::MismatchedBlockRoot { + block_root: block.block_root(), + expected_block_root, + }); + } + + expected_block_root = block.as_block().message().parent_root(); + + if expected_block_root == self.genesis_block_root { + num_to_include = i; + break; + } + } + + // Truncate when we reach the gensis block. + blocks_to_import.drain(..num_to_include); + Ok((num_relevant, blocks_to_import)) } - fn verify_available_block_signatures( + fn verify_rpc_block_signatures( &self, - signed_blocks: &[Arc>], + signed_blocks: &[RpcBlock], ) -> Result<(), HistoricalBlockError> { // Verify signatures in one batch, holding the pubkey cache lock for the shortest duration // possible. For each block fetch the parent root from its successor. Slicing from index 1 @@ -148,13 +150,14 @@ impl BeaconChain { .get(1..) .ok_or(HistoricalBlockError::IndexOutOfBounds)? .iter() - .map(|block| block.parent_root()) + .map(|block| block.as_block().parent_root()) .chain(iter::once(anchor_info.oldest_block_parent)); let signature_set = signed_blocks .iter() .zip_eq(block_roots) .filter(|&(_block, block_root)| (block_root != self.genesis_block_root)) .map(|(block, block_root)| { + let block = block.as_block(); block_proposal_signature_set_from_parts( block, Some(block_root), @@ -180,61 +183,76 @@ impl BeaconChain { Ok(()) } - /// Store a batch of historical blocks in the database. - /// - /// The `blocks` should be given in slot-ascending order. One of the blocks should have a block - /// root corresponding to the `oldest_block_parent` from the store's `AnchorInfo`. - /// - /// The block roots and proposer signatures are verified. If any block doesn't match the parent - /// root listed in its successor, then the whole batch will be discarded and - /// `MismatchedBlockRoot` will be returned. If any proposer signature is invalid then - /// `SignatureSetError` or `InvalidSignature` will be returned. - /// - /// The block's blob and data column sidecars are verified. The function checks their signed - /// block headers equal to their block's block header. Then, it verifies their KZG commitments - /// with proofs. KZG inclusion proof verification is not held here as it is already done by the - /// network crate. - /// - /// To align with sync we allow some excess blocks with slots greater than or equal to - /// `oldest_block_slot` to be provided. They will be ignored without being checked. - /// - /// This function should not be called concurrently with any other function that mutates - /// the anchor info (including this function itself). If a concurrent mutation occurs that - /// would violate consistency then an `AnchorInfoConcurrentMutation` error will be returned. 
- /// - /// Return the number of blocks successfully imported. - pub fn import_historical_block_batch( + fn verify_blobs_and_data_columns( &self, - blocks: &Vec>, - ) -> Result { - // Verify blobs and data columns (headers and signatures) - let mut blocks = self.verify_blobs_and_data_columns(blocks)?; + blocks: &[RpcBlock], + ) -> Result>, HistoricalBlockError> { + // Verify that blobs or data columns signatures match + // TODO: this logic is redundant with one from range sync. Do we have a good place to make + // it as a common function? + blocks + .iter() + .map(|block| { + if let Some(indices) = block.non_matching_blobs_signed_headers() { + if !indices.is_empty() { + return Err(HistoricalBlockError::InvalidBlobsSignature(indices)); + } + } + if let Some(indices) = block.non_matching_custody_columns_signed_headers() { + if !indices.is_empty() { + return Err(HistoricalBlockError::InvalidDataColumnsSignature(indices)); + } + } + Ok(()) + }) + .collect::, _>>()?; - let anchor_info = self.store.get_anchor_info(); - let blob_info = self.store.get_blob_info(); - let data_column_info = self.store.get_data_column_info(); + // Check that all data columns are present <- faulty failure if missing because we have + // checked the block root is correct first. + let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES); + let verify_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_VERIFY_TIMES); + let available_blocks = self + .data_availability_checker + .verify_kzg_for_rpc_blocks(blocks) + .map_err(HistoricalBlockError::AvailabilityCheckError)? // Map outer error + .into_iter() + .map(|maybe_block| match maybe_block { + MaybeAvailableBlock::Available(block) => Ok(block), + MaybeAvailableBlock::AvailabilityPending { .. } => { + Err(HistoricalBlockError::AvailabilityCheckError( + AvailabilityCheckError::Unexpected("block not available".to_string()), + )) + } + }) + .collect::, _>>()?; // Map inner results - // Take all blocks with slots less than the oldest block slot. 
- let num_relevant = blocks.partition_point(|available_block| { - available_block.block().slot() < anchor_info.oldest_block_slot - }); + drop(verify_timer); + drop(sig_timer); - let total_blocks = blocks.len(); - blocks.truncate(num_relevant); - let blocks_to_import = blocks; + Ok(available_blocks) + } - if blocks_to_import.len() != total_blocks { - debug!( - oldest_block_slot = %anchor_info.oldest_block_slot, - total_blocks, - ignored = total_blocks.saturating_sub(blocks_to_import.len()), - "Ignoring some historic blocks" - ); - } + fn verify_historical_block_batch( + &self, + blocks: &[RpcBlock], + ) -> AvailableBlockBatchResult { + let (num_relevant, importing_blocks) = self.get_correct_historical_block_chain(blocks)?; - if blocks_to_import.is_empty() { - return Ok(0); + if num_relevant == 0 { + return Ok((num_relevant, vec![])); } + self.verify_rpc_block_signatures(&importing_blocks)?; + let verified_blocks = self.verify_blobs_and_data_columns(&importing_blocks)?; + Ok((num_relevant, verified_blocks)) + } + + fn store_historical_block_batch( + &self, + blocks: Vec>, + ) -> Result<(), HistoricalBlockError> { + let anchor_info = self.store.get_anchor_info(); + let blob_info = self.store.get_blob_info(); + let data_column_info = self.store.get_data_column_info(); let mut expected_block_root = anchor_info.oldest_block_parent; let mut prev_block_slot = anchor_info.oldest_block_slot; @@ -242,20 +260,12 @@ impl BeaconChain { let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot; let mut blob_batch = Vec::::new(); - let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); - let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); - let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); + let mut cold_batch = Vec::with_capacity(blocks.len()); + let mut hot_batch = Vec::with_capacity(blocks.len()); - for available_block in blocks_to_import.into_iter().rev() { + for available_block in blocks.into_iter().rev() { let (block_root, block, block_data) = available_block.deconstruct(); - if block_root != expected_block_root { - return Err(HistoricalBlockError::MismatchedBlockRoot { - block_root, - expected_block_root, - }); - } - if !self.store.get_config().prune_payloads { // If prune-payloads is set to false, store the block which includes the execution payload self.store @@ -306,7 +316,6 @@ impl BeaconChain { prev_block_slot = block.slot(); expected_block_root = block.message().parent_root(); - signed_blocks.push(block); // If we've reached genesis, add the genesis block root to the batch for all slots // between 0 and the first block slot, and set the anchor slot to 0 to indicate @@ -325,11 +334,6 @@ impl BeaconChain { break; } } - // These were pushed in reverse order so we reverse again. - signed_blocks.reverse(); - - // Verify signatures in a batch. - self.verify_available_block_signatures(&signed_blocks)?; // Write the I/O batches to disk, writing the blocks themselves first, as it's better // for the hot DB to contain extra blocks than for the cold DB to point to blocks that @@ -390,6 +394,38 @@ impl BeaconChain { self.store_migrator.process_reconstruction(); } + Ok(()) + } + + /// Verify and store a batch of historical blocks in the database. + /// + /// The `blocks` should be given in slot-ascending order. One of the blocks should have a block + /// root corresponding to the `oldest_block_parent` from the store's `AnchorInfo`. + /// + /// The block roots and proposer signatures are verified. 
+ /// root listed in its successor, then the whole batch will be discarded and
+ /// `MismatchedBlockRoot` will be returned. If any proposer signature is invalid then
+ /// `SignatureSetError` or `InvalidSignature` will be returned.
+ ///
+ /// Each block's blob and data column sidecars are also verified: their signed block headers
+ /// must match the block's header, and their KZG commitments are verified against their
+ /// proofs. KZG inclusion proofs are not re-verified here, as the network crate has already
+ /// checked them.
+ ///
+ /// To align with sync we allow some excess blocks with slots greater than or equal to
+ /// `oldest_block_slot` to be provided. They will be ignored without being checked.
+ ///
+ /// This function should not be called concurrently with any other function that mutates
+ /// the anchor info (including this function itself). If a concurrent mutation occurs that
+ /// would violate consistency then an `AnchorInfoConcurrentMutation` error will be returned.
+ ///
+ /// Return the number of blocks successfully imported.
+ pub fn import_historical_block_batch(
+ &self,
+ blocks: &[RpcBlock],
+ ) -> Result {
+ let (num_relevant, verified_blocks) = self.verify_historical_block_batch(blocks)?;
+ self.store_historical_block_batch(verified_blocks)?;
Ok(num_relevant)
}
}
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index 024a75ca84b..705c3d81e6c 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -2494,11 +2494,10 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) {
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
let mut batch_with_invalid_first_block = rpc_blocks
.iter()
- .map(|block| block.__clone_without_recv())
+ .map(|block| block.clone())
.collect::>();
batch_with_invalid_first_block[0] = {
- let (block_root, mut block, blobs, columns) =
- rpc_blocks[0].__clone_without_recv().deconstruct();
+ let (block_root, mut block, blobs, columns) = rpc_blocks[0].clone().deconstruct();
let custody_columns_count = rpc_blocks[0].custody_columns_count();
let block_mut = Arc::make_mut(&mut block);
*block_mut.signature_mut() = Signature::empty();
@@ -2507,11 +2506,11 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) {
// Corrupt the signature and KZG commitments on the 1st blocks with blob sidecar and data
// column sidecar to ensure that the backfill processor is checking signatures correctly.
- if is_deneb || is_fulu {
+ if is_deneb {
let first_blob_or_col_index = rpc_blocks
.iter()
.position(|block| {
- let (_, _, blobs, columns) = block.__clone_without_recv().deconstruct();
+ let (_, _, blobs, columns) = block.clone().deconstruct();
if blobs.is_some() {
is_deneb && !is_fulu
} else if columns.is_some() {
@@ -2524,27 +2523,17 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) {
let mut batch_with_invalid_header = rpc_blocks
.iter()
- .map(|block| block.__clone_without_recv())
+ .map(|block| block.clone())
.collect::>();
batch_with_invalid_header[first_blob_or_col_index] = {
let (block_root, block, blobs, cols) = batch_with_invalid_header
[first_blob_or_col_index]
- .__clone_without_recv()
+ .clone()
.deconstruct();
let custody_columns_count =
batch_with_invalid_header[first_blob_or_col_index].custody_columns_count();
- if let Some(mut sidecars) = blobs {
- assert!(!sidecars.is_empty(), "blob sidecars shouldn't be empty");
- let mut_sidecar = Arc::make_mut(&mut sidecars[0]);
- mut_sidecar.signed_block_header.signature = Signature::empty();
- RpcBlock::__new_for_testing(
- block_root,
- block,
- Some(sidecars),
- cols,
- custody_columns_count,
- )
- } else if let Some(mut sidecars) = cols {
+ if is_fulu {
+ let mut sidecars = cols.unwrap();
assert!(
!sidecars.is_empty(),
"data column sidecars shouldn't be empty"
@@ -2561,32 +2550,32 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) {
custody_columns_count,
)
} else {
- panic!("should get blobs or data columns")
+ let mut sidecars = blobs.unwrap();
+ assert!(!sidecars.is_empty(), "blob sidecars shouldn't be empty");
+ let mut_sidecar = Arc::make_mut(&mut sidecars[0]);
+ mut_sidecar.signed_block_header.signature = Signature::empty();
+ RpcBlock::__new_for_testing(
+ block_root,
+ block,
+ Some(sidecars),
+ cols,
+ custody_columns_count,
+ )
}
};
let mut batch_with_invalid_kzg = rpc_blocks
.iter()
- .map(|block| block.__clone_without_recv())
+ .map(|block| block.clone())
.collect::>();
batch_with_invalid_kzg[first_blob_or_col_index] = {
let (block_root, block, blobs, cols) = batch_with_invalid_kzg[first_blob_or_col_index]
- .__clone_without_recv()
+ .clone()
.deconstruct();
let custody_columns_count =
batch_with_invalid_kzg[first_blob_or_col_index].custody_columns_count();
- if let Some(mut sidecars) = blobs {
- assert!(!sidecars.is_empty(), "blob sidecars shouldn't be empty");
- let mut_sidecar = Arc::make_mut(&mut sidecars[0]);
- mut_sidecar.kzg_commitment = KzgCommitment::empty_for_testing();
- RpcBlock::__new_for_testing(
- block_root,
- block,
- Some(sidecars),
- cols,
- custody_columns_count,
- )
- } else if let Some(mut sidecars) = cols {
+ if is_fulu {
+ let mut sidecars = cols.unwrap();
assert!(
!sidecars.is_empty(),
"data column sidecars shouldn't be empty"
@@ -2603,7 +2592,17 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) {
custody_columns_count,
)
} else {
- panic!("should get blobs or data columns")
+ let mut sidecars = blobs.unwrap();
+ assert!(!sidecars.is_empty(), "blob sidecars shouldn't be empty");
+ let mut_sidecar = Arc::make_mut(&mut sidecars[0]);
+ mut_sidecar.kzg_commitment = KzgCommitment::empty_for_testing();
+ RpcBlock::__new_for_testing(
+ block_root,
+ block,
+ Some(sidecars),
+ cols,
+ custody_columns_count,
+ )
}
};
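The assertions that follow exercise `import_historical_block_batch`, which after this series trims the batch to blocks older than the anchor, verifies the whole batch, and only then writes to the store. A rough sketch of that flow, using hypothetical `Block`/`ImportError` types rather than the Lighthouse API:

// Hypothetical, simplified types; not the Lighthouse API.
#[derive(Debug)]
struct Block {
    slot: u64,
}

#[derive(Debug)]
struct ImportError;

/// Keep only blocks strictly older than the current anchor slot; the batch is assumed to be
/// sorted by slot ascending, mirroring the doc comment's requirement.
fn trim_to_relevant(blocks: &[Block], oldest_block_slot: u64) -> &[Block] {
    let num_relevant = blocks.partition_point(|b| b.slot < oldest_block_slot);
    &blocks[..num_relevant]
}

fn verify(_blocks: &[Block]) -> Result<(), ImportError> {
    // Placeholder for root-chain, proposer-signature and sidecar verification.
    Ok(())
}

fn store(_blocks: &[Block]) -> Result<(), ImportError> {
    // Placeholder for the hot/cold database writes; only runs once verification has passed.
    Ok(())
}

fn import(blocks: &[Block], oldest_block_slot: u64) -> Result<usize, ImportError> {
    let relevant = trim_to_relevant(blocks, oldest_block_slot);
    if relevant.is_empty() {
        return Ok(0);
    }
    verify(relevant)?;
    store(relevant)?;
    Ok(relevant.len())
}

fn main() {
    let batch = vec![Block { slot: 10 }, Block { slot: 11 }, Block { slot: 12 }];
    // With the anchor's oldest block at slot 12, only slots 10 and 11 count as imported.
    assert_eq!(import(&batch, 12).unwrap(), 2);
}

The trim step relies on the documented requirement that the batch arrives in slot-ascending order; anything at or above `oldest_block_slot` is ignored without being checked.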
@@ -2638,7 +2637,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) {
// Importing the batch with valid signatures should succeed.
let rpc_blocks_dup = rpc_blocks
.iter()
- .map(|block| block.__clone_without_recv())
+ .map(|block| block.clone())
.collect::>();
beacon_chain
.import_historical_block_batch(&rpc_blocks_dup)
diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
index b892f889773..12dca99e9fb 100644
--- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -672,7 +672,7 @@ impl NetworkBeaconProcessor {
| HistoricalBlockError::InvalidDataColumnsSignature(_)
| HistoricalBlockError::Unexpected(_)
| HistoricalBlockError::AvailabilityCheckError(_) => {
- warn!(
+ debug!(
error = ?e,
"Backfill blob or data column verification error"
);
From b85dbdb9ac15e791d13a323302129ead2710cf57 Mon Sep 17 00:00:00 2001
From: SunnysidedJ
Date: Wed, 7 May 2025 15:43:17 +0900
Subject: [PATCH 5/5] signature header call to a single place

---
 .../src/block_verification_types.rs | 32 +++++++++++++++++++
 .../src/data_availability_checker.rs | 9 ++++++
 .../src/data_availability_checker/error.rs | 8 ++++-
 .../beacon_chain/src/historical_blocks.rs | 26 +--------------
 beacon_node/beacon_chain/tests/store_tests.rs | 7 ++--
 .../network_beacon_processor/sync_methods.rs | 4 +--
 6 files changed, 52 insertions(+), 34 deletions(-)

diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs
index e05611b70bc..d6e78ffa0f5 100644
--- a/beacon_node/beacon_chain/src/block_verification_types.rs
+++ b/beacon_node/beacon_chain/src/block_verification_types.rs
@@ -81,6 +81,38 @@ impl RpcBlock {
}
}
+ /// Checks that the signed block header of every blob or data column sidecar carries the
+ /// block's signature, returning the indices of the offending sidecars as an error otherwise.
+ pub fn validate_non_matching_signed_headers(&self) -> Result<(), AvailabilityCheckError> {
+ match &self.block {
+ RpcBlockInner::Block(_) => Ok(()),
+ RpcBlockInner::BlockAndBlobs(block, blobs) => {
+ let indices: Vec<_> = blobs
+ .iter()
+ .filter(|blob| &blob.signed_block_header.signature != block.signature())
+ .map(|blob| blob.index)
+ .collect();
+ if !indices.is_empty() {
+ return Err(AvailabilityCheckError::InvalidBlobsSignature(indices));
+ }
+ Ok(())
+ }
+ RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => {
+ let indices: Vec<_> = data_columns
+ .iter()
+ .filter(|column| {
+ &column.as_data_column().signed_block_header.signature != block.signature()
+ })
+ .map(|column| column.index())
+ .collect();
+ if !indices.is_empty() {
+ return Err(AvailabilityCheckError::InvalidDataColumnsSignature(indices));
+ }
+ Ok(())
+ }
+ }
+ }
+
/// Returns indices of blobs that have conflicting signature with block's signature.
pub fn non_matching_blobs_signed_headers(&self) -> Option> {
match &self.block {
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 31c20b6c43a..d73ad4f6bba 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -323,6 +323,9 @@ impl DataAvailabilityChecker {
&self,
block: RpcBlock,
) -> Result, AvailabilityCheckError> {
+ // Verify that each blob or data column sidecar's signed header matches the block's signature.
+ block.validate_non_matching_signed_headers()?;
+
let custody_columns_count = block.custody_columns_count();
let (block_root, block, blobs, data_columns) = block.deconstruct();
if self.blobs_required_for_block(&block) {
@@ -393,6 +396,12 @@ impl DataAvailabilityChecker {
&self,
blocks: &[RpcBlock],
) -> Result>, AvailabilityCheckError> {
+ // Verify that each blob or data column sidecar's signed header matches the block's signature.
+ blocks
+ .iter()
+ .map(|block| block.validate_non_matching_signed_headers())
+ .collect::, AvailabilityCheckError>>()?;
+
let mut results = Vec::with_capacity(blocks.len());
let all_blobs = blocks
.iter()
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs
index d091d6fefb5..aa93e6085ba 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs
@@ -22,6 +22,10 @@ pub enum Error {
BlockReplayError(state_processing::BlockReplayError),
RebuildingStateCaches(BeaconStateError),
SlotClockError,
+ /// One or more signatures in a BlobSidecar of an RpcBlock are invalid
+ InvalidBlobsSignature(Vec),
+ /// One or more signatures in a DataColumnSidecar of an RpcBlock are invalid
+ InvalidDataColumnsSignature(Vec),
}
#[derive(PartialEq, Eq)]
@@ -44,7 +48,9 @@ impl Error {
| Error::ParentStateMissing(_)
| Error::BlockReplayError(_)
| Error::RebuildingStateCaches(_)
- | Error::SlotClockError => ErrorCategory::Internal,
+ | Error::SlotClockError
+ | Error::InvalidBlobsSignature(_)
+ | Error::InvalidDataColumnsSignature(_) => ErrorCategory::Internal,
Error::InvalidBlobs { .. }
| Error::InvalidColumn { .. }
| Error::ReconstructColumnsError { .. }
diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs
index 2e89239adb8..ccefc8048c9 100644
--- a/beacon_node/beacon_chain/src/historical_blocks.rs
+++ b/beacon_node/beacon_chain/src/historical_blocks.rs
@@ -13,7 +13,7 @@ use store::metadata::DataColumnInfo;
use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp};
use strum::IntoStaticStr;
use tracing::debug;
-use types::{ColumnIndex, FixedBytesExtended, Hash256, Slot};
+use types::{FixedBytesExtended, Hash256, Slot};
/// Use a longer timeout on the pubkey cache.
///
@@ -32,10 +32,6 @@ pub enum HistoricalBlockError {
},
/// Bad signature, caller should retry with different blocks.
InvalidSignature(String),
- /// One or more signatures in a BlobSidecar of an RpcBlock are invalid
- InvalidBlobsSignature(Vec),
- /// One or more signatures in a DataColumnSidecar of an RpcBlock are invalid
- InvalidDataColumnsSignature(Vec),
/// Unexpected error
Unexpected(String),
/// Transitory error, caller should retry with the same blocks.
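`validate_non_matching_signed_headers`, added earlier in this patch, reports every offending sidecar index in a single typed error instead of failing on the first mismatch, which is what lets callers such as the backfill processor log and penalise a peer precisely. A small sketch of that error shape, using hypothetical `Signature`/`BlobSidecar`/`AvailabilityError` types rather than the Lighthouse API:

// Hypothetical, simplified types; not the Lighthouse types.
#[derive(PartialEq)]
struct Signature(u8);

struct BlobSidecar {
    index: u64,
    header_signature: Signature,
}

#[derive(Debug, PartialEq)]
enum AvailabilityError {
    InvalidBlobsSignature(Vec<u64>),
}

fn validate_headers(block_sig: &Signature, blobs: &[BlobSidecar]) -> Result<(), AvailabilityError> {
    // Collect every mismatching index rather than bailing out on the first one.
    let bad: Vec<u64> = blobs
        .iter()
        .filter(|b| &b.header_signature != block_sig)
        .map(|b| b.index)
        .collect();
    if bad.is_empty() {
        Ok(())
    } else {
        Err(AvailabilityError::InvalidBlobsSignature(bad))
    }
}

fn main() {
    let block_sig = Signature(7);
    let blobs = vec![
        BlobSidecar { index: 0, header_signature: Signature(7) },
        BlobSidecar { index: 1, header_signature: Signature(9) },
        BlobSidecar { index: 2, header_signature: Signature(9) },
    ];
    assert_eq!(
        validate_headers(&block_sig, &blobs),
        Err(AvailabilityError::InvalidBlobsSignature(vec![1, 2]))
    );
}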
@@ -187,26 +183,6 @@ impl BeaconChain {
&self,
blocks: &[RpcBlock],
) -> Result>, HistoricalBlockError> {
- // Verify that each blob or data column sidecar's signed header matches the block's signature.
- // TODO: this logic duplicates the equivalent check in range sync. Is there a good place to
- // factor it out into a common function?
- blocks
- .iter()
- .map(|block| {
- if let Some(indices) = block.non_matching_blobs_signed_headers() {
- if !indices.is_empty() {
- return Err(HistoricalBlockError::InvalidBlobsSignature(indices));
- }
- }
- if let Some(indices) = block.non_matching_custody_columns_signed_headers() {
- if !indices.is_empty() {
- return Err(HistoricalBlockError::InvalidDataColumnsSignature(indices));
- }
- }
- Ok(())
- })
- .collect::, _>>()?;
-
// Check that all data columns are present; a missing column is a faulty (peer) failure,
// because the block roots have already been checked.
let sig_timer = metrics::start_timer(&metrics::BACKFILL_SIGNATURE_TOTAL_TIMES);
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index 705c3d81e6c..e73de11d3c7 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -2611,8 +2611,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) {
beacon_chain
.import_historical_block_batch(&batch_with_invalid_header)
.unwrap_err(),
- HistoricalBlockError::InvalidBlobsSignature(_)
- | HistoricalBlockError::InvalidDataColumnsSignature(_)
+ HistoricalBlockError::AvailabilityCheckError(_)
));
// Importing the batch with an invalid blob/column KZG commitment should error.
@@ -2629,9 +2628,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) {
beacon_chain
.import_historical_block_batch(&batch_with_invalid_first_block)
.unwrap_err(),
- HistoricalBlockError::InvalidSignature(_)
- | HistoricalBlockError::InvalidBlobsSignature(_)
- | HistoricalBlockError::InvalidDataColumnsSignature(_),
+ HistoricalBlockError::InvalidSignature(_) | HistoricalBlockError::AvailabilityCheckError(_)
));
// Importing the batch with valid signatures should succeed.
diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
index 12dca99e9fb..3869d462546 100644
--- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -668,9 +668,7 @@ impl NetworkBeaconProcessor {
// The peer is faulty if they send bad signatures.
Some(PeerAction::LowToleranceError)
}
- HistoricalBlockError::InvalidBlobsSignature(_)
- | HistoricalBlockError::InvalidDataColumnsSignature(_)
- | HistoricalBlockError::Unexpected(_)
+ HistoricalBlockError::Unexpected(_)
| HistoricalBlockError::AvailabilityCheckError(_) => {
debug!(
error = ?e,