Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions beacon_node/beacon_chain/src/fetch_blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::observed_data_sidecars::DoNotObserve;
use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
use execution_layer::json_structures::BlobAndProofV1;
use execution_layer::Error as ExecutionLayerError;
use metrics::{inc_counter, inc_counter_by, TryExt};
use metrics::{inc_counter, TryExt};
use ssz_types::FixedVector;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
use std::sync::Arc;
Expand Down Expand Up @@ -73,13 +73,20 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
.as_ref()
.ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;

metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
debug!(num_expected_blobs, "Fetching blobs from the EL");
let response = execution_layer
.get_blobs(versioned_hashes)
.await
.inspect_err(|_| {
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
})
.map_err(FetchEngineBlobError::RequestFailed)?;

if response.is_empty() || response.iter().all(|opt| opt.is_none()) {
let num_fetched_blobs = response.iter().filter(|opt| opt.is_some()).count();
metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);

if num_fetched_blobs == 0 {
debug!(num_expected_blobs, "No blobs fetched from the EL");
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
return Ok(None);
Expand All @@ -99,20 +106,6 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
&chain.spec,
)?;

let num_fetched_blobs = fixed_blob_sidecar_list
.iter()
.filter(|b| b.is_some())
.count();

inc_counter_by(
&metrics::BLOBS_FROM_EL_EXPECTED_TOTAL,
num_expected_blobs as u64,
);
inc_counter_by(
&metrics::BLOBS_FROM_EL_RECEIVED_TOTAL,
num_fetched_blobs as u64,
);

// Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from
// the EL making it into the data availability checker. We do not immediately add these
// blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip
Expand Down
21 changes: 15 additions & 6 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1662,28 +1662,37 @@ pub static DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES: LazyLock<Result<Histog
pub static BLOBS_FROM_EL_HIT_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
try_create_int_counter(
"beacon_blobs_from_el_hit_total",
"Number of blob batches fetched from the execution layer",
"Number of non-empty blob batches fetched from the execution layer",
)
});

pub static BLOBS_FROM_EL_MISS_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
try_create_int_counter(
"beacon_blobs_from_el_miss_total",
"Number of blob batches failed to fetch from the execution layer",
"Number of empty blob responses from the execution layer",
)
});

pub static BLOBS_FROM_EL_EXPECTED_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
pub static BLOBS_FROM_EL_ERROR_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
try_create_int_counter(
"beacon_blobs_from_el_expected_total",
"beacon_blobs_from_el_error_total",
"Number of failed blob fetches from the execution layer",
)
});

pub static BLOBS_FROM_EL_EXPECTED: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram_with_buckets(
"beacon_blobs_from_el_expected",
"Number of blobs expected from the execution layer",
Ok(vec![0.0, 3.0, 6.0, 9.0, 12.0, 18.0, 24.0, 30.0]),
)
});

pub static BLOBS_FROM_EL_RECEIVED_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
try_create_int_counter(
pub static BLOBS_FROM_EL_RECEIVED: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram_with_buckets(
"beacon_blobs_from_el_received_total",
"Number of blobs fetched from the execution layer",
linear_buckets(0.0, 4.0, 20),
)
});

Expand Down
133 changes: 106 additions & 27 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use crate::{
sync::{manager::BlockProcessType, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::kzg_utils::blobs_to_data_column_sidecars;
use beacon_chain::test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
get_kzg, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy,
EphemeralHarnessType,
};
use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *};
use itertools::Itertools;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::{
Expand All @@ -29,9 +32,9 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, Hash256, MainnetEthSpec,
ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot,
SubnetId,
Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
};

