Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 60 additions & 51 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnsToPublish,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
Expand Down Expand Up @@ -3012,13 +3011,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = data_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
Expand Down Expand Up @@ -3102,13 +3095,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = custody_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
Expand Down Expand Up @@ -3149,6 +3136,45 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified_custody_columns(&block_root, r)
}

pub async fn reconstruct_data_columns(
self: &Arc<Self>,
block_root: Hash256,
) -> Result<
Option<(
AvailabilityProcessingStatus,
DataColumnSidecarList<T::EthSpec>,
)>,
BlockError,
> {
// As of now we only reconstruct data columns on supernodes, so if the block is already
// available on a supernode, there's no need to reconstruct as the node must already have
// all columns.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Ok(None);
}

let Some((availability, data_columns_to_publish)) = self
.data_availability_checker
.reconstruct_data_columns(&block_root)?
else {
return Ok(None);
};

let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
return Ok(None);
};

let r = self.process_availability(slot, availability).await;
self.remove_notified_custody_columns(&block_root, r)
.map(|availability_processing_status| {
Some((availability_processing_status, data_columns_to_publish))
})
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Expand All @@ -3166,15 +3192,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified_custody_columns<P>(
fn remove_notified_custody_columns(
&self,
block_root: &Hash256,
r: Result<(AvailabilityProcessingStatus, P), BlockError>,
) -> Result<(AvailabilityProcessingStatus, P), BlockError> {
let has_missing_components = matches!(
r,
Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _))
);
r: Result<AvailabilityProcessingStatus, BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
Expand Down Expand Up @@ -3432,26 +3456,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
if let Some(slasher) = self.slasher.as_ref() {
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
}
}

let (availability, data_columns_to_publish) = self
.data_availability_checker
.put_gossip_data_columns(slot, block_root, data_columns)?;
let availability = self.data_availability_checker.put_gossip_data_columns(
slot,
block_root,
data_columns,
)?;

self.process_availability(slot, availability)
.await
.map(|result| (result, data_columns_to_publish))
self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
Expand Down Expand Up @@ -3500,13 +3518,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError,
> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
Expand All @@ -3531,16 +3543,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
let (availability, data_columns_to_publish) =
self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;
let availability = self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;

self.process_availability(slot, availability)
.await
.map(|result| (result, data_columns_to_publish))
self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,7 @@ where
store,
self.import_all_data_columns,
self.spec,
log.new(o!("service" => "data_availability_checker")),
)
.map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?,
),
Expand Down
Loading