diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index d81d964e7cf..720a5bc8bd3 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -39,7 +39,9 @@ use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::{self, error::TrySendError}; use types::*; -pub use sync_methods::ChainSegmentProcessId; +pub use sync_methods::{ + ChainSegmentProcessId, ErrorCategory as LookupSyncErrorCategory, LookupSyncProcessingResult, +}; use types::blob_sidecar::FixedBlobSidecarList; pub type Error = TrySendError>; @@ -983,7 +985,7 @@ impl NetworkBeaconProcessor { "result" => "imported block and custody columns", "block_hash" => %hash, ); - self.chain.recompute_head_at_current_slot().await; + // Head will be recomputed in `handle_lookup_sync_processing_result` } AvailabilityProcessingStatus::MissingComponents(_, _) => { debug!( diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 6c6bb26ee09..5325c1f53a0 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -9,6 +9,7 @@ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; use beacon_chain::data_column_verification::verify_kzg_for_data_column_list; +use beacon_chain::ExecutionPayloadError; use beacon_chain::{ validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, @@ -44,6 +45,60 @@ struct ChainSegmentFailed { peer_action: Option, } +#[derive(Debug)] +pub enum LookupSyncProcessingResult { + FullyImported, + ImportedMissingComponents, + 
Error(ErrorCategory), + ParentUnknown(Hash256), + Ignored, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum ErrorCategory { + /// Internal Errors (not caused by peers). + /// + /// An internal no-retry error is permanent and block processing should not be + /// re-attempted. + Internal { retry: bool }, + /// Errors caused by faulty / malicious peers. + /// + /// No retry errors are deterministic against the block's root. Re-downloading data + /// key-ed by block root MUST result in the same no-retry error (i.e. invalid parent, + /// invalid state root, etc). + /// + /// The error also indicates which block component index is malicious if applicable. + Malicious { retry: bool, index: usize }, +} + +impl From for LookupSyncProcessingResult { + fn from(e: ErrorCategory) -> Self { + Self::Error(e) + } +} + +impl ErrorCategory { + // Helper functions for readability on large match statements + pub fn internal_no_retry() -> Self { + Self::Internal { retry: false } + } + pub fn internal_retry() -> Self { + Self::Internal { retry: true } + } + pub fn malicious_no_retry() -> Self { + Self::Malicious { + retry: false, + index: 0, + } + } + pub fn malicious_retry() -> Self { + Self::Malicious { + retry: true, + index: 0, + } + } +} + impl NetworkBeaconProcessor { /// Returns an async closure which processes a beacon block received via RPC. 
/// @@ -92,7 +147,7 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: crate::sync::manager::BlockProcessingResult::Ignored, + result: LookupSyncProcessingResult::Ignored, }); }; (process_fn, Box::new(ignore_fn)) @@ -214,7 +269,9 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: result.into(), + result: self + .handle_lookup_sync_processing_result(block_root, result) + .await, }); // Drop the handle to remove the entry from the cache @@ -287,48 +344,12 @@ impl NetworkBeaconProcessor { let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await; - match &result { - Ok(AvailabilityProcessingStatus::Imported(hash)) => { - debug!( - self.log, - "Block components retrieved"; - "result" => "imported block and blobs", - "slot" => %slot, - "block_hash" => %hash, - ); - self.chain.recompute_head_at_current_slot().await; - } - Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { - debug!( - self.log, - "Missing components over rpc"; - "block_hash" => %block_root, - "slot" => %slot, - ); - } - Err(BlockError::DuplicateFullyImported(_)) => { - debug!( - self.log, - "Blobs have already been imported"; - "block_hash" => %block_root, - "slot" => %slot, - ); - } - Err(e) => { - warn!( - self.log, - "Error when importing rpc blobs"; - "error" => ?e, - "block_hash" => %block_root, - "slot" => %slot, - ); - } - } - // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: result.into(), + result: self + .handle_lookup_sync_processing_result(block_root, result) + .await, }); } @@ -344,53 +365,159 @@ impl NetworkBeaconProcessor { .process_rpc_custody_columns(custody_columns) .await; - match &result { - Ok(availability) => match availability { - AvailabilityProcessingStatus::Imported(hash) => { - debug!( - 
self.log, - "Block components retrieved"; - "result" => "imported block and custody columns", - "block_hash" => %hash, - ); - self.chain.recompute_head_at_current_slot().await; - } - AvailabilityProcessingStatus::MissingComponents(_, _) => { - debug!( - self.log, - "Missing components over rpc"; - "block_hash" => %block_root, - ); - // Attempt reconstruction here before notifying sync, to avoid sending out more requests - // that we may no longer need. - if let Some(availability) = - self.attempt_data_column_reconstruction(block_root).await - { - result = Ok(availability) - } - } - }, - Err(BlockError::DuplicateFullyImported(_)) => { + if let &Ok(AvailabilityProcessingStatus::MissingComponents { .. }) = &result { + // Attempt reconstruction here before notifying sync, to avoid sending out more requests + // that we may no longer need. + if let Some(availability) = self.attempt_data_column_reconstruction(block_root).await { + result = Ok(availability) + } + } + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: self + .handle_lookup_sync_processing_result(block_root, result) + .await, + }); + } + + async fn handle_lookup_sync_processing_result( + &self, + block_root: Hash256, + result: Result, + ) -> LookupSyncProcessingResult { + match result { + Ok(AvailabilityProcessingStatus::Imported { .. }) => { debug!( self.log, - "Custody columns have already been imported"; - "block_hash" => %block_root, + "Fully imported block components over RPC"; + "block_root" => %block_root, ); + self.chain.recompute_head_at_current_slot().await; + LookupSyncProcessingResult::FullyImported } - Err(e) => { - warn!( + Ok(AvailabilityProcessingStatus::MissingComponents { .. 
}) => { + debug!( self.log, - "Error when importing rpc custody columns"; - "error" => ?e, - "block_hash" => %block_root, + "Missing components over RPC"; + "block_root" => %block_root, ); + LookupSyncProcessingResult::ImportedMissingComponents + } + Err(err) => { + debug!( + self.log, + "Error processing block component"; + "block_root" => %block_root, + "err" => ?err, + ); + match err { + BlockError::ParentUnknown { parent_root } => { + LookupSyncProcessingResult::ParentUnknown(parent_root) + } + // A peer may craft a block that is at a future slot. It's possible that + // eventually the slot will no longer be in the future. However, since it's a + // malicious action to serve an RPC with a future slot we will not retry. + BlockError::FutureSlot { .. } => ErrorCategory::malicious_no_retry().into(), + // All these variants are invalid block errors deterministic on the block root, + // no need to retry + BlockError::StateRootMismatch { .. } + | BlockError::GenesisBlock + | BlockError::WouldRevertFinalizedSlot { .. } + | BlockError::NotFinalizedDescendant { .. } + | BlockError::BlockSlotLimitReached + | BlockError::IncorrectBlockProposer { .. } + | BlockError::UnknownValidator { .. } + | BlockError::BlockIsNotLaterThanParent { .. } + | BlockError::PerBlockProcessingError(_) + | BlockError::InconsistentFork(_) + | BlockError::WeakSubjectivityConflict => { + ErrorCategory::malicious_no_retry().into() + } + BlockError::DuplicateFullyImported { .. } => { + LookupSyncProcessingResult::FullyImported + } + BlockError::DuplicateImportStatusUnknown { .. } => { + // This is unreachable because RPC blocks do not undergo gossip verification, and + // this error can *only* come from gossip verification. + ErrorCategory::internal_no_retry().into() + } + // TODO (InvalidSignature): Labeling as recoverable as it may be the proposer signature. We should check + // which one it is and label as non-recoverable if the proposer signature is correct. 
+ BlockError::ProposalSignatureInvalid | BlockError::InvalidSignature => { + ErrorCategory::malicious_retry().into() + } + BlockError::ExecutionPayloadError(e) => match e { + // The peer has nothing to do with this error, do not penalize them. + ExecutionPayloadError::NoExecutionConnection => { + ErrorCategory::internal_no_retry() + } + // The peer has nothing to do with this error, do not penalize them. + ExecutionPayloadError::RequestFailed(_) => ErrorCategory::internal_retry(), + // Execution payload is invalid + ExecutionPayloadError::RejectedByExecutionEngine { .. } + | ExecutionPayloadError::InvalidPayloadTimestamp { .. } + | ExecutionPayloadError::InvalidTerminalPoWBlock { .. } + | ExecutionPayloadError::InvalidActivationEpoch { .. } + | ExecutionPayloadError::InvalidTerminalBlockHash { .. } => { + ErrorCategory::malicious_no_retry() + } + // Do not penalize the peer since it's not their fault that *we're* optimistic. + ExecutionPayloadError::UnverifiedNonOptimisticCandidate => { + ErrorCategory::internal_retry() + } + } + .into(), + BlockError::ParentExecutionPayloadInvalid { .. } => { + ErrorCategory::malicious_no_retry().into() + } + // TODO: Review AvailabilityCheckError variants + BlockError::AvailabilityCheck(e) => match e { + AvailabilityCheckError::SszTypes(_) + | AvailabilityCheckError::StoreError(_) + | AvailabilityCheckError::Unexpected + | AvailabilityCheckError::ParentStateMissing(_) + | AvailabilityCheckError::BlockReplayError(_) + | AvailabilityCheckError::RebuildingStateCaches(_) + | AvailabilityCheckError::SlotClockError => ErrorCategory::internal_retry(), + AvailabilityCheckError::InvalidColumn(index, _) => { + ErrorCategory::Malicious { + retry: true, + index: index as usize, + } + } + AvailabilityCheckError::InvalidBlobs { .. } + | AvailabilityCheckError::MissingBlobs + | AvailabilityCheckError::MissingCustodyColumns + | AvailabilityCheckError::DecodeError(_) + | AvailabilityCheckError::ReconstructColumnsError { .. 
} + | AvailabilityCheckError::BlobIndexInvalid(_) + | AvailabilityCheckError::DataColumnIndexInvalid(_) + | AvailabilityCheckError::KzgCommitmentMismatch { .. } => { + ErrorCategory::malicious_retry() + } // Do not use a fallback match, handle all errors explicitly + } + .into(), + // The proposer making a slashable block is not the peer's fault nor ours. Mark + // as internal (don't penalize peer), and no retry (the block will forever be + // slashable). + BlockError::Slashable => ErrorCategory::internal_no_retry().into(), + // TODO: BeaconChainError should be retried? + BlockError::BeaconChainError(_) | BlockError::InternalError(_) => { + ErrorCategory::internal_no_retry().into() + } + // unreachable, this error is only part of gossip + BlockError::BlobNotRequired(_) => ErrorCategory::malicious_retry().into(), + // Unreachable: These variants never happen in lookup sync, only in range sync. + // Does not matter what we set here, just setting `internal_recoverable` to + // put something. 
+ BlockError::NonLinearParentRoots | BlockError::NonLinearSlots => { + ErrorCategory::internal_retry().into() + } // + // Do not use a fallback match, handle all errors explicitly + } } } - - self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type, - result: result.into(), - }); } /// Validate a list of data columns received from RPC requests diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 5a11bca4814..2864fabb125 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -23,17 +23,15 @@ use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; -use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; +use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE}; use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; +use crate::network_beacon_processor::{LookupSyncErrorCategory, LookupSyncProcessingResult}; use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use crate::sync::SyncMessage; use beacon_chain::block_verification_types::AsBlock; -use beacon_chain::data_availability_checker::{ - AvailabilityCheckError, AvailabilityCheckErrorCategory, -}; -use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; +use beacon_chain::BeaconChainTypes; pub use common::RequestState; use fnv::FnvHashMap; use lighthouse_network::service::api_types::SingleLookupReqId; @@ -516,7 +514,7 @@ impl BlockLookups { pub fn on_processing_result( &mut self, process_type: BlockProcessType, - result: BlockProcessingResult, + result: LookupSyncProcessingResult, cx: &mut SyncNetworkContext, ) { let lookup_result = match process_type { 
@@ -536,7 +534,7 @@ impl BlockLookups { pub fn on_processing_result_inner>( &mut self, lookup_id: SingleLookupId, - result: BlockProcessingResult, + result: LookupSyncProcessingResult, cx: &mut SyncNetworkContext, ) -> Result { let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { @@ -559,16 +557,12 @@ impl BlockLookups { ); let action = match result { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) - | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) => { + LookupSyncProcessingResult::FullyImported => { // Successfully imported request_state.on_processing_success()?; Action::Continue } - - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { - .. - }) => { + LookupSyncProcessingResult::ImportedMissingComponents => { // `on_processing_success` is called here to ensure the request state is updated prior to checking // if both components have been processed. request_state.on_processing_success()?; @@ -584,17 +578,7 @@ impl BlockLookups { Action::Retry } } - BlockProcessingResult::Err(BlockError::DuplicateImportStatusUnknown(..)) => { - // This is unreachable because RPC blocks do not undergo gossip verification, and - // this error can *only* come from gossip verification. - error!( - self.log, - "Single block lookup hit unreachable condition"; - "block_root" => ?block_root - ); - Action::Drop - } - BlockProcessingResult::Ignored => { + LookupSyncProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. warn!( @@ -604,58 +588,37 @@ impl BlockLookups { ); Action::Drop } - BlockProcessingResult::Err(e) => { - match e { - BlockError::BeaconChainError(e) => { - // Internal error - error!(self.log, "Beacon chain error processing lookup component"; "block_root" => %block_root, "error" => ?e); - Action::Drop - } - BlockError::ParentUnknown { parent_root, .. 
} => { - // Reverts the status of this request to `AwaitingProcessing` holding the - // downloaded data. A future call to `continue_requests` will re-submit it - // once there are no pending parent requests. - // Note: `BlockError::ParentUnknown` is only returned when processing - // blocks, not blobs. - request_state.revert_to_awaiting_processing()?; - Action::ParentUnknown { parent_root } + LookupSyncProcessingResult::ParentUnknown(parent_root) => { + // Reverts the status of this request to `AwaitingProcessing` holding the + // downloaded data. A future call to `continue_requests` will re-submit it + // once there are no pending parent requests. + // Note: `BlockError::ParentUnknown` is only returned when processing + // blocks, not blobs. + request_state.revert_to_awaiting_processing()?; + Action::ParentUnknown { parent_root } + } + LookupSyncProcessingResult::Error(e) => { + let recoverable = match e { + LookupSyncErrorCategory::Internal { retry: recoverable } => { + error!(self.log, + "Internal error processing lookup component"; + "block_root" => %block_root, + "recoverable" => recoverable, + ); + recoverable } - ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { - // These errors indicate that the execution layer is offline - // and failed to validate the execution payload. Do not downscore peer. - debug!( - self.log, - "Single block lookup failed. 
Execution layer is offline / unsynced / misconfigured"; + LookupSyncErrorCategory::Malicious { + retry: recoverable, + index, + } => { + debug!(self.log, + "Invalid lookup component"; "block_root" => ?block_root, - "error" => ?e + "component" => ?R::response_type(), + "recoverable" => recoverable, ); - Action::Drop - } - BlockError::AvailabilityCheck(e) - if e.category() == AvailabilityCheckErrorCategory::Internal => - { - // There errors indicate internal problems and should not downscore the peer - warn!(self.log, "Internal availability check failure"; "block_root" => ?block_root, "error" => ?e); - - // Here we choose *not* to call `on_processing_failure` because this could result in a bad - // lookup state transition. This error invalidates both blob and block requests, and we don't know the - // state of both requests. Blobs may have already successfullly processed for example. - // We opt to drop the lookup instead. - Action::Drop - } - other => { - debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other); let peer_group = request_state.on_processing_failure()?; - let peers_to_penalize: Vec<_> = match other { - // Note: currenlty only InvalidColumn errors have index granularity, - // but future errors may follow the same pattern. 
Generalize this - // pattern with https://github.com/sigp/lighthouse/pull/6321 - BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn(index, _), - ) => peer_group.of_index(index as usize).collect(), - _ => peer_group.all().collect(), - }; - for peer in peers_to_penalize { + for peer in peer_group.of_index(index) { cx.report_peer( *peer, PeerAction::MidToleranceError, @@ -668,9 +631,14 @@ impl BlockLookups { }, ); } - - Action::Retry + recoverable } + }; + + if recoverable { + Action::Retry + } else { + Action::Drop } } }; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 344e91711c4..7d8b9d667a4 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -41,7 +41,9 @@ use super::network_context::{ use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; -use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; +use crate::network_beacon_processor::{ + ChainSegmentProcessId, LookupSyncProcessingResult, NetworkBeaconProcessor, +}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ @@ -51,9 +53,7 @@ use crate::sync::block_sidecar_coupling::RangeBlockComponentsRequest; use crate::sync::network_context::PeerGroup; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::timestamp_now; -use beacon_chain::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, -}; +use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use futures::StreamExt; use lighthouse_network::rpc::RPCError; use lighthouse_network::service::api_types::{ @@ -163,7 +163,7 @@ pub enum SyncMessage { /// Block processed BlockComponentProcessed { process_type: BlockProcessType, - result: 
BlockProcessingResult, + result: LookupSyncProcessingResult, }, /// Sample data column verified @@ -194,13 +194,6 @@ impl BlockProcessType { } } -#[derive(Debug)] -pub enum BlockProcessingResult { - Ok(AvailabilityProcessingStatus), - Err(BlockError), - Ignored, -} - /// The result of processing multiple blocks (a chain segment). #[derive(Debug)] pub enum BatchProcessResult { @@ -1290,18 +1283,3 @@ impl SyncManager { } } } - -impl From> for BlockProcessingResult { - fn from(result: Result) -> Self { - match result { - Ok(status) => BlockProcessingResult::Ok(status), - Err(e) => BlockProcessingResult::Err(e), - } - } -} - -impl From for BlockProcessingResult { - fn from(e: BlockError) -> Self { - BlockProcessingResult::Err(e) - } -} diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 9f2c9ef66f0..f47d1244539 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1,4 +1,4 @@ -use crate::network_beacon_processor::NetworkBeaconProcessor; +use crate::network_beacon_processor::{LookupSyncProcessingResult, NetworkBeaconProcessor}; use crate::sync::block_lookups::{ BlockLookupSummary, PARENT_DEPTH_TOLERANCE, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, }; @@ -421,24 +421,21 @@ impl TestRig { *parent_chain.last().unwrap() } - fn parent_block_processed(&mut self, chain_hash: Hash256, result: BlockProcessingResult) { + fn parent_block_processed(&mut self, chain_hash: Hash256, result: LookupSyncProcessingResult) { let id = self.find_single_lookup_for(self.find_oldest_parent_lookup(chain_hash)); self.single_block_component_processed(id, result); } - fn parent_blob_processed(&mut self, chain_hash: Hash256, result: BlockProcessingResult) { + fn parent_blob_processed(&mut self, chain_hash: Hash256, result: LookupSyncProcessingResult) { let id = self.find_single_lookup_for(self.find_oldest_parent_lookup(chain_hash)); self.single_blob_component_processed(id, result); } fn 
parent_block_processed_imported(&mut self, chain_hash: Hash256) { - self.parent_block_processed( - chain_hash, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(chain_hash)), - ); + self.parent_block_processed(chain_hash, LookupSyncProcessingResult::FullyImported); } - fn single_block_component_processed(&mut self, id: Id, result: BlockProcessingResult) { + fn single_block_component_processed(&mut self, id: Id, result: LookupSyncProcessingResult) { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type: BlockProcessType::SingleBlock { id }, result, @@ -447,13 +444,10 @@ impl TestRig { fn single_block_component_processed_imported(&mut self, block_root: Hash256) { let id = self.find_single_lookup_for(block_root); - self.single_block_component_processed( - id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), - ) + self.single_block_component_processed(id, LookupSyncProcessingResult::FullyImported) } - fn single_blob_component_processed(&mut self, id: Id, result: BlockProcessingResult) { + fn single_blob_component_processed(&mut self, id: Id, result: LookupSyncProcessingResult) { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type: BlockProcessType::SingleBlob { id }, result, @@ -541,18 +535,14 @@ impl TestRig { blobs: Vec>, import: bool, ) { - let block_root = blobs.first().unwrap().block_root(); - let block_slot = blobs.first().unwrap().slot(); self.complete_single_lookup_blob_download(id, peer_id, blobs); self.expect_block_process(ResponseType::Blob); self.single_blob_component_processed( id.lookup_id, if import { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)) + LookupSyncProcessingResult::FullyImported } else { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - block_slot, block_root, - )) + LookupSyncProcessingResult::ImportedMissingComponents }, ); } @@ -572,12 +562,9 @@ impl TestRig { 
self.single_block_component_processed( id, if import { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)) + LookupSyncProcessingResult::FullyImported } else { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - Slot::new(0), - block_root, - )) + LookupSyncProcessingResult::ImportedMissingComponents }, ) } @@ -656,8 +643,6 @@ impl TestRig { ) { // Complete download let peer_id = PeerId::random(); - let slot = block.slot(); - let block_root = block.canonical_root(); self.single_lookup_block_response(id, peer_id, Some(block)); self.single_lookup_block_response(id, peer_id, None); // Expect processing and resolve with import @@ -665,11 +650,9 @@ impl TestRig { self.single_block_component_processed( id.lookup_id, if missing_components { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - slot, block_root, - )) + LookupSyncProcessingResult::ImportedMissingComponents } else { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)) + LookupSyncProcessingResult::FullyImported }, ) } @@ -736,8 +719,6 @@ impl TestRig { panic!("not a custody requester") }; - let first_column = data_columns.first().cloned().unwrap(); - for id in ids { self.log(&format!("return valid data column for {id:?}")); let indices = &id.1; @@ -756,14 +737,9 @@ impl TestRig { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type: BlockProcessType::SingleCustodyColumn(lookup_id), result: if missing_components { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - first_column.slot(), - first_column.block_root(), - )) + LookupSyncProcessingResult::ImportedMissingComponents } else { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( - first_column.block_root(), - )) + LookupSyncProcessingResult::FullyImported }, }); } @@ -1468,9 +1444,7 @@ fn test_single_block_lookup_becomes_parent_request() { // parent request after processing. 
rig.single_block_component_processed( id.lookup_id, - BlockProcessingResult::Err(BlockError::ParentUnknown { - parent_root: block.parent_root(), - }), + LookupSyncProcessingResult::ParentUnknown(block.parent_root()), ); assert_eq!(rig.active_single_lookups_count(), 2); // 2 = current + parent rig.expect_block_parent_request(parent_root); @@ -1499,10 +1473,7 @@ fn test_parent_lookup_happy_path() { // Add peer to child lookup to prevent it being dropped rig.trigger_unknown_block_from_attestation(block_root, peer_id); // Processing succeeds, now the rest of the chain should be sent for processing. - rig.parent_block_processed( - block_root, - BlockError::DuplicateFullyImported(block_root).into(), - ); + rig.parent_block_processed(block_root, LookupSyncProcessingResult::FullyImported); rig.expect_parent_chain_process(); rig.parent_chain_processed_success(block_root, &[]); rig.expect_no_active_lookups_empty_network(); @@ -1661,7 +1632,10 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { rig.assert_not_failed_chain(block_root); // send the right parent but fail processing rig.parent_lookup_block_response(id, peer_id, Some(parent.clone().into())); - rig.parent_block_processed(block_root, BlockError::InvalidSignature.into()); + rig.parent_block_processed( + block_root, + LookupSyncProcessingResult::Error(LookupSyncErrorCategory::malicious_retry()), + ); rig.parent_lookup_block_response(id, peer_id, None); rig.expect_penalty(peer_id, "lookup_block_processing_failure"); } @@ -1691,9 +1665,7 @@ fn test_parent_lookup_too_deep_grow_ancestor() { // the processing result rig.parent_block_processed( chain_hash, - BlockProcessingResult::Err(BlockError::ParentUnknown { - parent_root: block.parent_root(), - }), + LookupSyncProcessingResult::ParentUnknown(block.parent_root()), ) } @@ -1728,10 +1700,7 @@ fn test_parent_lookup_too_deep_grow_tip() { rig.expect_block_process(ResponseType::Block); rig.single_block_component_processed( id.lookup_id, - 
BlockError::ParentUnknown { - parent_root: block.parent_root(), - } - .into(), + LookupSyncProcessingResult::ParentUnknown(block.parent_root()), ); } @@ -1836,7 +1805,7 @@ fn test_single_block_lookup_ignored_response() { // after processing. rig.single_lookup_block_response(id, peer_id, None); // Send an Ignored response, the request should be dropped - rig.single_block_component_processed(id.lookup_id, BlockProcessingResult::Ignored); + rig.single_block_component_processed(id.lookup_id, LookupSyncProcessingResult::Ignored); rig.expect_no_active_lookups_empty_network(); } @@ -1859,7 +1828,7 @@ fn test_parent_lookup_ignored_response() { rig.expect_empty_network(); // Return an Ignored result. The request should be dropped - rig.parent_block_processed(block_root, BlockProcessingResult::Ignored); + rig.parent_block_processed(block_root, LookupSyncProcessingResult::Ignored); rig.expect_empty_network(); rig.expect_no_active_lookups(); } @@ -1889,17 +1858,12 @@ fn test_same_chain_race_condition() { // the processing result if i + 2 == depth { rig.log(&format!("Block {i} was removed and is already known")); - rig.parent_block_processed( - chain_hash, - BlockError::DuplicateFullyImported(block.canonical_root()).into(), - ) + rig.parent_block_processed(chain_hash, LookupSyncProcessingResult::FullyImported) } else { rig.log(&format!("Block {i} ParentUnknown")); rig.parent_block_processed( chain_hash, - BlockProcessingResult::Err(BlockError::ParentUnknown { - parent_root: block.parent_root(), - }), + LookupSyncProcessingResult::ParentUnknown(block.parent_root()), ) } } @@ -2172,9 +2136,7 @@ fn custody_lookup_happy_path() { mod deneb_only { use super::*; - use beacon_chain::{ - block_verification_types::RpcBlock, data_availability_checker::AvailabilityCheckError, - }; + use beacon_chain::block_verification_types::RpcBlock; use ssz_types::VariableList; use std::collections::VecDeque; @@ -2192,7 +2154,6 @@ mod deneb_only { parent_block_req_id: Option, blob_req_id: Option, 
parent_blob_req_id: Option, - slot: Slot, block_root: Hash256, } @@ -2220,7 +2181,6 @@ mod deneb_only { let (block, blobs) = rig.rand_block_and_blobs(NumBlobs::Random); let mut block = Arc::new(block); let mut blobs = blobs.into_iter().map(Arc::new).collect::>(); - let slot = block.slot(); let num_parents = request_trigger.num_parents(); let mut parent_block_chain = VecDeque::with_capacity(num_parents); @@ -2296,7 +2256,6 @@ mod deneb_only { parent_block_req_id, blob_req_id, parent_blob_req_id, - slot, block_root, }) } @@ -2454,10 +2413,7 @@ mod deneb_only { fn block_missing_components(mut self) -> Self { self.rig.single_block_component_processed( self.block_req_id.expect("block request id").lookup_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - self.block.slot(), - self.block_root, - )), + LookupSyncProcessingResult::ImportedMissingComponents, ); self.rig.expect_empty_network(); self.rig.assert_single_lookups_count(1); @@ -2467,7 +2423,7 @@ mod deneb_only { fn blob_imported(mut self) -> Self { self.rig.single_blob_component_processed( self.blob_req_id.expect("blob request id").lookup_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), + LookupSyncProcessingResult::FullyImported, ); self.rig.expect_empty_network(); self.rig.assert_single_lookups_count(0); @@ -2482,7 +2438,7 @@ mod deneb_only { .or(self.blob_req_id) .expect("block request id") .lookup_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), + LookupSyncProcessingResult::FullyImported, ); self.rig.expect_empty_network(); self.rig.assert_single_lookups_count(0); @@ -2493,10 +2449,8 @@ mod deneb_only { let parent_root = *self.parent_block_roots.first().unwrap(); self.rig .log(&format!("parent_block_imported {parent_root:?}")); - self.rig.parent_block_processed( - self.block_root, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(parent_root)), - ); + self.rig + 
.parent_block_processed(self.block_root, LookupSyncProcessingResult::FullyImported); self.rig.expect_no_requests_for(parent_root); self.rig.assert_parent_lookups_count(0); self @@ -2508,10 +2462,7 @@ mod deneb_only { .log(&format!("parent_block_missing_components {parent_root:?}")); self.rig.parent_block_processed( self.block_root, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - Slot::new(0), - parent_root, - )), + LookupSyncProcessingResult::ImportedMissingComponents, ); self.rig.expect_no_requests_for(parent_root); self @@ -2521,10 +2472,8 @@ mod deneb_only { let parent_root = *self.parent_block_roots.first().unwrap(); self.rig .log(&format!("parent_blob_imported {parent_root:?}")); - self.rig.parent_blob_processed( - self.block_root, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(parent_root)), - ); + self.rig + .parent_blob_processed(self.block_root, LookupSyncProcessingResult::FullyImported); self.rig.expect_no_requests_for(parent_root); self.rig.assert_parent_lookups_count(0); @@ -2544,9 +2493,7 @@ mod deneb_only { .unwrap(); self.rig.parent_block_processed( self.block_root, - BlockProcessingResult::Err(BlockError::ParentUnknown { - parent_root: block.parent_root(), - }), + LookupSyncProcessingResult::ParentUnknown(block.parent_root()), ); assert_eq!(self.rig.active_parent_lookups_count(), 1); self @@ -2555,7 +2502,7 @@ mod deneb_only { fn invalid_parent_processed(mut self) -> Self { self.rig.parent_block_processed( self.block_root, - BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), + LookupSyncProcessingResult::Error(LookupSyncErrorCategory::malicious_retry()), ); assert_eq!(self.rig.active_parent_lookups_count(), 1); self @@ -2564,7 +2511,7 @@ mod deneb_only { fn invalid_block_processed(mut self) -> Self { self.rig.single_block_component_processed( self.block_req_id.expect("block request id").lookup_id, - BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), + 
LookupSyncProcessingResult::Error(LookupSyncErrorCategory::malicious_retry()), ); self.rig.assert_single_lookups_count(1); self @@ -2574,9 +2521,7 @@ mod deneb_only { self.rig.log("invalid_blob_processed"); self.rig.single_blob_component_processed( self.blob_req_id.expect("blob request id").lookup_id, - BlockProcessingResult::Err(BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidBlobs(kzg::Error::KzgVerificationFailed), - )), + LookupSyncProcessingResult::Error(LookupSyncErrorCategory::malicious_retry()), ); self.rig.assert_single_lookups_count(1); self @@ -2585,10 +2530,7 @@ mod deneb_only { fn missing_components_from_block_request(mut self) -> Self { self.rig.single_block_component_processed( self.block_req_id.expect("block request id").lookup_id, - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - self.slot, - self.block_root, - )), + LookupSyncProcessingResult::ImportedMissingComponents, ); // Add block to da_checker so blobs request can continue self.rig.insert_block_to_da_checker(self.block.clone());