Skip to content

Commit 3fab6a2

Browse files
authored
Block availability data enum (#6866)
PeerDAS has undergone multiple refactors + the blending with the get_blobs optimization has generated technical debt. A function signature like this https://github.com/sigp/lighthouse/blob/f008b84079bbb6eb86de22bb3421dfc8263a5650/beacon_node/beacon_chain/src/beacon_chain.rs#L7171-L7178 Allows at least the following combination of states: - blobs: Some / None - data_columns: Some / None - data_column_recv: Some / None - Block has data? Yes / No - Block post-PeerDAS? Yes / No In reality, we don't have that many possible states, only: - `NoData`: pre-deneb, pre-PeerDAS with 0 blobs or post-PeerDAS with 0 blobs - `Blobs(BlobSidecarList<E>)`: post-Deneb pre-PeerDAS with > 0 blobs - `DataColumns(DataColumnSidecarList<E>)`: post-PeerDAS with > 0 blobs - `DataColumnsRecv(oneshot::Receiver<DataColumnSidecarList<E>>)`: post-PeerDAS with > 0 blobs, but we obtained the columns via reconstruction ^ this are the variants of the new `AvailableBlockData` enum So we go from 2^5 states to 4 well-defined. Downstream code benefits nicely from this clarity and I think it makes the whole feature much more maintainable. Currently `is_available` returns a bool, and then we construct the available block in `make_available`. In a way the availability condition is duplicated in both functions. Instead, this PR constructs `AvailableBlockData` in `is_available` so the availability conditions are written once ```rust if let Some(block_data) = is_available(..) { let available_block = make_available(block_data); } ```
1 parent 60964fc commit 3fab6a2

File tree

11 files changed

+426
-355
lines changed

11 files changed

+426
-355
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 50 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use crate::block_verification_types::{
2121
pub use crate::canonical_head::CanonicalHead;
2222
use crate::chain_config::ChainConfig;
2323
use crate::data_availability_checker::{
24-
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
25-
DataColumnReconstructionResult,
24+
Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
25+
DataAvailabilityChecker, DataColumnReconstructionResult,
2626
};
2727
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
2828
use crate::early_attester_cache::EarlyAttesterCache;
@@ -3169,7 +3169,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31693169
return Err(BlockError::DuplicateFullyImported(block_root));
31703170
}
31713171

3172-
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
3172+
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
3173+
// consumers don't expect the blobs event to fire erratically.
3174+
if !self
3175+
.spec
3176+
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
3177+
{
3178+
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
3179+
}
31733180

31743181
let r = self
31753182
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
@@ -3640,9 +3647,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36403647
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
36413648
) -> Result<AvailabilityProcessingStatus, BlockError> {
36423649
self.check_blobs_for_slashability(block_root, &blobs)?;
3643-
let availability =
3644-
self.data_availability_checker
3645-
.put_engine_blobs(block_root, blobs, data_column_recv)?;
3650+
let availability = self.data_availability_checker.put_engine_blobs(
3651+
block_root,
3652+
slot.epoch(T::EthSpec::slots_per_epoch()),
3653+
blobs,
3654+
data_column_recv,
3655+
)?;
36463656

36473657
self.process_availability(slot, availability, || Ok(()))
36483658
.await
@@ -3727,7 +3737,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37273737
parent_eth1_finalization_data,
37283738
confirmed_state_roots,
37293739
consensus_context,
3730-
data_column_recv,
37313740
} = import_data;
37323741