type E = MainnetEthSpec;
Expand All @@ -52,6 +55,7 @@ struct TestRig {
chain: Arc<BeaconChain<T>>,
next_block: Arc<SignedBeaconBlock<E>>,
next_blobs: Option<BlobSidecarList<E>>,
next_data_columns: Option<DataColumnSidecarList<E>>,
attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_attestations: Vec<(Attestation<E>, SubnetId)>,
next_block_aggregate_attestations: Vec<SignedAggregateAndProof<E>>,
Expand Down Expand Up @@ -241,7 +245,7 @@ impl TestRig {
let network_beacon_processor = Arc::new(network_beacon_processor);

let beacon_processor = BeaconProcessor {
network_globals,
network_globals: network_globals.clone(),
executor,
current_workers: 0,
config: beacon_processor_config,
Expand All @@ -262,15 +266,35 @@ impl TestRig {

assert!(beacon_processor.is_ok());
let block = next_block_tuple.0;
let blob_sidecars = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
Some(BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap())
let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 {
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
let kzg = get_kzg(&chain.spec);
let custody_columns: DataColumnSidecarList<E> = blobs_to_data_column_sidecars(
&blobs.iter().collect_vec(),
&block,
&kzg,
&chain.spec,
)
.unwrap()
.into_iter()
.filter(|c| network_globals.sampling_columns.contains(&c.index))
.collect::<Vec<_>>();

(None, Some(custody_columns))
} else {
let blob_sidecars =
BlobSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap();
(Some(blob_sidecars), None)
}
} else {
None
(None, None)
};

Self {
chain,
next_block: block,
next_blobs: blob_sidecars,
next_data_columns: data_columns,
attestations,
next_block_attestations,
next_block_aggregate_attestations,
Expand Down Expand Up @@ -323,6 +347,22 @@ impl TestRig {
}
}

pub fn enqueue_gossip_data_columns(&self, col_index: usize) {
if let Some(data_columns) = self.next_data_columns.as_ref() {
let data_column = data_columns.get(col_index).unwrap();
self.network_beacon_processor
.send_gossip_data_column_sidecar(
junk_message_id(),
junk_peer_id(),
Client::default(),
DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec),
data_column.clone(),
Duration::from_secs(0),
)
.unwrap();
}
}

pub fn enqueue_rpc_block(&self) {
let block_root = self.next_block.canonical_root();
self.network_beacon_processor
Expand Down Expand Up @@ -361,6 +401,19 @@ impl TestRig {
}
}

pub fn enqueue_single_lookup_rpc_data_columns(&self) {
if let Some(data_columns) = self.next_data_columns.clone() {
self.network_beacon_processor
.send_rpc_custody_columns(
self.next_block.canonical_root(),
data_columns,
Duration::default(),
BlockProcessType::SingleCustodyColumn(1),
)
.unwrap();
}
}

pub fn enqueue_blobs_by_range_request(&self, count: u64) {
self.network_beacon_processor
.send_blobs_by_range_request(
Expand Down Expand Up @@ -618,6 +671,13 @@ async fn import_gossip_block_acceptably_early() {
.await;
}

let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}

// Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock
// and check the head in the time between the block arrived early and when its due for
// processing.
Expand Down Expand Up @@ -694,19 +754,20 @@ async fn import_gossip_block_at_current_slot() {
rig.assert_event_journal_completes(&[WorkType::GossipBlock])
.await;

let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);

let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
for i in 0..num_blobs {
rig.enqueue_gossip_blob(i);

rig.assert_event_journal_completes(&[WorkType::GossipBlobSidecar])
.await;
}

let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
.await;
}

assert_eq!(
rig.head_root(),
rig.next_block.canonical_root(),
Expand Down Expand Up @@ -759,11 +820,8 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
);

// Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
let mut events = vec![];
match import_method {
BlockImportMethod::Gossip => {
Expand All @@ -773,6 +831,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar);
}
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar);
}
}
BlockImportMethod::Rpc => {
rig.enqueue_rpc_block();
Expand All @@ -781,6 +843,10 @@ async fn attestation_to_unknown_block_processed(import_method: BlockImportMethod
rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs);
}
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
}
}
};

Expand Down Expand Up @@ -840,11 +906,8 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
);

// Send the block and ensure that the attestation is received back and imported.
let num_blobs = rig
.next_blobs
.as_ref()
.map(|blobs| blobs.len())
.unwrap_or(0);
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
let mut events = vec![];
match import_method {
BlockImportMethod::Gossip => {
Expand All @@ -854,6 +917,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
rig.enqueue_gossip_blob(i);
events.push(WorkType::GossipBlobSidecar);
}
for i in 0..num_data_columns {
rig.enqueue_gossip_data_columns(i);
events.push(WorkType::GossipDataColumnSidecar)
}
}
BlockImportMethod::Rpc => {
rig.enqueue_rpc_block();
Expand All @@ -862,6 +929,10 @@ async fn aggregate_attestation_to_unknown_block(import_method: BlockImportMethod
rig.enqueue_single_lookup_rpc_blobs();
events.push(WorkType::RpcBlobs);
}
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
events.push(WorkType::RpcCustodyColumn);
}
}
};

Expand Down Expand Up @@ -1046,12 +1117,20 @@ async fn test_rpc_block_reprocessing() {
rig.assert_event_journal_completes(&[WorkType::RpcBlock])
.await;

rig.enqueue_single_lookup_rpc_blobs();
if rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0) > 0 {
let num_blobs = rig.next_blobs.as_ref().map(|b| b.len()).unwrap_or(0);
if num_blobs > 0 {
rig.enqueue_single_lookup_rpc_blobs();
rig.assert_event_journal_completes(&[WorkType::RpcBlobs])
.await;
}

let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
if num_data_columns > 0 {
rig.enqueue_single_lookup_rpc_data_columns();
rig.assert_event_journal_completes(&[WorkType::RpcCustodyColumn])
.await;
}

// next_block shouldn't be processed since it couldn't get the
// duplicate cache handle
assert_ne!(next_block_root, rig.head_root());
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/network/src/sync/network_context/custody.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
) -> CustodyRequestResult<T::EthSpec> {
let Some(batch_request) = self.active_batch_columns_requests.get_mut(&req_id) else {
warn!(
id = ?self.custody_id,
block_root = ?self.block_root,
%req_id,
"Received custody column response for unrequested index"
Expand All @@ -113,7 +112,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
match resp {
Ok((data_columns, seen_timestamp)) => {
debug!(
id = ?self.custody_id,
block_root = ?self.block_root,
%req_id,
%peer_id,
Expand Down Expand Up @@ -161,7 +159,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
if !missing_column_indexes.is_empty() {
// Note: Batch logging that columns are missing to not spam logger
debug!(
id = ?self.custody_id,
block_root = ?self.block_root,
%req_id,
%peer_id,
Expand All @@ -175,7 +172,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
}
Err(err) => {
debug!(
id = ?self.custody_id,
block_root = ?self.block_root,
%req_id,
%peer_id,
Expand Down
Loading
Loading