diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index d0ed8258e55..ff9785b9a6a 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -418,7 +418,7 @@ pub enum ExecutionPayloadError { } impl ExecutionPayloadError { - pub fn penalize_peer(&self) -> bool { + pub fn penalize_gossip_peer(&self) -> bool { // This match statement should never have a default case so that we are // always forced to consider here whether or not to penalize a peer when // we add a new error condition. @@ -447,6 +447,39 @@ impl ExecutionPayloadError { ExecutionPayloadError::UnverifiedNonOptimisticCandidate => false, } } + + pub fn penalize_sync_peer(&self) -> bool { + // This match statement should never have a default case so that we are + // always forced to consider here whether or not to penalize a peer when + // we add a new error condition. + match self { + // The peer has nothing to do with this error, do not penalize them. + ExecutionPayloadError::NoExecutionConnection => false, + // The peer has nothing to do with this error, do not penalize them. + ExecutionPayloadError::RequestFailed(_) => false, + // For the sync case, we do not want a peer to keep sending us blocks that our + // execution engine considers invalid. + // + // Also, we ask peers for blocks over sync/rpc only when they indicate + // that they have fully validated a given block (using their status message). + // + // Hence, we should penalize for this error in the sync case. + ExecutionPayloadError::RejectedByExecutionEngine { .. } => true, + // There is no reason for an honest peer to send us a block with an invalid + // payload timestamp. + ExecutionPayloadError::InvalidPayloadTimestamp { .. } => true, + // We do not want to receive these blocks over rpc even though the gossip + // case is still allowed. + ExecutionPayloadError::InvalidTerminalPoWBlock { .. } => true, + // We should penalize RPC blocks, since even an optimistic node shouldn't + // verify this block. + ExecutionPayloadError::InvalidActivationEpoch { .. } => true, + // As per `Self::InvalidActivationEpoch`. + ExecutionPayloadError::InvalidTerminalBlockHash { .. } => true, + // Do not penalize the peer since it's not their fault that *we're* optimistic. + ExecutionPayloadError::UnverifiedNonOptimisticCandidate => false, + } + } } impl From for ExecutionPayloadError { diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 0ccad8d0421..45293cdbaab 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -326,6 +326,36 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Returns an iterator of all good gossipsub peers that are supposed to be custodying + /// the given subnet id and that are synced to at least the given epoch according to their status messages. 
+ pub fn good_custody_subnet_peer_range_sync( + &self, + subnet: DataColumnSubnetId, + epoch: Epoch, + ) -> impl Iterator { + self.peers + .iter() + .filter(move |(_, info)| { + // The custody_subnets hashset can be populated via enr or metadata + let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet); + + info.is_connected() + && is_custody_subnet_peer + && match info.sync_status() { + SyncStatus::Synced { info } => { + info.has_slot(epoch.end_slot(E::slots_per_epoch())) + } + SyncStatus::Advanced { info } => { + info.has_slot(epoch.end_slot(E::slots_per_epoch())) + } + SyncStatus::IrrelevantPeer + | SyncStatus::Behind { .. } + | SyncStatus::Unknown => false, + } + }) + .map(|(peer_id, _)| peer_id) + } + /// Checks if there is at least one good peer for each specified custody subnet for the given epoch. /// A "good" peer is one that is both connected and synced (or advanced) for the specified epoch. pub fn has_good_custody_range_sync_peer( diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 9319973e597..4b930a091f4 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -535,6 +535,18 @@ impl DataColumnsByRootRequest { Ok(Self { data_column_ids }) } + pub fn from_single_block(block_root: Hash256, indices: Vec) -> Result { + let columns = VariableList::new(indices) + .map_err(|_| "Number of indices exceeds total number of columns")?; + DataColumnsByRootRequest::new( + vec![DataColumnsByRootIdentifier { + block_root, + columns, + }], + 1, + ) + } + pub fn max_requested(&self) -> usize { self.data_column_ids.iter().map(|id| id.columns.len()).sum() } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 0f5fd99c279..efcbcaf9561 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -9,6 +9,12 @@ use types::{ pub type Id = u32; +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum RangeRequestType { + ForwardSync, + BackfillSync, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct SingleLookupReqId { pub lookup_id: Id, @@ -38,6 +44,7 @@ pub enum SyncRequestId { pub struct DataColumnsByRootRequestId { pub id: Id, pub requester: DataColumnsByRootRequester, + pub peer: PeerId, } #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -46,6 +53,20 @@ pub struct BlocksByRangeRequestId { pub id: Id, /// The Id of the overall By Range request for block components. pub parent_request_id: ComponentsByRangeRequestId, + /// The peer that we made this request to + pub peer_id: PeerId, +} + +impl BlocksByRangeRequestId { + pub fn batch_id(&self) -> Epoch { + match self.parent_request_id.requester { + RangeRequestId::BackfillSync { batch_id } => batch_id, + RangeRequestId::RangeSync { + chain_id: _, + batch_id, + } => batch_id, + } + } } #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -86,12 +107,31 @@ pub enum RangeRequestId { RangeSync { chain_id: Id, batch_id: Epoch }, BackfillSync { batch_id: Epoch }, } +impl RangeRequestId { + pub fn batch_id(&self) -> Epoch { + match &self { + RangeRequestId::BackfillSync { batch_id } => *batch_id, + RangeRequestId::RangeSync { + chain_id: _, + batch_id, + } => *batch_id, + } + } + + pub fn batch_type(&self) -> RangeRequestType { + match &self { + RangeRequestId::BackfillSync { .. 
} => RangeRequestType::BackfillSync, + RangeRequestId::RangeSync { .. } => RangeRequestType::ForwardSync, + } + } +} // TODO(das) refactor in a separate PR. We might be able to remove this and replace // [`DataColumnsByRootRequestId`] with a [`SingleLookupReqId`]. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Custody(CustodyId), + RangeSync { parent: ComponentsByRangeRequestId }, } #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -222,6 +262,7 @@ impl Display for DataColumnsByRootRequester { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { Self::Custody(id) => write!(f, "Custody/{id}"), + Self::RangeSync { parent } => write!(f, "Range/{parent}"), } } } @@ -255,6 +296,7 @@ mod tests { lookup_id: 101, }), }), + peer: PeerId::random(), }; assert_eq!(format!("{id}"), "123/Custody/121/Lookup/101"); } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index a2b5af8b086..6878d1f0755 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -484,6 +484,12 @@ pub static SYNC_ACTIVE_NETWORK_REQUESTS: LazyLock> = LazyLoc &["type"], ) }); +pub static SYNC_PENDING_ROOT_RANGE_REQUESTS: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "sync_pending_root_range_requests", + "Current count of pending columns by root requests waiting for peers", + ) +}); pub static SYNC_UNKNOWN_NETWORK_REQUESTS: LazyLock> = LazyLock::new(|| { try_create_int_counter_vec( "sync_unknwon_network_request", diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 5fc94c29587..20ed7a884a2 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1330,7 +1330,9 @@ impl NetworkBeaconProcessor { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return None; } - Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { + Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) + if !epe.penalize_gossip_peer() => + { debug!(error = %e, "Could not verify block for gossip. Ignoring the block"); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return None; @@ -1562,7 +1564,7 @@ impl NetworkBeaconProcessor { "Block with unknown parent attempted to be processed" ); } - Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => { + Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_gossip_peer() => { debug!( error = %e, "Failed to verify execution payload" diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index f139724702f..c8bc1b0ef44 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -1,6 +1,7 @@ use crate::metrics::{self, register_process_result_metrics}; use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProcessor}; use crate::sync::BatchProcessResult; +use crate::sync::manager::FaultyComponent; use crate::sync::{ ChainId, manager::{BlockProcessType, SyncMessage}, @@ -46,6 +47,8 @@ struct ChainSegmentFailed { message: String, /// Used to penalize peers. 
peer_action: Option, + /// Used to identify the faulty component + faulty_component: Option, } impl NetworkBeaconProcessor { @@ -498,6 +501,7 @@ impl NetworkBeaconProcessor { Some(penalty) => BatchProcessResult::FaultyFailure { imported_blocks, penalty, + faulty_component: e.faulty_component, }, None => BatchProcessResult::NonFaultyFailure, } @@ -577,6 +581,7 @@ impl NetworkBeaconProcessor { Some(penalty) => BatchProcessResult::FaultyFailure { imported_blocks: 0, penalty, + faulty_component: e.faulty_component, }, None => BatchProcessResult::NonFaultyFailure, } @@ -649,15 +654,18 @@ impl NetworkBeaconProcessor { Err(ChainSegmentFailed { peer_action: None, message: "Failed to check block availability".into(), + faulty_component: None, }), ); } + e => { return ( 0, Err(ChainSegmentFailed { peer_action: Some(PeerAction::LowToleranceError), message: format!("Failed to check block availability : {:?}", e), + faulty_component: None, // Todo(pawan): replicate behaviour in forward sync once its proven }), ); } @@ -674,6 +682,7 @@ impl NetworkBeaconProcessor { (total_blocks - available_blocks.len()), total_blocks ), + faulty_component: Some(FaultyComponent::Blocks), }), ); } @@ -689,7 +698,7 @@ impl NetworkBeaconProcessor { metrics::inc_counter( &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_FAILED_TOTAL, ); - let peer_action = match &e { + let (peer_action, faulty_component) = match &e { HistoricalBlockError::MismatchedBlockRoot { block_root, expected_block_root, @@ -701,7 +710,10 @@ impl NetworkBeaconProcessor { "Backfill batch processing error" ); // The peer is faulty if they send blocks with bad roots. - Some(PeerAction::LowToleranceError) + ( + Some(PeerAction::LowToleranceError), + Some(FaultyComponent::Blocks), + ) } HistoricalBlockError::InvalidSignature | HistoricalBlockError::SignatureSet(_) => { @@ -710,7 +722,10 @@ impl NetworkBeaconProcessor { "Backfill batch processing error" ); // The peer is faulty if they bad signatures. - Some(PeerAction::LowToleranceError) + ( + Some(PeerAction::LowToleranceError), + Some(FaultyComponent::Blocks), + ) } HistoricalBlockError::ValidatorPubkeyCacheTimeout => { warn!( @@ -718,7 +733,7 @@ impl NetworkBeaconProcessor { "Backfill batch processing error" ); // This is an internal error, do not penalize the peer. - None + (None, None) } HistoricalBlockError::IndexOutOfBounds => { error!( @@ -726,12 +741,12 @@ impl NetworkBeaconProcessor { "Backfill batch OOB error" ); // This should never occur, don't penalize the peer. - None + (None, None) } HistoricalBlockError::StoreError(e) => { warn!(error = ?e, "Backfill batch processing error"); // This is an internal error, don't penalize the peer. - None + (None, None) } // // Do not use a fallback match, handle all errors explicitly }; @@ -742,6 +757,7 @@ impl NetworkBeaconProcessor { message: format!("{:?}", err_str), // This is an internal error, don't penalize the peer. peer_action, + faulty_component, }), ) } @@ -757,6 +773,7 @@ impl NetworkBeaconProcessor { message: format!("Block has an unknown parent: {}", parent_root), // Peers are faulty if they send non-sequential blocks. peer_action: Some(PeerAction::LowToleranceError), + faulty_component: Some(FaultyComponent::Blocks), }) } BlockError::DuplicateFullyImported(_) @@ -795,6 +812,7 @@ impl NetworkBeaconProcessor { ), // Peers are faulty if they send blocks from the future. peer_action: Some(PeerAction::LowToleranceError), + faulty_component: Some(FaultyComponent::Blocks), }) } BlockError::WouldRevertFinalizedSlot { .. 
} => { @@ -811,6 +829,7 @@ impl NetworkBeaconProcessor { block_parent_root ), peer_action: Some(PeerAction::Fatal), + faulty_component: Some(FaultyComponent::Blocks), }) } BlockError::GenesisBlock => { @@ -828,10 +847,11 @@ impl NetworkBeaconProcessor { message: format!("Internal error whilst processing block: {:?}", e), // Do not penalize peers for internal errors. peer_action: None, + faulty_component: None, }) } ref err @ BlockError::ExecutionPayloadError(ref epe) => { - if !epe.penalize_peer() { + if !epe.penalize_sync_peer() { // These errors indicate an issue with the EL and not the `ChainSegment`. // Pause the syncing while the EL recovers debug!( @@ -843,6 +863,7 @@ impl NetworkBeaconProcessor { message: format!("Execution layer offline. Reason: {:?}", err), // Do not penalize peers for internal errors. peer_action: None, + faulty_component: None, }) } else { debug!( @@ -855,6 +876,7 @@ impl NetworkBeaconProcessor { err ), peer_action: Some(PeerAction::LowToleranceError), + faulty_component: Some(FaultyComponent::Blocks), }) } } @@ -870,6 +892,7 @@ impl NetworkBeaconProcessor { // of a faulty EL it will usually require manual intervention to fix anyway, so // it's not too bad if we drop most of our peers. peer_action: Some(PeerAction::LowToleranceError), + faulty_component: Some(FaultyComponent::Blocks), }) } // Penalise peers for sending us banned blocks. @@ -878,8 +901,49 @@ impl NetworkBeaconProcessor { Err(ChainSegmentFailed { message: format!("Banned block: {block_root:?}"), peer_action: Some(PeerAction::Fatal), + faulty_component: Some(FaultyComponent::Blocks), }) } + ref err @ BlockError::AvailabilityCheck(ref e) => { + match &e { + AvailabilityCheckError::InvalidBlobs(_) + | AvailabilityCheckError::BlobIndexInvalid(_) => Err(ChainSegmentFailed { + message: format!("Peer sent invalid blobs. Reason: {:?}", err), + // The peer is faulty if they send us invalid blobs. + peer_action: Some(PeerAction::LowToleranceError), + faulty_component: Some(FaultyComponent::Blobs), + }), + AvailabilityCheckError::InvalidColumn((column_opt, _)) => { + let (peer_action, faulty_component) = if let Some(column) = column_opt { + ( + Some(PeerAction::LowToleranceError), + Some(FaultyComponent::Columns(vec![*column])), + ) + } else { + (None, None) + }; + Err(ChainSegmentFailed { + message: format!("Peer sent invalid columns. Reason: {:?}", err), + peer_action, + faulty_component, + }) + } + AvailabilityCheckError::DataColumnIndexInvalid(column) => { + Err(ChainSegmentFailed { + message: format!("Peer sent invalid columns. Reason: {:?}", err), + // The peer is faulty if they send us columns with invalid indices. + peer_action: Some(PeerAction::LowToleranceError), + faulty_component: Some(FaultyComponent::Columns(vec![*column])), + }) + } + _ => Err(ChainSegmentFailed { + message: format!("Peer sent invalid block. Reason: {:?}", err), + // Do not penalize peers for internal errors. + peer_action: None, + faulty_component: None, + }), + } + } other => { debug!( msg = "peer sent invalid block", @@ -891,6 +955,7 @@ impl NetworkBeaconProcessor { message: format!("Peer sent invalid block. Reason: {:?}", other), // Do not penalize peers for internal errors. 
peer_action: None, + faulty_component: None, }) } } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index d5a4e9b73a8..f92c666832b 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -10,16 +10,17 @@ use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_sidecar_coupling::CouplingError; -use crate::sync::manager::BatchProcessResult; +use crate::sync::manager::{BatchProcessResult, FaultyComponent}; use crate::sync::network_context::{ RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext, }; use crate::sync::range_sync::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchPeers, BatchProcessingResult, + BatchState, }; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use lighthouse_network::service::api_types::Id; +use lighthouse_network::service::api_types::{Id, RangeRequestType}; use lighthouse_network::types::{BackFillState, NetworkGlobals}; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; @@ -380,9 +381,9 @@ impl BackFillSync { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: &PeerId, request_id: Id, blocks: Vec>, + batch_peers: BatchPeers, ) -> Result { // check if we have this batch let Some(batch) = self.batches.get_mut(&batch_id) else { @@ -401,7 +402,7 @@ impl BackFillSync { return Ok(ProcessResult::Successful); } - match batch.download_completed(blocks, *peer_id) { + match batch.download_completed(blocks, batch_peers) { Ok(received) => { let awaiting_batches = self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH; @@ -557,20 +558,11 @@ impl BackFillSync { } }; - let Some(peer) = batch.processing_peer() else { - self.fail_sync(BackFillError::BatchInvalidState( - batch_id, - String::from("Peer does not exist"), - ))?; - return Ok(ProcessResult::Successful); - }; - debug!( ?result, %batch, batch_epoch = %batch_id, - %peer, - client = %network.client_type(peer), + // client = %network.client_type(peer), "Backfill batch processed" ); @@ -613,7 +605,32 @@ impl BackFillSync { BatchProcessResult::FaultyFailure { imported_blocks, penalty, + faulty_component, } => { + if let Some(batch_peers) = batch.processing_peers() { + // Penalize the peer appropriately. + match faulty_component { + Some(FaultyComponent::Blocks) | Some(FaultyComponent::Blobs) => { + network.report_peer( + batch_peers.block_and_blob, + *penalty, + "faulty_batch", + ); + } + Some(FaultyComponent::Columns(faulty_columns)) => { + for (peer, columns) in batch_peers.data_columns.iter() { + for faulty_column in faulty_columns { + if columns.contains(faulty_column) { + network.report_peer(*peer, *penalty, "faulty_batch"); + } + } + } + } + None => {} + } + } else { + warn!(?batch_id, "Responsible peers not found for a failed batch"); + } match batch.processing_completed(BatchProcessingResult::FaultyFailure) { Err(e) => { // Batch was in the wrong state @@ -690,7 +707,7 @@ impl BackFillSync { // Batches can be in `AwaitingDownload` state if there weren't good data column subnet // peers to send the request to. 
BatchState::AwaitingDownload => return Ok(ProcessResult::Successful), - BatchState::Failed | BatchState::Processing(_) => { + BatchState::Failed | BatchState::Processing(_, _) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed // - Processing -> `self.current_processing_batch` is None @@ -796,7 +813,7 @@ impl BackFillSync { crit!("batch indicates inconsistent chain state while advancing chain") } BatchState::AwaitingProcessing(..) => {} - BatchState::Processing(_) => { + BatchState::Processing(..) => { debug!(batch = %id, %batch, "Advancing chain while processing a batch"); if let Some(processing_id) = self.current_processing_batch && id >= processing_id @@ -1089,7 +1106,7 @@ impl BackFillSync { self.include_next_batch(network) } Entry::Vacant(entry) => { - let batch_type = network.batch_type(batch_id); + let batch_type = network.batch_type(batch_id, RangeRequestType::BackfillSync); entry.insert(BatchInfo::new( &batch_id, BACKFILL_EPOCHS_PER_BATCH, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index f8ffd298caf..dfc106383ed 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -617,7 +617,9 @@ impl BlockLookups { request_state.revert_to_awaiting_processing()?; Action::ParentUnknown { parent_root } } - ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { + ref e @ BlockError::ExecutionPayloadError(ref epe) + if !epe.penalize_sync_peer() => + { // These errors indicate that the execution layer is offline // and failed to validate the execution payload. Do not downscore peer. debug!( diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index ffc79c1550d..40bd9717a3c 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,3 +1,5 @@ +use crate::sync::network_context::MAX_COLUMN_RETRIES; +use crate::sync::range_sync::BatchPeers; use beacon_chain::{ block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, }; @@ -5,17 +7,19 @@ use lighthouse_network::{ PeerAction, PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + DataColumnsByRootRequestId, }, }; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use tracing::Span; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, }; -use crate::sync::network_context::MAX_COLUMN_RETRIES; - /// Accumulates and couples beacon blocks with their associated data (blobs or data columns) /// from range sync network responses. /// @@ -30,6 +34,10 @@ use crate::sync::network_context::MAX_COLUMN_RETRIES; pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, + /// We store the peer that we requested the blocks from for this particular `RangeBlockComponentsRequest`. + /// This is to ensure that we penalize the block peer if the blocks turn out to be invalid + /// during processing. + block_peer: PeerId, /// Sidecars we have received awaiting for their corresponding block. 
block_data_request: RangeBlockDataRequest, /// Span to track the range request and all children range requests. @@ -44,16 +52,34 @@ enum ByRangeRequest { enum RangeBlockDataRequest { NoData, Blobs(ByRangeRequest>>>), - DataColumns { + /// These are data columns fetched by a range request. + DataColumnsFromRange { requests: HashMap< DataColumnsByRangeRequestId, ByRangeRequest>, >, /// The column indices corresponding to the request - column_peers: HashMap>, + request_to_column_indices: HashMap>, expected_custody_columns: Vec, attempt: usize, }, + /// These are data columns fetched by root rather than by range as in the previous variant. + /// + /// Note: this variant starts out in an uninitialized state because we typically make + /// the column requests by root only **after** we have fetched the corresponding blocks. + /// We can initialize this variant only after the column requests have been made. + DataColumnsFromRoot { + requests: HashMap< + DataColumnsByRootRequestId, + ByRangeRequest>, + >, + /// Indicates whether we have made column requests for each of the `expected_custody_columns`. + all_requests_made: bool, + /// The column indices corresponding to the request + request_to_column_indices: HashMap>, + expected_custody_columns: HashSet, + attempt: usize, + }, } #[derive(Debug)] @@ -76,40 +102,77 @@ impl RangeBlockComponentsRequest { /// * `blocks_req_id` - Request ID for the blocks /// * `blobs_req_id` - Optional request ID for blobs (pre-Fulu fork) /// * `data_columns` - Optional tuple of (request_id->column_indices pairs, expected_custody_columns) for Fulu fork + /// * `data_columns_by_root` - Creates an uninitialized `RangeBlockDataRequest::DataColumnsFromRoot` variant if this is `Some`. + /// Note: this is only relevant if `data_columns_by_range` is `None`. #[allow(clippy::type_complexity)] pub fn new( blocks_req_id: BlocksByRangeRequestId, blobs_req_id: Option, - data_columns: Option<( + data_columns_by_range: Option<( Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + data_columns_by_root: Option>, request_span: Span, ) -> Self { + let block_peer = blocks_req_id.peer_id; let block_data_request = if let Some(blobs_req_id) = blobs_req_id { RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) - } else if let Some((requests, expected_custody_columns)) = data_columns { - let column_peers: HashMap<_, _> = requests.into_iter().collect(); - RangeBlockDataRequest::DataColumns { - requests: column_peers + } else if let Some((requests, expected_custody_columns)) = data_columns_by_range { + let request_to_column_indices: HashMap<_, _> = requests.into_iter().collect(); + RangeBlockDataRequest::DataColumnsFromRange { + requests: request_to_column_indices .keys() .map(|id| (*id, ByRangeRequest::Active(*id))) .collect(), - column_peers, + request_to_column_indices, expected_custody_columns, attempt: 0, } + } else if let Some(expected_custody_columns) = data_columns_by_root { + RangeBlockDataRequest::DataColumnsFromRoot { + requests: HashMap::new(), + all_requests_made: false, + attempt: 0, + request_to_column_indices: HashMap::new(), + expected_custody_columns, + } } else { RangeBlockDataRequest::NoData }; Self { blocks_request: ByRangeRequest::Active(blocks_req_id), + block_peer, block_data_request, request_span, } } + /// Returns the peers from which we requested the blocks, blobs and columns for this component. 
+ pub fn responsible_peers(&self) -> BatchPeers { + BatchPeers { + block_and_blob: self.block_peer, + data_columns: match &self.block_data_request { + RangeBlockDataRequest::NoData | RangeBlockDataRequest::Blobs(_) => HashMap::new(), + RangeBlockDataRequest::DataColumnsFromRange { + request_to_column_indices, + .. + } => request_to_column_indices + .iter() + .map(|(k, v)| (k.peer, v.clone())) + .collect(), + RangeBlockDataRequest::DataColumnsFromRoot { + request_to_column_indices, + .. + } => request_to_column_indices + .iter() + .map(|(k, v)| (k.peer, v.clone())) + .collect(), + }, + } + } + /// Modifies `self` by inserting a new `DataColumnsByRangeRequestId` for a formerly failed /// request for some columns. pub fn reinsert_failed_column_requests( &mut self, failed_column_requests: Vec<(DataColumnsByRangeRequestId, Vec)>, ) -> Result<(), String> { match &mut self.block_data_request { - RangeBlockDataRequest::DataColumns { + RangeBlockDataRequest::DataColumnsFromRange { requests, expected_custody_columns: _, - column_peers, + request_to_column_indices, attempt: _, } => { for (request, columns) in failed_column_requests.into_iter() { requests.insert(request, ByRangeRequest::Active(request)); - column_peers.insert(request, columns); + request_to_column_indices.insert(request, columns); } Ok(()) } @@ -133,6 +196,59 @@ impl RangeBlockComponentsRequest { } } + /// Initialize the entries for this component after the column requests have been sent. + pub fn initialize_data_columns_from_root_component( + &mut self, + column_requests: Vec<(DataColumnsByRootRequestId, Vec)>, + ) -> Result<(), String> { + // Nothing to insert, do not initialize + if column_requests.is_empty() { + return Ok(()); + } + match &mut self.block_data_request { + RangeBlockDataRequest::DataColumnsFromRoot { + requests, + attempt: _, + all_requests_made, + request_to_column_indices, + expected_custody_columns, + } => { + for (request, indices) in column_requests { + requests.insert(request, ByRangeRequest::Active(request)); + request_to_column_indices.insert(request, indices); + } + + if !*all_requests_made { + let mut all_columns_requested = HashSet::new(); + for columns in request_to_column_indices.values() { + all_columns_requested.extend(columns.iter()); + } + *all_requests_made = all_columns_requested == *expected_custody_columns; + } + + Ok(()) + } + _ => Err("Invalid state: expected DataColumnsFromRoot".to_string()), + } + } + + /// This modifies the internal variant to `NoData`. + /// + /// Once we make the block request for a batch and get responses, it is possible + /// that the entire batch contains no blobs, based on the blocks' `blob_kzg_commitments`. + /// + /// At this point, we do not need to make any requests and the blocks correspond to all the + /// available data for this batch. Hence, we indicate here that this component requires no data. + pub fn no_columns_for_batch(&mut self) -> Result<(), String> { + match self.block_data_request { + RangeBlockDataRequest::DataColumnsFromRoot { .. } => { + self.block_data_request = RangeBlockDataRequest::NoData; + Ok(()) + } + _ => Err("Invalid state: expected DataColumnsFromRoot".to_owned()), + } + } + /// Adds received blocks to the request. /// /// Returns an error if the request ID doesn't match the expected blocks request. 
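(Editor's aside, not part of the patch: the `all_requests_made` flag above only flips once the union of column indices across all issued by-root requests equals the expected custody set. A minimal standalone sketch of that coverage check, with simplified types — `ColumnIndex`/`RequestId` aliases and map shapes here are illustrative, not the exact ones used in the patch.)

```rust
use std::collections::{HashMap, HashSet};

type ColumnIndex = u64;
type RequestId = u32;

/// Mirrors the `all_requests_made` update: true once every expected custody
/// column is covered by at least one issued by-root request.
fn all_columns_covered(
    request_to_column_indices: &HashMap<RequestId, Vec<ColumnIndex>>,
    expected_custody_columns: &HashSet<ColumnIndex>,
) -> bool {
    let covered: HashSet<ColumnIndex> = request_to_column_indices
        .values()
        .flat_map(|columns| columns.iter().copied())
        .collect();
    covered == *expected_custody_columns
}

fn main() {
    // Two requests together cover columns {0, 1, 2}, which matches the expected set.
    let requests: HashMap<RequestId, Vec<ColumnIndex>> =
        HashMap::from([(1, vec![0, 1]), (2, vec![2])]);
    let expected: HashSet<ColumnIndex> = HashSet::from([0, 1, 2]);
    assert!(all_columns_covered(&requests, &expected));
}
```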
@@ -155,8 +271,11 @@ impl RangeBlockComponentsRequest { ) -> Result<(), String> { match &mut self.block_data_request { RangeBlockDataRequest::NoData => Err("received blobs but expected no data".to_owned()), + RangeBlockDataRequest::DataColumnsFromRoot { .. } => { + Err("received blobs but expected data columns by root".to_owned()) + } RangeBlockDataRequest::Blobs(req) => req.finish(req_id, blobs), - RangeBlockDataRequest::DataColumns { .. } => { + RangeBlockDataRequest::DataColumnsFromRange { .. } => { Err("received blobs but expected data columns".to_owned()) } } @@ -178,7 +297,38 @@ impl RangeBlockComponentsRequest { RangeBlockDataRequest::Blobs(_) => { Err("received data columns but expected blobs".to_owned()) } - RangeBlockDataRequest::DataColumns { requests, .. } => { + RangeBlockDataRequest::DataColumnsFromRoot { .. } => { + Err("received data columns by root but expected range".to_owned()) + } + RangeBlockDataRequest::DataColumnsFromRange { requests, .. } => { + let req = requests + .get_mut(&req_id) + .ok_or(format!("unknown data columns by range req_id {req_id}"))?; + req.finish(req_id, columns) + } + } + } + + /// Adds received custody columns to the request. + /// + /// Returns an error if this request expects blobs instead of data columns, + /// or if the request ID is unknown. + pub fn add_custody_columns_by_root( + &mut self, + req_id: DataColumnsByRootRequestId, + columns: Vec>>, + ) -> Result<(), String> { + match &mut self.block_data_request { + RangeBlockDataRequest::NoData => { + Err("received data columns but expected no data".to_owned()) + } + RangeBlockDataRequest::Blobs(_) => { + Err("received data columns but expected blobs".to_owned()) + } + RangeBlockDataRequest::DataColumnsFromRange { .. } => { + Err("received data columns by range but expected root".to_owned()) + } + RangeBlockDataRequest::DataColumnsFromRoot { requests, .. } => { let req = requests .get_mut(&req_id) .ok_or(format!("unknown data columns by range req_id {req_id}"))?; @@ -215,10 +365,11 @@ impl RangeBlockComponentsRequest { spec, )) } - RangeBlockDataRequest::DataColumns { + + RangeBlockDataRequest::DataColumnsFromRange { requests, expected_custody_columns, - column_peers, + request_to_column_indices, attempt, } => { let mut data_columns = vec![]; @@ -236,7 +387,7 @@ impl RangeBlockComponentsRequest { // Note: this assumes that only 1 peer is responsible for a column // with a batch. - for (id, columns) in column_peers { + for (id, columns) in request_to_column_indices.iter() { for column in columns { column_to_peer_id.insert(*column, id.peer); } @@ -262,6 +413,72 @@ impl RangeBlockComponentsRequest { // delete it from the entries as we are going to make // a separate attempt for those components. requests.retain(|&k, _| k.peer != *peer); + request_to_column_indices.retain(|&k, _| k.peer != *peer); + } + } + + Some(resp) + } + // Reuse same logic that we use for coupling data columns for now. + // todo(pawan): we should never get a coupling error here, so simplify this + // variant's handling. 
+ RangeBlockDataRequest::DataColumnsFromRoot { + all_requests_made, + attempt, + request_to_column_indices, + expected_custody_columns, + requests, + } => { + // Do not couple until requests covering all required columns + // have been made + if !*all_requests_made { + return None; + } + + let mut data_columns = vec![]; + let mut column_to_peer_id: HashMap = HashMap::new(); + for req in requests.values() { + let Some(data) = req.to_finished() else { + return None; + }; + data_columns.extend(data.clone()) + } + + // An "attempt" is complete here after we have received a response for all the + // requests we made. i.e. `req.to_finished()` returns Some for all requests. + *attempt += 1; + + // Note: this assumes that only 1 peer is responsible for a column + // with a batch. + for (id, columns) in request_to_column_indices.iter() { + for column in columns { + column_to_peer_id.insert(*column, id.peer); + } + } + + let expected_custody_columns: Vec<_> = + expected_custody_columns.iter().copied().collect(); + let resp = Self::responses_with_custody_columns( + blocks.to_vec(), + data_columns, + column_to_peer_id, + &expected_custody_columns, + *attempt, + ); + + if let Err(CouplingError::DataColumnPeerFailure { + error: _, + faulty_peers, + action: _, + exceeded_retries: _, + }) = &resp + { + for (_, peer) in faulty_peers.iter() { + // find the req id associated with the peer and + // delete it from the entries as we are going to make + // a separate attempt for those components. + requests.retain(|&k, _| k.peer != *peer); + request_to_column_indices.retain(|&k, _| k.peer != *peer); } } + + Some(resp) + } @@ -377,7 +594,8 @@ impl RangeBlockComponentsRequest { return Err(CouplingError::DataColumnPeerFailure { error: format!("No columns for block {block_root:?} with data"), faulty_peers: responsible_peers, - action: PeerAction::LowToleranceError, + // The block peer might be malicious so don't downscore the column peer too harshly + action: PeerAction::HighToleranceError, exceeded_retries, }); @@ -402,7 +620,8 @@ impl RangeBlockComponentsRequest { return Err(CouplingError::DataColumnPeerFailure { error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), faulty_peers: naughty_peers, - action: PeerAction::LowToleranceError, + // The block peer might be malicious so don't downscore the column peer too harshly + action: PeerAction::HighToleranceError, exceeded_retries }); } @@ -463,7 +682,6 @@ impl ByRangeRequest { #[cfg(test)] mod tests { use super::RangeBlockComponentsRequest; - use crate::sync::network_context::MAX_COLUMN_RETRIES; use beacon_chain::test_utils::{ NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, }; @@ -493,6 +711,7 @@ mod tests { BlocksByRangeRequestId { id: 1, parent_request_id, + peer_id: PeerId::random(), } } @@ -533,7 +752,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -567,6 +786,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -608,6 +828,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -668,6 +889,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), 
expects_custody_columns.clone())), + None, Span::none(), ); @@ -748,6 +970,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_custody_columns.clone())), + None, Span::none(), ); @@ -828,6 +1051,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_custody_columns.clone())), + None, Span::none(), ); @@ -881,92 +1105,4 @@ mod tests { let rpc_blocks = result.unwrap(); assert_eq!(rpc_blocks.len(), 2); } - - #[test] - fn max_retries_exceeded_behavior() { - // GIVEN: A request where peers consistently fail to provide required columns - let spec = test_spec::(); - let expected_custody_columns = vec![1, 2]; - let mut rng = XorShiftRng::from_seed([42; 16]); - let blocks = (0..1) - .map(|_| { - generate_rand_block_and_data_columns::( - ForkName::Fulu, - NumBlobs::Number(1), - &mut rng, - &spec, - ) - }) - .collect::>(); - - let components_id = components_id(); - let blocks_req_id = blocks_id(components_id); - let columns_req_id = expected_custody_columns - .iter() - .enumerate() - .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) - .collect::>(); - let mut info = RangeBlockComponentsRequest::::new( - blocks_req_id, - None, - Some((columns_req_id.clone(), expected_custody_columns.clone())), - Span::none(), - ); - - // AND: All blocks are received - info.add_blocks( - blocks_req_id, - blocks.iter().map(|b| b.0.clone().into()).collect(), - ) - .unwrap(); - - // AND: Only partial custody columns are provided (column 1 but not 2) - let (req1, _) = columns_req_id.first().unwrap(); - info.add_custody_columns( - *req1, - blocks - .iter() - .flat_map(|b| b.1.iter().filter(|d| d.index == 1).cloned()) - .collect(), - ) - .unwrap(); - - // AND: Column 2 request completes with empty data (persistent peer failure) - let (req2, _) = columns_req_id.get(1).unwrap(); - info.add_custody_columns(*req2, vec![]).unwrap(); - - // WHEN: Multiple retry attempts are made (up to max retries) - for _ in 0..MAX_COLUMN_RETRIES { - let result = info.responses(&spec).unwrap(); - assert!(result.is_err()); - - if let Err(super::CouplingError::DataColumnPeerFailure { - exceeded_retries, .. 
- }) = &result - && *exceeded_retries - { - break; - } - } - - // AND: One final attempt after exceeding max retries - let result = info.responses(&spec).unwrap(); - - // THEN: Should fail with exceeded_retries = true - assert!(result.is_err()); - if let Err(super::CouplingError::DataColumnPeerFailure { - error: _, - faulty_peers, - action, - exceeded_retries, - }) = result - { - assert_eq!(faulty_peers.len(), 1); // column 2 missing - assert_eq!(faulty_peers[0].0, 2); // column index 2 - assert!(matches!(action, PeerAction::LowToleranceError)); - assert!(exceeded_retries); // Should be true after max retries - } else { - panic!("Expected PeerFailure error with exceeded_retries=true"); - } - } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index d7ba0280542..2bf4f831e82 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -70,7 +70,8 @@ use std::time::Duration; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, ColumnIndex, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -205,10 +206,19 @@ pub enum BatchProcessResult { FaultyFailure { imported_blocks: usize, penalty: PeerAction, + faulty_component: Option, }, NonFaultyFailure, } +/// Identifies the specific component that was faulty if the batch was a faulty failure. +#[derive(Debug)] +pub enum FaultyComponent { + Blocks, + Blobs, + Columns(Vec), +} + /// The primary object for handling and driving all the current syncing logic. It maintains the /// current state of the syncing process, the number of useful peers, downloaded blocks and /// controls the logic behind both the long-range (batch) sync and the on-going potential parent @@ -403,6 +413,9 @@ impl SyncManager { for (id, result) in self.network.continue_custody_by_root_requests() { self.on_custody_by_root_result(id, result); } + + // Try to make range requests that we failed to make because of lack of peers. + let _ = self.network.retry_pending_root_range_requests(); } /// Trigger range sync for a set of peers that claim to have imported a head unknown to us. 
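(Editor's aside, not part of the patch: the new `FaultyComponent` carried in `BatchProcessResult::FaultyFailure` lets batch-failure handling penalize only the peer that served the bad component, as the backfill changes above do. A simplified, self-contained sketch of that dispatch — the string peer ids, the `BatchPeers` shape, and the `report` callback are stand-ins for the real types.)

```rust
use std::collections::HashMap;

type ColumnIndex = u64;
type PeerId = &'static str;

enum FaultyComponent {
    Blocks,
    Blobs,
    Columns(Vec<ColumnIndex>),
}

struct BatchPeers {
    /// Peer that served the blocks (and blobs, pre-Fulu).
    block_and_blob: PeerId,
    /// Column peers and the indices each one served.
    data_columns: HashMap<PeerId, Vec<ColumnIndex>>,
}

/// Report only the peers responsible for the faulty component; if the faulty
/// component is unknown, report no one.
fn report_faulty_peers(peers: &BatchPeers, faulty: &Option<FaultyComponent>, mut report: impl FnMut(PeerId)) {
    match faulty {
        Some(FaultyComponent::Blocks) | Some(FaultyComponent::Blobs) => {
            report(peers.block_and_blob)
        }
        Some(FaultyComponent::Columns(faulty_columns)) => {
            for (peer, columns) in &peers.data_columns {
                if faulty_columns.iter().any(|c| columns.contains(c)) {
                    report(*peer);
                }
            }
        }
        None => {}
    }
}

fn main() {
    let peers = BatchPeers {
        block_and_blob: "block_peer",
        data_columns: HashMap::from([("col_peer_a", vec![1, 2]), ("col_peer_b", vec![3])]),
    };
    let mut reported = Vec::new();
    // Only the peer that served the faulty column (index 3) gets reported.
    report_faulty_peers(&peers, &Some(FaultyComponent::Columns(vec![3])), |p| reported.push(p));
    assert_eq!(reported, vec!["col_peer_b"]);
}
```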
@@ -1129,6 +1142,13 @@ impl SyncManager { self.on_custody_by_root_result(custody_id.requester, result); } } + DataColumnsByRootRequester::RangeSync { parent } => { + self.on_range_components_response( + parent, + peer_id, + RangeBlockComponent::CustodyColumnsFromRoot(req_id, resp), + ); + } } } } @@ -1202,7 +1222,7 @@ impl SyncManager { peer_id: PeerId, range_block_component: RangeBlockComponent, ) { - if let Some(resp) = self + if let Some((resp, responsible_peers)) = self .network .range_block_component_response(range_request_id, range_block_component) { @@ -1212,7 +1232,7 @@ impl SyncManager { RangeRequestId::RangeSync { chain_id, batch_id } => { self.range_sync.blocks_by_range_response( &mut self.network, - peer_id, + responsible_peers, chain_id, batch_id, range_request_id.id, @@ -1224,9 +1244,9 @@ impl SyncManager { match self.backfill_sync.on_block_response( &mut self.network, batch_id, - &peer_id, range_request_id.id, blocks, + responsible_peers, ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -1243,7 +1263,7 @@ impl SyncManager { RangeRequestId::RangeSync { chain_id, batch_id } => { self.range_sync.inject_error( &mut self.network, - peer_id, + responsible_peers, batch_id, chain_id, range_request_id.id, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index ac2991c1474..7870a0b4904 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,7 @@ //! channel and stores a global RPC ID to perform requests. use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; -pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; +pub use self::requests::BlocksByRootSingleRequest; use super::SyncMessage; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; @@ -16,6 +16,7 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; +use crate::sync::range_sync::BatchPeers; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; @@ -26,15 +27,17 @@ pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, RangeRequestType, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use lighthouse_tracing::{SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, SPAN_OUTGOING_RANGE_REQUEST}; use parking_lot::RwLock; +use rand::seq::IteratorRandom; pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, - BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootBatchBlockRequest, + DataColumnsByRootRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -49,8 +52,8 @@ use 
tokio::sync::mpsc; use tracing::{Span, debug, debug_span, error, warn}; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, + DataColumnSubnetId, Epoch, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, }; pub mod custody; @@ -99,6 +102,7 @@ pub enum RpcResponseError { VerifyError(LookupVerifyError), CustodyRequestError(#[allow(dead_code)] CustodyRequestError), BlockComponentCouplingError(CouplingError), + InternalError(#[allow(dead_code)] String), } #[derive(Debug, PartialEq, Eq)] @@ -211,7 +215,6 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, - /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -219,6 +222,14 @@ pub struct SyncNetworkContext { components_by_range_requests: FnvHashMap>, + // A hashmap with the key being the parent request and the value being the data column by root + // requests that we have to retry because of one of the following reasons: + // 1. The root requests couldn't be made after the parent blocks request because there were no + // column peers available + // 2. The root request errored (either peer sent an RPC error or an empty response) + pending_column_by_root_range_requests: + HashMap, + /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. execution_engine_state: EngineState, @@ -245,6 +256,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + CustodyColumnsFromRoot( + DataColumnsByRootRequestId, + RpcResponseResult>>>, + ), } #[cfg(test)] @@ -295,6 +310,7 @@ impl SyncNetworkContext { data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), + pending_column_by_root_range_requests: Default::default(), network_beacon_processor, chain, fork_context, @@ -324,6 +340,7 @@ impl SyncNetworkContext { custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests components_by_range_requests: _, + pending_column_by_root_range_requests: _, execution_engine_state: _, network_beacon_processor: _, chain: _, @@ -423,6 +440,7 @@ impl SyncNetworkContext { components_by_range_requests: _, execution_engine_state: _, network_beacon_processor: _, + pending_column_by_root_range_requests: _, chain: _, fork_context: _, // Don't use a fallback match. We want to be sure that all requests are considered when @@ -482,7 +500,7 @@ impl SyncNetworkContext { // Attempt to find all required custody peers to request the failed columns from let columns_by_range_peers_to_request = self - .select_columns_by_range_peers_to_request( + .select_column_peers_to_request( failed_columns, peers, active_request_count_by_peer, @@ -527,6 +545,96 @@ impl SyncNetworkContext { Ok(()) } + /// Try to make all the requests that we failed to make earlier because of lack of peers + /// in the required columns. + /// + /// This function must be manually invoked at regular intervals or when a new peer + /// gets added. 
+ pub fn retry_pending_root_range_requests(&mut self) -> Result<(), String> { + let active_requests = self.active_request_count_by_peer(); + + // Collect entries to process and remove from requests_to_retry + let entries_to_process: Vec<_> = + self.pending_column_by_root_range_requests.drain().collect(); + let mut entries_to_keep = Vec::new(); + + for (parent_request, requests) in entries_to_process { + let mut data_column_requests = Vec::new(); + let requester = DataColumnsByRootRequester::RangeSync { + parent: parent_request, + }; + let custody_indices = requests.indices.iter().cloned().collect(); + let synced_peers = self + .network_globals() + .peers + .read() + .synced_peers_for_epoch(parent_request.requester.batch_id(), None) + .cloned() + .collect(); + + match self.select_column_peers_to_request( + &custody_indices, + &synced_peers, + active_requests.clone(), + &HashSet::new(), + ) { + Ok(peer_to_columns) => { + for (peer, indices) in peer_to_columns.into_iter() { + let data_columns_by_root_request = DataColumnsByRootBatchBlockRequest { + block_roots: requests.block_roots.clone(), + indices: indices.clone(), + }; + + data_column_requests.push(( + self.send_data_columns_by_root_request( + requester, + peer, + data_columns_by_root_request, + true, + ) + .map_err(|e| { + format!("Failed to send data columns by root request {:?}", e) + })?, + indices, + )); + } + // we have sent out requests to peers, register these requests with the coupling service. + if let Some(req) = self.components_by_range_requests.get_mut(&parent_request) { + req.initialize_data_columns_from_root_component( + data_column_requests, + ) + .map_err(|e| { + format!( + "Inconsistent state when inserting columns by root request {:?}", + e + ) + })?; + } + debug!(?requests, "Successfully retried requests"); + } + Err(err) => { + debug!( + ?err, + ?parent_request, + "Failed to retry request, no peers in subnets", + ); + // Still no peers, keep this entry for next retry + entries_to_keep.push((parent_request, requests)); + } + } + } + + metrics::set_gauge( + &metrics::SYNC_PENDING_ROOT_RANGE_REQUESTS, + self.pending_column_by_root_range_requests.len() as i64, + ); + // Re-insert entries that still need to be retried + self.pending_column_by_root_range_requests + .extend(entries_to_keep); + + Ok(()) + } + /// A blocks by range request sent by the range sync algorithm pub fn block_components_by_range_request( &mut self, @@ -577,7 +685,7 @@ impl SyncNetworkContext { .iter() .cloned() .collect(); - Some(self.select_columns_by_range_peers_to_request( + Some(self.select_column_peers_to_request( &column_indexes, peers, active_request_count_by_peer, @@ -650,6 +758,17 @@ impl SyncNetworkContext { .transpose()?; let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + + let data_column_by_root_range_request = + // with this variant, we request columns by root after we receive + // a successful blocks by range response. 
+ if matches!(batch_type, ByRangeRequestType::BlocksAndColumnsSeparate) { + Some(HashSet::from_iter( + self.chain.sampling_columns_for_epoch(epoch).iter().copied(), + )) + } else { + None + }; let info = RangeBlockComponentsRequest::new( blocks_req_id, blobs_req_id, @@ -659,6 +778,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + data_column_by_root_range_request, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -666,7 +786,7 @@ impl SyncNetworkContext { Ok(id.id) } - fn select_columns_by_range_peers_to_request( + fn select_column_peers_to_request( &self, custody_indexes: &HashSet, peers: &HashSet, @@ -723,11 +843,15 @@ impl SyncNetworkContext { /// Received a blocks by range or blobs by range response for a request that couples blocks ' /// and blobs. + #[allow(clippy::type_complexity)] pub fn range_block_component_response( &mut self, id: ComponentsByRangeRequestId, range_block_component: RangeBlockComponent, - ) -> Option>, RpcResponseError>> { + ) -> Option<( + Result>, RpcResponseError>, + BatchPeers, + )> { let Entry::Occupied(mut entry) = self.components_by_range_requests.entry(id) else { metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]); return None; @@ -761,14 +885,27 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::CustodyColumnsFromRoot(req_id, resp) => { + resp.and_then(|(custody_columns, _)| { + request + .add_custody_columns_by_root(req_id, custody_columns) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { + let responsible_peers = entry.get().responsible_peers(); entry.remove(); - return Some(Err(e)); + return Some((Err(e), responsible_peers)); } let range_req = entry.get_mut(); if let Some(blocks_result) = range_req.responses(&self.chain.spec) { + let responsible_peers = range_req.responsible_peers(); if let Err(CouplingError::DataColumnPeerFailure { action: _, error, @@ -791,7 +928,10 @@ impl SyncNetworkContext { entry.remove(); } // If the request is finished, dequeue everything - Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError)) + Some(( + blocks_result.map_err(RpcResponseError::BlockComponentCouplingError), + responsible_peers, + )) } else { None } @@ -1019,32 +1159,34 @@ impl SyncNetworkContext { } /// Request to send a single `data_columns_by_root` request to the network. 
- pub fn data_column_lookup_request( + pub fn send_data_columns_by_root_request( &mut self, requester: DataColumnsByRootRequester, peer_id: PeerId, - request: DataColumnsByRootSingleBlockRequest, + request: DataColumnsByRootBatchBlockRequest, expect_max_responses: bool, - ) -> Result, &'static str> { + ) -> Result { let id = DataColumnsByRootRequestId { id: self.next_id(), requester, + peer: peer_id, }; self.send_network_msg(NetworkMessage::SendRequest { peer_id, request: RequestType::DataColumnsByRoot( - request.clone().try_into_request::( - self.fork_context.current_fork_name(), - &self.chain.spec, - )?, + request + .clone() + .try_into_request(self.fork_context.current_fork_name(), &self.chain.spec) + .map_err(|_| "invalid count of data column indices")?, ), app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)), - })?; + }) + .map_err(|_| "network send error")?; debug!( method = "DataColumnsByRoot", - block_root = ?request.block_root, + block_roots = ?request.block_roots, indices = ?request.indices, peer = %peer_id, %id, @@ -1056,12 +1198,10 @@ impl SyncNetworkContext { peer_id, expect_max_responses, DataColumnsByRootRequestItems::new(request), - // Span is tracked in `self.custody_columns_by_root_requests` in the - // `ActiveCustodyRequest` struct. Span::none(), ); - Ok(LookupRequestResult::RequestSent(id)) + Ok(id) } /// Request to fetch all needed custody columns of a specific block. This function may not send @@ -1159,6 +1299,7 @@ impl SyncNetworkContext { let id = BlocksByRangeRequestId { id: self.next_id(), parent_request_id, + peer_id, }; self.network_send .send(NetworkMessage::SendRequest { @@ -1347,7 +1488,7 @@ impl SyncNetworkContext { /// Check whether a batch for this epoch (and only this epoch) should request just blocks or /// blocks and blobs. - pub fn batch_type(&self, epoch: types::Epoch) -> ByRangeRequestType { + pub fn batch_type(&self, epoch: Epoch, request_type: RangeRequestType) -> ByRangeRequestType { // Induces a compile time panic if this doesn't hold true. #[allow(clippy::assertions_on_constants)] const _: () = assert!( @@ -1361,7 +1502,14 @@ impl SyncNetworkContext { .data_availability_checker .data_columns_required_for_epoch(epoch) { - ByRangeRequestType::BlocksAndColumns + match request_type { + // Currently, we download blocks and columns separately when we forward sync as + // requesting columns by root is less ambiguous when there are multiple heads. + // For backfill, since there is just one chain, it makes more sense to download + // blocks and columns together. + RangeRequestType::BackfillSync => ByRangeRequestType::BlocksAndColumns, + RangeRequestType::ForwardSync => ByRangeRequestType::BlocksAndColumnsSeparate, + } } else if self .chain .data_availability_checker @@ -1465,6 +1613,160 @@ impl SyncNetworkContext { self.on_rpc_response_result(id, "DataColumnsByRoot", resp, peer_id, |_| 1) } + /// Requests data columns for the given blocks by root. + /// + /// We request by root because it is much easier to reason about + /// and handle for failure cases when we ask for the same roots that + /// we are trying to sync the blocks for. + /// + /// This is specially relevant in periods of non-finality when there are multiple + /// head chains to sync. + /// + /// This function piggybacks on the existing parent block request and inserts the + /// column requests made into `self.components_by_range_requests` such that when + /// the column requests complete, we return the coupled batch to range sync to progress. 
+ /// + /// If there are no peers to serve the column requests, we add them to a queue for retrying + /// the requests once more peers become available. + /// + /// Note: we do not use the by root syncing mechanism for backfill since there is only + /// one canonical chain to sync. + fn request_columns_on_successful_blocks( + &mut self, + id: BlocksByRangeRequestId, + blocks: &[Arc>], + ) -> Result<(), RpcResponseError> { + let batch_epoch = id.batch_id(); + // Return early if no columns are required for this epoch + if !matches!( + self.batch_type(batch_epoch, id.parent_request_id.requester.batch_type()), + ByRangeRequestType::BlocksAndColumnsSeparate + ) { + return Ok(()); + } + debug!(count = blocks.len(), "Received blocks from byrange query"); + + let mut block_roots = Vec::new(); + + // We have blocks here, check if they need data columns and request them + for block in blocks.iter() { + // Request columns only if the blob_kzg_commitments is non-empty + if let Ok(commitments) = block.message().body().blob_kzg_commitments() + && !commitments.is_empty() + { + block_roots.push(block.canonical_root()); + } + } + + // No blobs for the entire epoch, let the coupling logic know not to expect anything + // and return early + if block_roots.is_empty() { + if let Some(req) = self + .components_by_range_requests + .get_mut(&id.parent_request_id) + { + if let Err(e) = req.no_columns_for_batch() { + debug!(?e, "Created range request in inconsistent state"); + return Err(RpcResponseError::InternalError(e)); + } + return Ok(()); + } else { + return Err(RpcResponseError::InternalError( + "Block request sent without creating a components_by_range entry".to_string(), + )); + } + } + // Generate the data column by root requests + let mut peer_to_columns: HashMap> = HashMap::new(); + let mut no_peers_for_column: Vec = Vec::new(); + for column in self.chain.sampling_columns_for_epoch(batch_epoch).iter() { + let subnet_id = DataColumnSubnetId::new(*column); + if let Some(custody_peer) = self + .network_globals() + .peers + .read() + .good_custody_subnet_peer_range_sync(subnet_id, batch_epoch) + .choose(&mut rand::rng()) + { + peer_to_columns + .entry(*custody_peer) + .or_default() + .push(*column); + } else { + no_peers_for_column.push(*column); + } + } + + // Send the requests for all columns that we have peers for + let mut data_column_requests = Vec::new(); + for (peer, indices) in peer_to_columns.into_iter() { + let data_columns_by_root_request = DataColumnsByRootBatchBlockRequest { + block_roots: block_roots.clone(), + indices: indices.clone(), + }; + + let requester = DataColumnsByRootRequester::RangeSync { + parent: id.parent_request_id, + }; + + data_column_requests.push(( + self.send_data_columns_by_root_request( + requester, + peer, + data_columns_by_root_request, + true, + ) + .map_err(|e| { + RpcResponseError::InternalError(format!( + "Failed to send data columns by root request {:?}", + e + )) + })?, + indices, + )); + } + + // There are columns for which we have no peers, queue them up for retry later + if !no_peers_for_column.is_empty() { + debug!( + block_request_id=?id, + ?no_peers_for_column, + "Not enough column peers for batch, will retry request" + ); + let data_columns_by_root_request = DataColumnsByRootBatchBlockRequest { + block_roots: block_roots.clone(), + indices: no_peers_for_column, + }; + + self.pending_column_by_root_range_requests + .insert(id.parent_request_id, data_columns_by_root_request); + + metrics::set_gauge( + &metrics::SYNC_PENDING_ROOT_RANGE_REQUESTS, + 
self.pending_column_by_root_range_requests.len() as i64, + ); + } + + // Insert the requests into the existing block parent request + if let Some(req) = self + .components_by_range_requests + .get_mut(&id.parent_request_id) + { + req.initialize_data_columns_from_root_component(data_column_requests) + .map_err(|e| { + RpcResponseError::InternalError(format!( + "Inconsistent state when inserting columns by root request {:?}", + e + )) + })?; + } else { + return Err(RpcResponseError::InternalError( + "Request sent without creating an entry".to_string(), + )); + } + Ok(()) + } + #[allow(clippy::type_complexity)] pub(crate) fn on_blocks_by_range_response( &mut self, @@ -1473,6 +1775,29 @@ impl SyncNetworkContext { rpc_event: RpcEvent>>, ) -> Option>>>> { let resp = self.blocks_by_range_requests.on_response(id, rpc_event); + match &resp { + Some(Ok((blocks, _))) => { + // On receiving a successful response for a blocks by range request, + // request the corresponding data columns for this batch by root (if required). + // + // We request the columns by root instead of by range to avoid peers responding + // with the columns corresponding to their view of the canonical chain + // instead of the chain that we are trying to sync. Requesting by root allows + // us to be more specific and reduces the number of failure cases we have to handle. + // + // This is especially relevant when we are syncing at times when there are a lot of + // head chains in a non-finality scenario. + if let Err(e) = self.request_columns_on_successful_blocks(id, blocks) { + debug!( + ?e, + "Error requesting columns on successful blocks by range request" + ); + return Some(Err(e)); + } + } + None => {} + Some(Err(_)) => {} + } self.on_rpc_response_result(id, "BlocksByRange", resp, peer_id, |b| b.len()) } @@ -1527,8 +1852,43 @@ impl SyncNetworkContext { ); } } + if let Some(Err(RpcResponseError::VerifyError(e))) = &resp { - self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); + warn!(?e, "Verification error on rpc response"); + match e { + LookupVerifyError::NotEnoughResponsesReturned { .. } => { + // This is a special case because, for columns by root requests, there are 3 cases: + // 1. the columns peer is honest and doesn't have the columns that we requested from it + // because it's on a different chain. + // 2. the columns peer is honest but the block peer maliciously fed us bogus blocks for which + // there are no corresponding columns. + // 3. The column peer is buggy but non-malicious. + // + // There is no way to differentiate between these 3 cases until we can verify the block + // before requesting the columns. + // Hence, we currently do not downscore them with a `LowToleranceError`. + // + // However, since the majority of these errors are currently of type 3, we downscore these errors with a + // `HighToleranceError` to avoid getting stuck in sync with buggy peers. 
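+ // + // Note: `method` here is the request-name string passed to `on_rpc_response_result` + // (e.g. "DataColumnsByRoot" or "BlocksByRange"), so only data column requests get the + // relaxed penalty below.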
+ if method.contains("DataColumns") { + self.report_peer(peer_id, PeerAction::HighToleranceError, e.into()) + } else { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()) + } + } + LookupVerifyError::UnrequestedSlot(_) + | LookupVerifyError::DuplicatedData(_, _) + | LookupVerifyError::TooManyResponses + | LookupVerifyError::UnrequestedBlockRoot(_) + | LookupVerifyError::UnrequestedIndex(_) => { + // Recoverable errors, don't downscore heavily + self.report_peer(peer_id, PeerAction::HighToleranceError, e.into()) + } + LookupVerifyError::InternalError(_) => {} // do not downscore peer for internal errors + LookupVerifyError::InvalidInclusionProof => { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()) + } + } } resp } diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 71e002cc422..147948a20ee 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -1,11 +1,11 @@ use crate::sync::network_context::{ - DataColumnsByRootRequestId, DataColumnsByRootSingleBlockRequest, + DataColumnsByRootBatchBlockRequest, DataColumnsByRootRequestId, }; use beacon_chain::BeaconChainTypes; use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; -use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester}; +use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester, Id}; use lighthouse_tracing::SPAN_OUTGOING_CUSTODY_REQUEST; use parking_lot::RwLock; use std::collections::HashSet; @@ -16,7 +16,7 @@ use tracing::{Span, debug, debug_span, warn}; use types::{DataColumnSidecar, Hash256, data_column_sidecar::ColumnIndex}; use types::{DataColumnSidecarList, EthSpec}; -use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext}; +use super::{PeerGroup, RpcResponseResult, SyncNetworkContext}; const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30); @@ -46,8 +46,8 @@ pub enum Error { /// There should only exist a single request at a time. Having multiple requests is a bug and /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. 
UnexpectedRequestId { - expected_req_id: DataColumnsByRootRequestId, - req_id: DataColumnsByRootRequestId, + expected_req_id: Id, + req_id: Id, }, } @@ -279,12 +279,12 @@ impl ActiveCustodyRequest { } for (peer_id, indices) in columns_to_request_by_peer.into_iter() { - let request_result = cx - .data_column_lookup_request( + let req_id = cx + .send_data_columns_by_root_request( DataColumnsByRootRequester::Custody(self.custody_id), peer_id, - DataColumnsByRootSingleBlockRequest { - block_root: self.block_root, + DataColumnsByRootBatchBlockRequest { + block_roots: vec![self.block_root], indices: indices.clone(), }, // If peer is in the lookup peer set, it claims to have imported the block and @@ -295,38 +295,32 @@ impl ActiveCustodyRequest { ) .map_err(Error::SendFailed)?; - match request_result { - LookupRequestResult::RequestSent(req_id) => { - *self.peer_attempts.entry(peer_id).or_insert(0) += 1; - - let client = cx.network_globals().client(&peer_id).kind; - let batch_columns_req_span = debug_span!( - "batch_columns_req", - %peer_id, - %client, - ); - let _guard = batch_columns_req_span.clone().entered(); - for column_index in &indices { - let column_request = self - .column_requests - .get_mut(column_index) - // Should never happen: column_index is iterated from column_requests - .ok_or(Error::BadState("unknown column_index".to_owned()))?; - - column_request.on_download_start(req_id)?; - } + *self.peer_attempts.entry(peer_id).or_insert(0) += 1; - self.active_batch_columns_requests.insert( - req_id, - ActiveBatchColumnsRequest { - indices, - span: batch_columns_req_span, - }, - ); - } - LookupRequestResult::NoRequestNeeded(_) => unreachable!(), - LookupRequestResult::Pending(_) => unreachable!(), + let client = cx.network_globals().client(&peer_id).kind; + let batch_columns_req_span = debug_span!( + "batch_columns_req", + %peer_id, + %client, + ); + let _guard = batch_columns_req_span.clone().entered(); + for column_index in &indices { + let column_request = self + .column_requests + .get_mut(column_index) + // Should never happen: column_index is iterated from column_requests + .ok_or(Error::BadState("unknown column_index".to_owned()))?; + + column_request.on_download_start(req_id)?; } + + self.active_batch_columns_requests.insert( + req_id, + ActiveBatchColumnsRequest { + indices, + span: batch_columns_req_span, + }, + ); } Ok(None) @@ -430,8 +424,8 @@ impl ColumnRequest { Status::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(Error::UnexpectedRequestId { - expected_req_id: *expected_req_id, - req_id, + expected_req_id: expected_req_id.id, + req_id: req_id.id, }); } self.status = Status::NotStarted(Instant::now()); @@ -463,8 +457,8 @@ impl ColumnRequest { Status::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(Error::UnexpectedRequestId { - expected_req_id: *expected_req_id, - req_id, + expected_req_id: expected_req_id.id, + req_id: req_id.id, }); } self.status = Status::Downloaded(peer_id, data_column, seen_timestamp); diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 3183c06d762..2134860ef44 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -12,9 +12,7 @@ pub use blobs_by_root::{BlobsByRootRequestItems, BlobsByRootSingleBlockRequest}; pub use blocks_by_range::BlocksByRangeRequestItems; pub use blocks_by_root::{BlocksByRootRequestItems, 
BlocksByRootSingleRequest}; pub use data_columns_by_range::DataColumnsByRangeRequestItems; -pub use data_columns_by_root::{ - DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, -}; +pub use data_columns_by_root::{DataColumnsByRootBatchBlockRequest, DataColumnsByRootRequestItems}; use crate::metrics; diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs index 34df801eaa8..c8bea7cc186 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs @@ -1,6 +1,6 @@ use lighthouse_network::rpc::methods::DataColumnsByRootRequest; use ssz_types::VariableList; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use types::{ ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkName, Hash256, }; @@ -8,12 +8,12 @@ use types::{ use super::{ActiveRequestItems, LookupVerifyError}; #[derive(Debug, Clone)] -pub struct DataColumnsByRootSingleBlockRequest { - pub block_root: Hash256, +pub struct DataColumnsByRootBatchBlockRequest { + pub block_roots: Vec, pub indices: Vec, } -impl DataColumnsByRootSingleBlockRequest { +impl DataColumnsByRootBatchBlockRequest { pub fn try_into_request( self, fork_name: ForkName, @@ -21,26 +21,29 @@ impl DataColumnsByRootSingleBlockRequest { ) -> Result, &'static str> { let columns = VariableList::new(self.indices) .map_err(|_| "Number of indices exceeds total number of columns")?; - DataColumnsByRootRequest::new( - vec![DataColumnsByRootIdentifier { - block_root: self.block_root, - columns, - }], - spec.max_request_blocks(fork_name), - ) + let ids: Vec<_> = self + .block_roots + .into_iter() + .map(|block_root| DataColumnsByRootIdentifier { + block_root, + columns: columns.clone(), + }) + .collect(); + assert!(ids.len() <= 32); + DataColumnsByRootRequest::new(ids, spec.max_request_blocks(fork_name)) } } pub struct DataColumnsByRootRequestItems { - request: DataColumnsByRootSingleBlockRequest, - items: Vec>>, + request: DataColumnsByRootBatchBlockRequest, + items: HashMap>>>, } impl DataColumnsByRootRequestItems { - pub fn new(request: DataColumnsByRootSingleBlockRequest) -> Self { + pub fn new(request: DataColumnsByRootBatchBlockRequest) -> Self { Self { request, - items: vec![], + items: HashMap::new(), } } } @@ -53,7 +56,7 @@ impl ActiveRequestItems for DataColumnsByRootRequestItems { /// The active request SHOULD be dropped after `add_response` returns an error fn add(&mut self, data_column: Self::Item) -> Result { let block_root = data_column.block_root(); - if self.request.block_root != block_root { + if !self.request.block_roots.contains(&block_root) { return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } if !data_column.verify_inclusion_proof() { @@ -62,19 +65,37 @@ impl ActiveRequestItems for DataColumnsByRootRequestItems { if !self.request.indices.contains(&data_column.index) { return Err(LookupVerifyError::UnrequestedIndex(data_column.index)); } - if self.items.iter().any(|d| d.index == data_column.index) { + if self + .items + .values() + .flatten() + .any(|d| d.index == data_column.index && d.block_root() == block_root) + { + tracing::trace!( + ?data_column, + existing_items=?self.items, + "Duplicated data", + ); return Err(LookupVerifyError::DuplicatedData( data_column.slot(), data_column.index, )); } - self.items.push(data_column); + 
self.items.entry(block_root).or_default().push(data_column); - Ok(self.items.len() >= self.request.indices.len()) + Ok(self + .items + .values() + .map(|columns| columns.len()) + .sum::() + >= self.request.indices.len() * self.request.block_roots.len()) } fn consume(&mut self) -> Vec { std::mem::take(&mut self.items) + .into_values() + .flatten() + .collect() } } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 31e65941390..d868467f22c 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -3,13 +3,13 @@ use derivative::Derivative; use lighthouse_network::PeerId; use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::service::api_types::Id; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::hash::{Hash, Hasher}; use std::ops::Sub; use std::time::{Duration, Instant}; use strum::Display; -use types::{Epoch, EthSpec, Slot}; +use types::{ColumnIndex, Epoch, EthSpec, Slot}; /// The number of times to retry a batch before it is considered failed. const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; @@ -22,7 +22,15 @@ const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; #[derive(Debug, Copy, Clone, Display)] #[strum(serialize_all = "snake_case")] pub enum ByRangeRequestType { + /// This variant requests the blocks and columns + /// simultaneously and then tries to couple the + /// responses. BlocksAndColumns, + /// This variant requests the blocks first using + /// a byrange request and then requests the data columns + /// for the received blocks using the `DataColumnsByRoot` + /// request. + BlocksAndColumnsSeparate, BlocksAndBlobs, Blocks, } @@ -130,6 +138,17 @@ impl fmt::Display for BatchInfo { } } +/// The peers that we got responses for this batch from. +/// +/// This is used for penalizing in case of invalid batches. +#[derive(Debug, Clone)] +pub struct BatchPeers { + /// Note: we send the blob request to the same peer as the block request. + /// Hence, the block and blob peers are the same. + pub block_and_blob: PeerId, + pub data_columns: HashMap>, +} + #[derive(Display)] /// Current state of a batch pub enum BatchState { @@ -138,9 +157,9 @@ pub enum BatchState { /// The batch is being downloaded. Downloading(Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>, Instant), + AwaitingProcessing(BatchPeers, Vec>, Instant), /// The batch is being processed. - Processing(Attempt), + Processing(Attempt, BatchPeers), /// The batch was successfully processed and is waiting to be validated. /// /// It is not sufficient to process a batch successfully to consider it correct. This is @@ -216,13 +235,15 @@ impl BatchInfo { false } - /// Returns the peer that is currently responsible for progressing the state of the batch. - pub fn processing_peer(&self) -> Option<&PeerId> { + /// Returns the peers that are currently responsible for progressing the state of the batch. + pub fn processing_peers(&self) -> Option<&BatchPeers> { match &self.state { - BatchState::AwaitingDownload | BatchState::Failed | BatchState::Downloading(..) => None, - BatchState::AwaitingProcessing(peer_id, _, _) - | BatchState::Processing(Attempt { peer_id, .. }) - | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), + BatchState::AwaitingDownload + | BatchState::Failed + | BatchState::Downloading(..) + | BatchState::AwaitingValidation(..) 
=> None, + BatchState::AwaitingProcessing(responsible_peers, _, _) + | BatchState::Processing(Attempt { .. }, responsible_peers) => Some(responsible_peers), BatchState::Poisoned => unreachable!("Poisoned batch"), } } @@ -279,12 +300,13 @@ impl BatchInfo { pub fn download_completed( &mut self, blocks: Vec>, - peer: PeerId, + responsible_peers: BatchPeers, ) -> Result { match self.state.poison() { BatchState::Downloading(_) => { let received = blocks.len(); - self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); + self.state = + BatchState::AwaitingProcessing(responsible_peers, blocks, Instant::now()); Ok(received) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -353,8 +375,11 @@ impl BatchInfo { pub fn start_processing(&mut self) -> Result<(Vec>, Duration), WrongState> { match self.state.poison() { - BatchState::AwaitingProcessing(peer, blocks, start_instant) => { - self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); + BatchState::AwaitingProcessing(responsible_peers, blocks, start_instant) => { + self.state = BatchState::Processing( + Attempt::new::(responsible_peers.block_and_blob, &blocks), + responsible_peers, + ); Ok((blocks, start_instant.elapsed())) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -373,7 +398,7 @@ impl BatchInfo { processing_result: BatchProcessingResult, ) -> Result { match self.state.poison() { - BatchState::Processing(attempt) => { + BatchState::Processing(attempt, _responsible_peers) => { self.state = match processing_result { BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt), BatchProcessingResult::FaultyFailure => { @@ -447,6 +472,7 @@ impl BatchInfo { #[derive(PartialEq, Debug)] pub struct Attempt { /// The peer that made the attempt. + /// This peer is effectively the peer that we requested the blocks from. pub peer_id: PeerId, /// The hash of the blocks of the attempt. pub hash: u64, @@ -462,16 +488,21 @@ impl Attempt { impl std::fmt::Debug for BatchState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - BatchState::Processing(Attempt { peer_id, hash: _ }) => { - write!(f, "Processing({})", peer_id) + BatchState::Processing(Attempt { peer_id, hash: _ }, batch_peers) => { + write!(f, "Processing({}) {}", peer_id, batch_peers.block_and_blob) } BatchState::AwaitingValidation(Attempt { peer_id, hash: _ }) => { write!(f, "AwaitingValidation({})", peer_id) } BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), BatchState::Failed => f.write_str("Failed"), - BatchState::AwaitingProcessing(peer, blocks, _) => { - write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) + BatchState::AwaitingProcessing(batch_peers, blocks, _) => { + write!( + f, + "AwaitingProcessing({}, {:?} blocks)", + batch_peers.block_and_blob, + blocks.len() + ) } BatchState::Downloading(request_id) => { write!(f, "Downloading({})", request_id) @@ -487,7 +518,7 @@ impl BatchState { fn visualize(&self) -> char { match self { BatchState::Downloading(..) 
=> 'D', - BatchState::Processing(_) => 'P', + BatchState::Processing(_, _) => 'P', BatchState::AwaitingValidation(_) => 'v', BatchState::AwaitingDownload => 'd', BatchState::Failed => 'F', diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 3b816c09224..08413120559 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -3,11 +3,13 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::block_sidecar_coupling::CouplingError; +use crate::sync::manager::FaultyComponent; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; +use crate::sync::range_sync::batch::BatchPeers; use crate::sync::{BatchOperationOutcome, BatchProcessResult, network_context::SyncNetworkContext}; use beacon_chain::BeaconChainTypes; use beacon_chain::block_verification_types::RpcBlock; -use lighthouse_network::service::api_types::Id; +use lighthouse_network::service::api_types::{Id, RangeRequestType}; use lighthouse_network::{PeerAction, PeerId}; use lighthouse_tracing::SPAN_SYNCING_CHAIN; use logging::crit; @@ -229,8 +231,8 @@ impl SyncingChain { pub fn on_block_response( &mut self, network: &mut SyncNetworkContext, + batch_peers: BatchPeers, batch_id: BatchId, - peer_id: &PeerId, request_id: Id, blocks: Vec>, ) -> ProcessingResult { @@ -259,8 +261,7 @@ impl SyncingChain { // A stream termination has been sent. This batch has ended. Process a completed batch. // Remove the request from the peer's active batches - // TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258 - let received = batch.download_completed(blocks, *peer_id)?; + let received = batch.download_completed(blocks, batch_peers.clone())?; let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) / EPOCHS_PER_BATCH; @@ -269,7 +270,7 @@ impl SyncingChain { blocks = received, batch_state = self.visualize_batch_state(), %awaiting_batches, - %peer_id, + ?batch_peers, "Batch downloaded" ); @@ -359,7 +360,7 @@ impl SyncingChain { // Batches can be in `AwaitingDownload` state if there weren't good data column subnet // peers to send the request to. BatchState::AwaitingDownload => return Ok(KeepChain), - BatchState::Processing(_) | BatchState::Failed => { + BatchState::Processing(_, _) | BatchState::Failed => { // these are all inconsistent states: // - Processing -> `self.current_processing_batch` is None // - Failed -> non recoverable batch. For an optimistic batch, it should @@ -396,7 +397,7 @@ impl SyncingChain { // Batches can be in `AwaitingDownload` state if there weren't good data column subnet // peers to send the request to. BatchState::AwaitingDownload => return Ok(KeepChain), - BatchState::Failed | BatchState::Processing(_) => { + BatchState::Failed | BatchState::Processing(_, _) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed // - AwaitingDownload -> A recoverable failed batch should have been @@ -475,18 +476,10 @@ impl SyncingChain { } }; - let peer = batch.processing_peer().cloned().ok_or_else(|| { - RemoveChain::WrongBatchState(format!( - "Processing target is in wrong state: {:?}", - batch.state(), - )) - })?; - // Log the process result and the batch for debugging purposes. 
debug!( result = ?result, batch_epoch = %batch_id, - client = %network.client_type(&peer), batch_state = ?batch_state, ?batch, "Batch processing result" @@ -550,9 +543,35 @@ impl SyncingChain { BatchProcessResult::FaultyFailure { imported_blocks, penalty, + faulty_component, } => { - // Penalize the peer appropriately. - network.report_peer(peer, *penalty, "faulty_batch"); + if let Some(batch_peers) = batch.processing_peers() { + // Penalize the peer appropriately. + match faulty_component { + Some(FaultyComponent::Blocks) | Some(FaultyComponent::Blobs) => { + network.report_peer( + batch_peers.block_and_blob, + *penalty, + "faulty_batch", + ); + } + Some(FaultyComponent::Columns(faulty_columns)) => { + for (peer, columns) in batch_peers.data_columns.iter() { + for faulty_column in faulty_columns { + if columns.contains(faulty_column) { + network.report_peer(*peer, *penalty, "faulty_batch"); + } + } + } + } + None => {} + } + } else { + warn!( + current_state = ?batch.state(), + "Inconsistent state, batch must have been in processing state" + ); + }; // Check if this batch is allowed to continue match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { @@ -659,7 +678,7 @@ impl SyncingChain { for attempt in batch.attempts() { // The validated batch has been re-processed if attempt.hash != processed_attempt.hash { - // The re-downloaded version was different + // The re-downloaded version had a different block peer if processed_attempt.peer_id != attempt.peer_id { // A different peer sent the correct batch, the previous peer did not // We negatively score the original peer. @@ -699,7 +718,7 @@ impl SyncingChain { crit!("batch indicates inconsistent chain state while advancing chain") } BatchState::AwaitingProcessing(..) => {} - BatchState::Processing(_) => { + BatchState::Processing(_, _) => { debug!(batch = %id, %batch, "Advancing chain while processing a batch"); if let Some(processing_id) = self.current_processing_batch && id <= processing_id @@ -858,8 +877,8 @@ impl SyncingChain { pub fn inject_error( &mut self, network: &mut SyncNetworkContext, + batch_peers: BatchPeers, batch_id: BatchId, - peer_id: &PeerId, request_id: Id, err: RpcResponseError, ) -> ProcessingResult { @@ -913,7 +932,7 @@ impl SyncingChain { debug!( batch_epoch = %batch_id, batch_state = ?batch.state(), - %peer_id, + ?batch_peers, %request_id, ?batch_state, "Batch not expecting block" @@ -924,23 +943,23 @@ impl SyncingChain { batch_epoch = %batch_id, batch_state = ?batch.state(), error = ?err, - %peer_id, + ?batch_peers, %request_id, "Batch download error" ); if let BatchOperationOutcome::Failed { blacklist } = - batch.download_failed(Some(*peer_id))? + batch.download_failed(Some(batch_peers.block_and_blob))? 
{ return Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, }); } - self.send_batch(network, batch_id) + self.attempt_send_awaiting_download_batches(network, "injecting error") } else { debug!( batch_epoch = %batch_id, - %peer_id, + ?batch_peers, %request_id, batch_state, "Batch not found" @@ -969,7 +988,7 @@ impl SyncingChain { .collect(); debug!( ?awaiting_downloads, - src, "Attempting to send batches awaiting downlaod" + src, "Attempting to send batches awaiting download" ); for batch_id in awaiting_downloads { @@ -992,7 +1011,6 @@ impl SyncingChain { batch_id: BatchId, ) -> ProcessingResult { let _guard = self.span.clone().entered(); - debug!(batch_epoch = %batch_id, "Requesting batch"); let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); @@ -1002,7 +1020,7 @@ impl SyncingChain { .network_globals() .peers .read() - .synced_peers_for_epoch(batch_id, Some(&self.peers)) + .synced_peers_for_epoch(batch_id, None) .cloned() .collect::>(); @@ -1057,6 +1075,8 @@ impl SyncingChain { } }, } + } else { + debug!(?self.to_be_downloaded, ?self.processing_target, "Did not get batch"); } Ok(KeepChain) @@ -1081,7 +1101,7 @@ impl SyncingChain { .network_globals() .peers .read() - .synced_peers_for_epoch(batch_id, Some(&self.peers)) + .synced_peers_for_epoch(batch_id, None) .cloned() .collect::>(); @@ -1123,6 +1143,9 @@ impl SyncingChain { ) -> Result { let _guard = self.span.clone().entered(); debug!("Resuming chain"); + // attempt to download any batches stuck in the `AwaitingDownload` state because of + // a lack of peers earlier + self.attempt_send_awaiting_download_batches(network, "resume")?; // Request more batches if needed. self.request_batches(network)?; // If there is any batch ready for processing, send it. 
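To illustrate the blocks-first flow used above (blocks by range, then `DataColumnsByRoot` for the received blocks): the sketch below is a minimal, self-contained model of the peer-grouping step performed by `request_columns_on_successful_blocks`. It groups the sampling columns of a batch by custody peer, builds one by-root request per peer covering every block root of the batch, and queues the columns that have no peer for a later retry. The types `PeerId`, `ColumnIndex`, `Hash256`, the `ColumnsByRootRequest` struct and the `custody_peer_for` closure are simplified stand-ins for illustration, not the actual Lighthouse definitions.

use std::collections::HashMap;

type PeerId = u64;       // stand-in for lighthouse_network::PeerId
type ColumnIndex = u64;  // stand-in for the real column index type
type Hash256 = [u8; 32]; // stand-in block root

/// One by-root request per peer, covering all block roots of the batch.
struct ColumnsByRootRequest {
    peer: PeerId,
    block_roots: Vec<Hash256>,
    indices: Vec<ColumnIndex>,
}

/// Group sampling columns by the custody peer that will serve them.
/// Returns the per-peer requests plus the columns that currently have no peer
/// (those are queued and retried once more custody peers are available).
fn group_columns_by_peer(
    sampling_columns: &[ColumnIndex],
    block_roots: &[Hash256],
    custody_peer_for: impl Fn(ColumnIndex) -> Option<PeerId>,
) -> (Vec<ColumnsByRootRequest>, Vec<ColumnIndex>) {
    let mut peer_to_columns: HashMap<PeerId, Vec<ColumnIndex>> = HashMap::new();
    let mut no_peers = Vec::new();

    for &column in sampling_columns {
        match custody_peer_for(column) {
            Some(peer) => peer_to_columns.entry(peer).or_default().push(column),
            None => no_peers.push(column),
        }
    }

    let requests = peer_to_columns
        .into_iter()
        .map(|(peer, indices)| ColumnsByRootRequest {
            peer,
            block_roots: block_roots.to_vec(),
            indices,
        })
        .collect();

    (requests, no_peers)
}

fn main() {
    // Toy custody assignment: even-indexed columns are served by peer 1, odd ones by nobody.
    let (requests, missing) =
        group_columns_by_peer(&[0, 1, 2, 3], &[[0u8; 32]], |c| (c % 2 == 0).then_some(1));

    for req in &requests {
        println!(
            "peer {}: {} columns over {} block root(s)",
            req.peer,
            req.indices.len(),
            req.block_roots.len()
        );
    }
    assert_eq!(missing, vec![1, 3]); // columns 1 and 3 are queued for retry
}

Grouping per peer keeps the number of RPC requests proportional to the number of distinct custody peers rather than the number of columns; the retry queue in this sketch plays the role that `pending_column_by_root_range_requests` plays in the real implementation.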
@@ -1149,7 +1172,7 @@ impl SyncingChain { } if let Entry::Vacant(entry) = self.batches.entry(epoch) { - let batch_type = network.batch_type(epoch); + let batch_type = network.batch_type(epoch, RangeRequestType::ForwardSync); let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); entry.insert(optimistic_batch); self.send_batch(network, epoch)?; @@ -1246,7 +1269,7 @@ impl SyncingChain { self.include_next_batch(network) } Entry::Vacant(entry) => { - let batch_type = network.batch_type(next_batch_id); + let batch_type = network.batch_type(next_batch_id, RangeRequestType::ForwardSync); entry.insert(BatchInfo::new(&next_batch_id, EPOCHS_PER_BATCH, batch_type)); self.to_be_downloaded += EPOCHS_PER_BATCH; Some(next_batch_id) diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index 8f881fba90f..1218e0cd09c 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,7 +8,7 @@ mod range; mod sync_type; pub use batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchInfo, BatchOperationOutcome, BatchPeers, BatchProcessingResult, BatchState, ByRangeRequestType, }; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 465edd3697f..8f18f4e8f06 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -46,6 +46,7 @@ use crate::metrics; use crate::status::ToStatusMessage; use crate::sync::BatchProcessResult; use crate::sync::network_context::{RpcResponseError, SyncNetworkContext}; +use crate::sync::range_sync::BatchPeers; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes}; use lighthouse_network::rpc::GoodbyeReason; @@ -203,7 +204,7 @@ where pub fn blocks_by_range_response( &mut self, network: &mut SyncNetworkContext, - peer_id: PeerId, + batch_peers: BatchPeers, chain_id: ChainId, batch_id: BatchId, request_id: Id, @@ -211,7 +212,7 @@ where ) { // check if this chunk removes the chain match self.chains.call_by_id(chain_id, |chain| { - chain.on_block_response(network, batch_id, &peer_id, request_id, blocks) + chain.on_block_response(network, batch_peers, batch_id, request_id, blocks) }) { Ok((removed_chain, sync_type)) => { if let Some((removed_chain, remove_reason)) = removed_chain { @@ -295,7 +296,7 @@ where pub fn inject_error( &mut self, network: &mut SyncNetworkContext, - peer_id: PeerId, + batch_peers: BatchPeers, batch_id: BatchId, chain_id: ChainId, request_id: Id, @@ -303,7 +304,7 @@ where ) { // check that this request is pending match self.chains.call_by_id(chain_id, |chain| { - chain.inject_error(network, batch_id, &peer_id, request_id, err) + chain.inject_error(network, batch_peers, batch_id, request_id, err) }) { Ok((removed_chain, sync_type)) => { if let Some((removed_chain, remove_reason)) = removed_chain { diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index cb728a90c1b..516b66c45eb 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -16,7 +16,7 @@ use lighthouse_network::rpc::methods::{ }; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, - SyncRequestId, + 
DataColumnsByRootRequestId, SyncRequestId, }; use lighthouse_network::{PeerId, SyncInfo}; use std::time::Duration; @@ -36,6 +36,7 @@ enum ByRangeDataRequestIds { PreDeneb, PrePeerDAS(BlobsByRangeRequestId, PeerId), PostPeerDAS(Vec<(DataColumnsByRangeRequestId, PeerId)>), + PostPeerDASByRoot(Vec<(DataColumnsByRootRequestId, PeerId)>), } /// Sync tests are usually written in the form: @@ -233,7 +234,8 @@ impl TestRig { }); let by_range_data_requests = if self.after_fulu() { - let mut data_columns_requests = vec![]; + // First check for DataColumnsByRange requests (old paradigm) + let mut data_columns_range_requests = vec![]; while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev { NetworkMessage::SendRequest { peer_id, @@ -245,12 +247,34 @@ impl TestRig { } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), _ => None, }) { - data_columns_requests.push(data_columns_request); + data_columns_range_requests.push(data_columns_request); } - if data_columns_requests.is_empty() { - panic!("Found zero DataColumnsByRange requests, filter {request_filter:?}"); + + // If we found range requests, use the `ByRangeRequestType::BlocksAndColumns` paradigm + if !data_columns_range_requests.is_empty() { + ByRangeDataRequestIds::PostPeerDAS(data_columns_range_requests) + } else { + // Try to find the byroot requests associated with the `ByRangeRequestType::BlocksAndColumnsSeparate` + let mut data_columns_root_requests = vec![]; + while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request: RequestType::DataColumnsByRoot(_), + app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)), + } => Some((*id, *peer_id)), + _ => None, + }) { + data_columns_root_requests.push(data_columns_request); + } + + if !data_columns_root_requests.is_empty() { + ByRangeDataRequestIds::PostPeerDASByRoot(data_columns_root_requests) + } else { + // No data column requests found - this is expected for the new paradigm + // since DataColumnsByRoot requests are sent after blocks are received + ByRangeDataRequestIds::PostPeerDASByRoot(vec![]) + } } - ByRangeDataRequestIds::PostPeerDAS(data_columns_requests) } else if self.after_deneb() { let (id, peer) = self .pop_received_network_event(|ev| match ev { @@ -318,11 +342,54 @@ impl TestRig { }); } } + ByRangeDataRequestIds::PostPeerDASByRoot(data_column_req_ids) => { + // Complete the DataColumnsByRoot requests with stream termination + for (id, peer_id) in data_column_req_ids { + self.log(&format!( + "Completing DataColumnsByRoot request {id:?} with empty stream" + )); + self.send_sync_message(SyncMessage::RpcDataColumn { + sync_request_id: SyncRequestId::DataColumnsByRoot(id), + peer_id, + data_column: None, + seen_timestamp: D, + }); + } + } } blocks_req_id.parent_request_id.requester } + fn find_and_complete_data_columns_by_root_requests(&mut self) { + // In the new paradigm, DataColumnsByRoot requests are sent after blocks are received + // We need to complete any pending DataColumnsByRoot requests + let mut data_columns_root_requests = vec![]; + while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request: RequestType::DataColumnsByRoot(_), + app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)), + } => Some((*id, *peer_id)), + _ => None, + }) { + data_columns_root_requests.push(data_columns_request); + } + + // Complete the DataColumnsByRoot requests + for (id, 
peer_id) in data_columns_root_requests { + self.log(&format!( + "Completing DataColumnsByRoot request {id:?} with empty stream" + )); + self.send_sync_message(SyncMessage::RpcDataColumn { + sync_request_id: SyncRequestId::DataColumnsByRoot(id), + peer_id, + data_column: None, + seen_timestamp: D, + }); + } + } + fn find_and_complete_processing_chain_segment(&mut self, id: ChainSegmentProcessId) { self.pop_received_processor_event(|ev| { (ev.work_type() == WorkType::ChainSegment).then_some(()) @@ -366,6 +433,11 @@ impl TestRig { }; self.find_and_complete_processing_chain_segment(id); + + // In the new paradigm, DataColumnsByRoot requests are sent after blocks are processed + // We need to complete any pending DataColumnsByRoot requests + self.find_and_complete_data_columns_by_root_requests(); + if epoch < last_epoch - 1 { self.assert_state(RangeSyncType::Finalized); } else {