37333742
// Record the time at which this block's blobs became available.
@@ -3755,7 +3764,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37553764
parent_block,
37563765
parent_eth1_finalization_data,
37573766
consensus_context,
3758-
data_column_recv,
37593767
)
37603768
},
37613769
"payload_verification_handle",
@@ -3794,7 +3802,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37943802
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
37953803
parent_eth1_finalization_data: Eth1FinalizationData,
37963804
mut consensus_context: ConsensusContext<T::EthSpec>,
3797-
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
37983805
) -> Result<Hash256, BlockError> {
37993806
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
38003807
// Everything in this initial section is on the hot path between processing the block and
@@ -3892,7 +3899,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
38923899
if let Some(proto_block) = fork_choice.get_block(&block_root) {
38933900
if let Err(e) = self.early_attester_cache.add_head_block(
38943901
block_root,
3895-
signed_block.clone(),
3902+
&signed_block,
38963903
proto_block,
38973904
&state,
38983905
&self.spec,
@@ -3961,15 +3968,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
39613968
// If the write fails, revert fork choice to the version from disk, else we can
39623969
// end up with blocks in fork choice that are missing from disk.
39633970
// See https://github.com/sigp/lighthouse/issues/2028
3964-
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
3971+
let (_, signed_block, block_data) = signed_block.deconstruct();
39653972

3966-
match self.get_blobs_or_columns_store_op(
3967-
block_root,
3968-
signed_block.epoch(),
3969-
blobs,
3970-
data_columns,
3971-
data_column_recv,
3972-
) {
3973+
match self.get_blobs_or_columns_store_op(block_root, block_data) {
39733974
Ok(Some(blobs_or_columns_store_op)) => {
39743975
ops.push(blobs_or_columns_store_op);
39753976
}
@@ -7218,29 +7219,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
72187219
}
72197220
}
72207221

7221-
fn get_blobs_or_columns_store_op(
7222+
pub(crate) fn get_blobs_or_columns_store_op(
72227223
&self,
72237224
block_root: Hash256,
7224-
block_epoch: Epoch,
7225-
blobs: Option<BlobSidecarList<T::EthSpec>>,
7226-
data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
7227-
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
7225+
block_data: AvailableBlockData<T::EthSpec>,
72287226
) -> Result<Option<StoreOp<T::EthSpec>>, String> {
7229-
if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
7230-
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
7231-
// custody columns: https://github.com/sigp/lighthouse/issues/6465
7232-
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
7227+
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
7228+
// custody columns: https://github.com/sigp/lighthouse/issues/6465
7229+
let _custody_columns_count = self.data_availability_checker.get_sampling_column_count();
72337230

7234-
let custody_columns_available = data_columns
7235-
.as_ref()
7236-
.as_ref()
7237-
.is_some_and(|columns| columns.len() == custody_columns_count);
7238-
7239-
let data_columns_to_persist = if custody_columns_available {
7240-
// If the block was made available via custody columns received from gossip / rpc, use them
7241-
// since we already have them.
7242-
data_columns
7243-
} else if let Some(data_column_recv) = data_column_recv {
7231+
match block_data {
7232+
AvailableBlockData::NoData => Ok(None),
7233+
AvailableBlockData::Blobs(blobs) => {
7234+
debug!(
7235+
self.log, "Writing blobs to store";
7236+
"block_root" => %block_root,
7237+
"count" => blobs.len(),
7238+
);
7239+
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
7240+
}
7241+
AvailableBlockData::DataColumns(data_columns) => {
7242+
debug!(
7243+
self.log, "Writing data columns to store";
7244+
"block_root" => %block_root,
7245+
"count" => data_columns.len(),
7246+
);
7247+
Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)))
7248+
}
7249+
AvailableBlockData::DataColumnsRecv(data_column_recv) => {
72447250
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
72457251
let _column_recv_timer =
72467252
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
@@ -7250,34 +7256,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
72507256
let computed_data_columns = data_column_recv
72517257
.blocking_recv()
72527258
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
7253-
Some(computed_data_columns)
7254-
} else {
7255-
// No blobs in the block.
7256-
None
7257-
};
7258-
7259-
if let Some(data_columns) = data_columns_to_persist {
7260-
if !data_columns.is_empty() {
7261-
debug!(
7262-
self.log, "Writing data_columns to store";
7263-
"block_root" => %block_root,
7264-
"count" => data_columns.len(),
7265-
);
7266-
return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
7267-
}
7268-
}
7269-
} else if let Some(blobs) = blobs {
7270-
if !blobs.is_empty() {
72717259
debug!(
7272-
self.log, "Writing blobs to store";
7260+
self.log, "Writing data columns to store";
72737261
"block_root" => %block_root,
7274-
"count" => blobs.len(),
7262+
"count" => computed_data_columns.len(),
72757263
);
7276-
return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
7264+
// TODO(das): Store only this node's custody columns
7265+
Ok(Some(StoreOp::PutDataColumns(
7266+
block_root,
7267+
computed_data_columns,
7268+
)))
72777269
}
72787270
}
7279-
7280-
Ok(None)
72817271
}
72827272
}
72837273

beacon_node/beacon_chain/src/block_verification.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1707,7 +1707,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
17071707
parent_eth1_finalization_data,
17081708
confirmed_state_roots,
17091709
consensus_context,
1710-
data_column_recv: None,
17111710
},
17121711
payload_verification_handle,
17131712
})

beacon_node/beacon_chain/src/block_verification_types.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ use derivative::Derivative;
77
use state_processing::ConsensusContext;
88
use std::fmt::{Debug, Formatter};
99
use std::sync::Arc;
10-
use tokio::sync::oneshot;
1110
use types::blob_sidecar::BlobIdentifier;
1211
use types::{
13-
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList,
14-
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
12+
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
13+
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
1514
};
1615

1716
/// A block that has been received over RPC. It has 2 internal variants:
@@ -265,7 +264,6 @@ impl<E: EthSpec> ExecutedBlock<E> {
265264

266265
/// A block that has completed all pre-deneb block processing checks including verification
267266
/// by an EL client **and** has all requisite blob data to be imported into fork choice.
268-
#[derive(PartialEq)]
269267
pub struct AvailableExecutedBlock<E: EthSpec> {
270268
pub block: AvailableBlock<E>,
271269
pub import_data: BlockImportData<E>,
@@ -338,21 +336,14 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
338336
}
339337
}
340338

341-
#[derive(Debug, Derivative)]
342-
#[derivative(PartialEq)]
339+
#[derive(Debug, PartialEq)]
343340
pub struct BlockImportData<E: EthSpec> {
344341
pub block_root: Hash256,
345342
pub state: BeaconState<E>,
346343
pub parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
347344
pub parent_eth1_finalization_data: Eth1FinalizationData,
348345
pub confirmed_state_roots: Vec<Hash256>,
349346
pub consensus_context: ConsensusContext<E>,
350-
#[derivative(PartialEq = "ignore")]
351-
/// An optional receiver for `DataColumnSidecarList`.
352-
///
353-
/// This field is `Some` when data columns are being computed asynchronously.
354-
/// The resulting `DataColumnSidecarList` will be sent through this receiver.
355-
pub data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<E>>>,
356347
}
357348

358349
impl<E: EthSpec> BlockImportData<E> {
@@ -371,7 +362,6 @@ impl<E: EthSpec> BlockImportData<E> {
371362
},
372363
confirmed_state_roots: vec![],
373364
consensus_context: ConsensusContext::new(Slot::new(0)),
374-
data_column_recv: None,
375365
}
376366
}
377367
}

0 commit comments

Comments
 (0)