diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs.rs index ceb563ffc25..3c28ac9a44d 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs.rs @@ -13,7 +13,7 @@ use crate::observed_data_sidecars::DoNotObserve; use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError}; use execution_layer::json_structures::BlobAndProofV1; use execution_layer::Error as ExecutionLayerError; -use metrics::{inc_counter, inc_counter_by, TryExt}; +use metrics::{inc_counter, TryExt}; use ssz_types::FixedVector; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; use std::sync::Arc; @@ -73,13 +73,20 @@ pub async fn fetch_and_process_engine_blobs( .as_ref() .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?; + metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); debug!(num_expected_blobs, "Fetching blobs from the EL"); let response = execution_layer .get_blobs(versioned_hashes) .await + .inspect_err(|_| { + inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL); + }) .map_err(FetchEngineBlobError::RequestFailed)?; - if response.is_empty() || response.iter().all(|opt| opt.is_none()) { + let num_fetched_blobs = response.iter().filter(|opt| opt.is_some()).count(); + metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64); + + if num_fetched_blobs == 0 { debug!(num_expected_blobs, "No blobs fetched from the EL"); inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); return Ok(None); @@ -99,20 +106,6 @@ pub async fn fetch_and_process_engine_blobs( &chain.spec, )?; - let num_fetched_blobs = fixed_blob_sidecar_list - .iter() - .filter(|b| b.is_some()) - .count(); - - inc_counter_by( - &metrics::BLOBS_FROM_EL_EXPECTED_TOTAL, - num_expected_blobs as u64, - ); - inc_counter_by( - &metrics::BLOBS_FROM_EL_RECEIVED_TOTAL, - num_fetched_blobs as u64, - ); - // Gossip verify blobs before publishing. 
This prevents blobs with invalid KZG proofs from // the EL making it into the data availability checker. We do not immediately add these // blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index d1c7a2a5dff..463319a1f5d 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1662,28 +1662,37 @@ pub static DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES: LazyLock> = LazyLock::new(|| { try_create_int_counter( "beacon_blobs_from_el_hit_total", - "Number of blob batches fetched from the execution layer", + "Number of non-empty blob batches fetched from the execution layer", ) }); pub static BLOBS_FROM_EL_MISS_TOTAL: LazyLock> = LazyLock::new(|| { try_create_int_counter( "beacon_blobs_from_el_miss_total", - "Number of blob batches failed to fetch from the execution layer", + "Number of empty blob responses from the execution layer", ) }); -pub static BLOBS_FROM_EL_EXPECTED_TOTAL: LazyLock> = LazyLock::new(|| { +pub static BLOBS_FROM_EL_ERROR_TOTAL: LazyLock> = LazyLock::new(|| { try_create_int_counter( - "beacon_blobs_from_el_expected_total", + "beacon_blobs_from_el_error_total", + "Number of failed blob fetches from the execution layer", + ) +}); + +pub static BLOBS_FROM_EL_EXPECTED: LazyLock> = LazyLock::new(|| { + try_create_histogram_with_buckets( + "beacon_blobs_from_el_expected", "Number of blobs expected from the execution layer", + Ok(vec![0.0, 3.0, 6.0, 9.0, 12.0, 18.0, 24.0, 30.0]), ) }); -pub static BLOBS_FROM_EL_RECEIVED_TOTAL: LazyLock> = LazyLock::new(|| { - try_create_int_counter( +pub static BLOBS_FROM_EL_RECEIVED: LazyLock> = LazyLock::new(|| { + try_create_histogram_with_buckets( - "beacon_blobs_from_el_received_total", + "beacon_blobs_from_el_received", "Number of blobs fetched from the execution layer", + linear_buckets(0.0, 4.0, 20), ) }); diff --git 
a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index aa5f54ac1fb..625886a4cf6 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -9,11 +9,14 @@ use crate::{ sync::{manager::BlockProcessType, SyncMessage}, }; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::kzg_utils::blobs_to_data_column_sidecars; use beacon_chain::test_utils::{ - test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, + get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, + EphemeralHarnessType, }; use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; +use itertools::Itertools; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::{ @@ -29,9 +32,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, Hash256, MainnetEthSpec, - ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, - SubnetId, + Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList, + DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, + SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, }; type E = MainnetEthSpec; @@ -52,6 +55,7 @@ struct TestRig { chain: Arc>, next_block: Arc>, next_blobs: Option>, + next_data_columns: Option>, attestations: Vec<(Attestation, SubnetId)>, next_block_attestations: Vec<(Attestation, SubnetId)>, next_block_aggregate_attestations: Vec>, @@ -241,7 +245,7 @@ impl TestRig { let network_beacon_processor = Arc::new(network_beacon_processor); let beacon_processor = BeaconProcessor { - 
network_globals, + network_globals: network_globals.clone(), executor, current_workers: 0, config: beacon_processor_config, @@ -262,15 +266,35 @@ impl TestRig { assert!(beacon_processor.is_ok()); let block = next_block_tuple.0; - let blob_sidecars = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 { - Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap()) + let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 { + if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + let kzg = get_kzg(&chain.spec); + let custody_columns: DataColumnSidecarList = blobs_to_data_column_sidecars( + &blobs.iter().collect_vec(), + &block, + &kzg, + &chain.spec, + ) + .unwrap() + .into_iter() + .filter(|c| network_globals.sampling_columns.contains(&c.index)) + .collect::>(); + + (None, Some(custody_columns)) + } else { + let blob_sidecars = + BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap(); + (Some(blob_sidecars), None) + } } else { - None + (None, None) }; + Self { chain, next_block: block, next_blobs: blob_sidecars, + next_data_columns: data_columns, attestations, next_block_attestations, next_block_aggregate_attestations, @@ -323,6 +347,22 @@ impl TestRig { } } + pub fn enqueue_gossip_data_columns(&self, col_index: usize) { + if let Some(data_columns) = self.next_data_columns.as_ref() { + let data_column = data_columns.get(col_index).unwrap(); + self.network_beacon_processor + .send_gossip_data_column_sidecar( + junk_message_id(), + junk_peer_id(), + Client::default(), + DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec), + data_column.clone(), + Duration::from_secs(0), + ) + .unwrap(); + } + } + pub fn enqueue_rpc_block(&self) { let block_root = self.next_block.canonical_root(); self.network_beacon_processor @@ -361,6 +401,19 @@ impl TestRig { } } + pub fn enqueue_single_lookup_rpc_data_columns(&self) { + if let Some(data_columns) = 
self.next_data_columns.clone() { + self.network_beacon_processor + .send_rpc_custody_columns( + self.next_block.canonical_root(), + data_columns, + Duration::default(), + BlockProcessType::SingleCustodyColumn(1), + ) + .unwrap(); + } + } + pub fn enqueue_blobs_by_range_request(&self, count: u64) { self.network_beacon_processor .send_blobs_by_range_request( @@ -618,6 +671,13 @@ async fn import_gossip_block_acceptably_early() { .await; } + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock // and check the head in the time between the block arrived early and when its due for // processing. @@ -694,19 +754,20 @@ async fn import_gossip_block_at_current_slot() { rig.assert_event_journal_completes(&[WorkType::GossipBlock]) .await; - let num_blobs = rig - .next_blobs - .as_ref() - .map(|blobs| blobs.len()) - .unwrap_or(0); - + let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); for i in 0..num_blobs { rig.enqueue_gossip_blob(i); - rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar]) .await; } + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + assert_eq!( rig.head_root(), rig.next_block.canonical_root(), @@ -759,11 +820,8 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod ); // Send the block and ensure that the attestation is received back and imported. 
- let num_blobs = rig - .next_blobs - .as_ref() - .map(|blobs| blobs.len()) - .unwrap_or(0); + let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); let mut events = vec![]; match import_method { BlockImportMethod::Gossip => { @@ -773,6 +831,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod rig.enqueue_gossip_blob(i); events.push(WorkType::GossipBlobSidecar); } + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + events.push(WorkType::GossipDataColumnSidecar); + } } BlockImportMethod::Rpc => { rig.enqueue_rpc_block(); @@ -781,6 +843,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod rig.enqueue_single_lookup_rpc_blobs(); events.push(WorkType::RpcBlobs); } + if num_data_columns > 0 { + rig.enqueue_single_lookup_rpc_data_columns(); + events.push(WorkType::RpcCustodyColumn); + } } }; @@ -840,11 +906,8 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod ); // Send the block and ensure that the attestation is received back and imported. 
- let num_blobs = rig - .next_blobs - .as_ref() - .map(|blobs| blobs.len()) - .unwrap_or(0); + let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); let mut events = vec![]; match import_method { BlockImportMethod::Gossip => { @@ -854,6 +917,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod rig.enqueue_gossip_blob(i); events.push(WorkType::GossipBlobSidecar); } + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + events.push(WorkType::GossipDataColumnSidecar); + } } BlockImportMethod::Rpc => { rig.enqueue_rpc_block(); @@ -862,6 +929,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod rig.enqueue_single_lookup_rpc_blobs(); events.push(WorkType::RpcBlobs); } + if num_data_columns > 0 { + rig.enqueue_single_lookup_rpc_data_columns(); + events.push(WorkType::RpcCustodyColumn); + } } }; @@ -1046,12 +1117,20 @@ async fn test_rpc_block_reprocessing() { rig.assert_event_journal_completes(&[WorkType::RpcBlock]) .await; - rig.enqueue_single_lookup_rpc_blobs(); - if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 { + let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0); + if num_blobs > 0 { + rig.enqueue_single_lookup_rpc_blobs(); rig.assert_event_journal_completes(&[WorkType::RpcBlobs]) .await; } + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + if num_data_columns > 0 { + rig.enqueue_single_lookup_rpc_data_columns(); + rig.assert_event_journal_completes(&[WorkType::RpcCustodyColumn]) + .await; + } + // next_block shouldn't be processed since it couldn't get the // duplicate cache handle assert_ne!(next_block_root, rig.head_root()); diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 018381a8507..e7e6e623497 100644 --- 
a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -102,7 +102,6 @@ impl ActiveCustodyRequest { ) -> CustodyRequestResult { let Some(batch_request) = self.active_batch_columns_requests.get_mut(&req_id) else { warn!( - id = ?self.custody_id, block_root = ?self.block_root, %req_id, "Received custody column response for unrequested index" @@ -113,7 +112,6 @@ impl ActiveCustodyRequest { match resp { Ok((data_columns, seen_timestamp)) => { debug!( - id = ?self.custody_id, block_root = ?self.block_root, %req_id, %peer_id, @@ -161,7 +159,6 @@ impl ActiveCustodyRequest { if !missing_column_indexes.is_empty() { // Note: Batch logging that columns are missing to not spam logger debug!( - id = ?self.custody_id, block_root = ?self.block_root, %req_id, %peer_id, @@ -175,7 +172,6 @@ impl ActiveCustodyRequest { } Err(err) => { debug!( - id = ?self.custody_id, block_root = ?self.block_root, %req_id, %peer_id, diff --git a/common/eth2_network_config/built_in_network_configs/chiado/config.yaml b/common/eth2_network_config/built_in_network_configs/chiado/config.yaml index 1455ec5f637..7256af8400d 100644 --- a/common/eth2_network_config/built_in_network_configs/chiado/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/chiado/config.yaml @@ -150,9 +150,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 2 # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 256 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml index 9ff5a161980..42d3d968da1 100644 --- a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml +++ 
b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml @@ -121,9 +121,10 @@ BLOB_SIDECAR_SUBNET_COUNT: 6 # `uint64(6)` MAX_BLOBS_PER_BLOCK: 6 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml index e5f38b8c9b5..d8e2d658ff5 100644 --- a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml @@ -139,9 +139,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9 # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index 74fe7278674..6b2e82cbd0f 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -142,9 +142,10 @@ BLOB_SIDECAR_SUBNET_COUNT: 6 # `uint64(6)` MAX_BLOBS_PER_BLOCK: 6 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml index af783322055..9323192c82b 100644 --- a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml @@ 
-140,9 +140,10 @@ MAX_BLOBS_PER_BLOCK_ELECTRA: 9 # MAX_REQUEST_BLOCKS_DENEB * MAX_BLOBS_PER_BLOCK_ELECTRA MAX_REQUEST_BLOB_SIDECARS_ELECTRA: 1152 -# DAS +# Fulu NUMBER_OF_COLUMNS: 128 NUMBER_OF_CUSTODY_GROUPS: 128 DATA_COLUMN_SIDECAR_SUBNET_COUNT: 128 SAMPLES_PER_SLOT: 8 CUSTODY_REQUIREMENT: 4 +MAX_BLOBS_PER_BLOCK_FULU: 12 diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 1650001db61..333b9544dfd 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -241,6 +241,11 @@ pub struct ChainSpec { blob_sidecar_subnet_count_electra: u64, max_request_blob_sidecars_electra: u64, + /* + * Networking Fulu + */ + max_blobs_per_block_fulu: u64, + /* * Networking Derived * @@ -656,7 +661,9 @@ impl ChainSpec { /// Return the value of `MAX_BLOBS_PER_BLOCK` appropriate for `fork`. pub fn max_blobs_per_block_by_fork(&self, fork_name: ForkName) -> u64 { - if fork_name.electra_enabled() { + if fork_name.fulu_enabled() { + self.max_blobs_per_block_fulu + } else if fork_name.electra_enabled() { self.max_blobs_per_block_electra } else { self.max_blobs_per_block @@ -965,6 +972,11 @@ impl ChainSpec { blob_sidecar_subnet_count_electra: default_blob_sidecar_subnet_count_electra(), max_request_blob_sidecars_electra: default_max_request_blob_sidecars_electra(), + /* + * Networking Fulu specific + */ + max_blobs_per_block_fulu: default_max_blobs_per_block_fulu(), + /* * Application specific */ @@ -1295,6 +1307,11 @@ impl ChainSpec { blob_sidecar_subnet_count_electra: default_blob_sidecar_subnet_count_electra(), max_request_blob_sidecars_electra: default_max_request_blob_sidecars_electra(), + /* + * Networking Fulu specific + */ + max_blobs_per_block_fulu: default_max_blobs_per_block_fulu(), + /* * Application specific */ @@ -1517,6 +1534,9 @@ pub struct Config { #[serde(default = "default_custody_requirement")] #[serde(with = "serde_utils::quoted_u64")] custody_requirement: u64, + #[serde(default = 
"default_max_blobs_per_block_fulu")] + #[serde(with = "serde_utils::quoted_u64")] + max_blobs_per_block_fulu: u64, } fn default_bellatrix_fork_version() -> [u8; 4] { @@ -1658,6 +1678,10 @@ const fn default_max_blobs_per_block_electra() -> u64 { 9 } +const fn default_max_blobs_per_block_fulu() -> u64 { + 12 +} + const fn default_attestation_propagation_slot_range() -> u64 { 32 } @@ -1886,6 +1910,7 @@ impl Config { data_column_sidecar_subnet_count: spec.data_column_sidecar_subnet_count, samples_per_slot: spec.samples_per_slot, custody_requirement: spec.custody_requirement, + max_blobs_per_block_fulu: spec.max_blobs_per_block_fulu, } } @@ -1965,6 +1990,7 @@ impl Config { data_column_sidecar_subnet_count, samples_per_slot, custody_requirement, + max_blobs_per_block_fulu, } = self; if preset_base != E::spec_name().to_string().as_str() { @@ -2048,6 +2074,7 @@ impl Config { data_column_sidecar_subnet_count, samples_per_slot, custody_requirement, + max_blobs_per_block_fulu, ..chain_spec.clone() })