Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7137,25 +7137,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Compare columns custodied for `epoch` versus columns custodied for the head of the chain
/// and return any column indices that are missing.
pub fn get_missing_columns_for_epoch(&self, epoch: Epoch) -> HashSet<ColumnIndex> {
pub fn get_missing_columns_for_epoch(
&self,
epoch: Epoch,
) -> Result<HashSet<ColumnIndex>, String> {
let custody_context = self.data_availability_checker.custody_context();

let columns_required = custody_context
.custody_columns_for_epoch(None, &self.spec)
.custody_columns_for_epoch(None, &self.spec)?
.iter()
.cloned()
.collect::<HashSet<_>>();

let current_columns_at_epoch = custody_context
.custody_columns_for_epoch(Some(epoch), &self.spec)
.custody_columns_for_epoch(Some(epoch), &self.spec)?
.iter()
.cloned()
.collect::<HashSet<_>>();

columns_required
let missing_columns = columns_required
.difference(&current_columns_at_epoch)
.cloned()
.collect::<HashSet<_>>()
.collect::<HashSet<_>>();

Ok(missing_columns)
}

/// The da boundary for custodying columns. It will just be the DA boundary unless we are near the Fulu fork epoch.
Expand Down Expand Up @@ -7442,7 +7447,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
AvailableBlockData::DataColumns(mut data_columns) => {
let columns_to_custody = self.custody_columns_for_epoch(Some(
block_slot.epoch(T::EthSpec::slots_per_epoch()),
));
))?;
// Supernodes need to persist all sampled custody columns
if columns_to_custody.len() != self.spec.number_of_custody_groups as usize {
data_columns
Expand Down Expand Up @@ -7485,7 +7490,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Returns a list of column indices that should be sampled for a given epoch.
/// Used for data availability sampling in PeerDAS.
pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> &[ColumnIndex] {
pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> Result<&[ColumnIndex], String> {
self.data_availability_checker
.custody_context()
.sampling_columns_for_epoch(epoch, &self.spec)
Expand All @@ -7496,7 +7501,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// serve them to peers.
///
/// If epoch is `None`, this function computes the custody columns at head.
pub fn custody_columns_for_epoch(&self, epoch_opt: Option<Epoch>) -> &[ColumnIndex] {
pub fn custody_columns_for_epoch(
&self,
epoch_opt: Option<Epoch>,
) -> Result<&[ColumnIndex], String> {
self.data_availability_checker
.custody_context()
.custody_columns_for_epoch(epoch_opt, &self.spec)
Expand Down
62 changes: 40 additions & 22 deletions beacon_node/beacon_chain/src/custody_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,7 @@ impl<E: EthSpec> CustodyContext<E> {
let old_custody_group_count = validator_custody_at_head;
validator_custody_at_head = cgc_from_cli;

let sampling_count = spec
.sampling_size_custody_groups(cgc_from_cli)
.expect("should compute node sampling size from valid chain spec");
let sampling_count = spec.sampling_size_custody_groups(cgc_from_cli);

epoch_validator_custody_requirements.push((effective_epoch, cgc_from_cli));

Expand Down Expand Up @@ -469,22 +467,25 @@ impl<E: EthSpec> CustodyContext<E> {
pub fn num_of_custody_groups_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> u64 {
let custody_group_count = self.custody_group_count_at_epoch(epoch, spec);
spec.sampling_size_custody_groups(custody_group_count)
.expect("should compute node sampling size from valid chain spec")
}

/// Returns the count of columns this node must _sample_ for a block at `epoch` to import.
pub fn num_of_data_columns_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> usize {
pub fn num_of_data_columns_to_sample(
&self,
epoch: Epoch,
spec: &ChainSpec,
) -> Result<usize, String> {
let custody_group_count = self.custody_group_count_at_epoch(epoch, spec);
spec.sampling_size_columns::<E>(custody_group_count)
.expect("should compute node sampling size from valid chain spec")
}

/// Returns whether the node should attempt reconstruction at a given epoch.
pub fn should_attempt_reconstruction(&self, epoch: Epoch, spec: &ChainSpec) -> bool {
let min_columns_for_reconstruction = E::number_of_columns() / 2;
// performing reconstruction is not necessary if sampling column count is exactly 50%,
// because the node doesn't need the remaining columns.
self.num_of_data_columns_to_sample(epoch, spec) > min_columns_for_reconstruction
self.num_of_data_columns_to_sample(epoch, spec)
.is_ok_and(|sample_count| sample_count > min_columns_for_reconstruction)
}

/// Returns the ordered list of column indices that should be sampled for data availability checking at the given epoch.
Expand All @@ -495,13 +496,17 @@ impl<E: EthSpec> CustodyContext<E> {
///
/// # Returns
/// A slice of ordered column indices that should be sampled for this epoch based on the node's custody configuration
pub fn sampling_columns_for_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> &[ColumnIndex] {
let num_of_columns_to_sample = self.num_of_data_columns_to_sample(epoch, spec);
pub fn sampling_columns_for_epoch(
&self,
epoch: Epoch,
spec: &ChainSpec,
) -> Result<&[ColumnIndex], String> {
let num_of_columns_to_sample = self.num_of_data_columns_to_sample(epoch, spec)?;
let all_columns_ordered = self
.all_custody_columns_ordered
.get()
.expect("all_custody_columns_ordered should be initialized");
&all_columns_ordered[..num_of_columns_to_sample]
.ok_or("Custody context has not been initialised")?;
Ok(&all_columns_ordered[..num_of_columns_to_sample])
}

/// Returns the ordered list of column indices that the node is assigned to custody
Expand All @@ -521,7 +526,7 @@ impl<E: EthSpec> CustodyContext<E> {
&self,
epoch_opt: Option<Epoch>,
spec: &ChainSpec,
) -> &[ColumnIndex] {
) -> Result<&[ColumnIndex], String> {
let custody_group_count = if let Some(epoch) = epoch_opt {
self.custody_group_count_at_epoch(epoch, spec) as usize
} else {
Expand All @@ -531,9 +536,9 @@ impl<E: EthSpec> CustodyContext<E> {
let all_columns_ordered = self
.all_custody_columns_ordered
.get()
.expect("all_custody_columns_ordered should be initialized");
.ok_or("Custody context has not been initialised")?;

&all_columns_ordered[..custody_group_count]
Ok(&all_columns_ordered[..custody_group_count])
}

/// The node has completed backfill for this epoch. Update the internal records so the function
Expand Down Expand Up @@ -699,8 +704,7 @@ mod tests {
);
assert_eq!(
cgc_changed.sampling_count,
spec.sampling_size_custody_groups(expected_new_cgc)
.expect("should compute sampling size"),
spec.sampling_size_custody_groups(expected_new_cgc),
"sampling_count should match expected value"
);

Expand Down Expand Up @@ -1025,7 +1029,9 @@ mod tests {
fn should_init_ordered_data_columns_and_return_sampling_columns() {
let spec = E::default_spec();
let custody_context = CustodyContext::<E>::new(NodeCustodyType::Fullnode, &spec);
let sampling_size = custody_context.num_of_data_columns_to_sample(Epoch::new(0), &spec);
let sampling_size = custody_context
.num_of_data_columns_to_sample(Epoch::new(0), &spec)
.unwrap();

// initialise ordered columns
let mut all_custody_groups_ordered = (0..spec.number_of_custody_groups).collect::<Vec<_>>();
Expand All @@ -1038,8 +1044,9 @@ mod tests {
)
.expect("should initialise ordered data columns");

let actual_sampling_columns =
custody_context.sampling_columns_for_epoch(Epoch::new(0), &spec);
let actual_sampling_columns = custody_context
.sampling_columns_for_epoch(Epoch::new(0), &spec)
.unwrap();

let expected_sampling_columns = &all_custody_groups_ordered
.iter()
Expand Down Expand Up @@ -1085,7 +1092,10 @@ mod tests {
.expect("should initialise ordered data columns");

assert_eq!(
custody_context.custody_columns_for_epoch(None, &spec).len(),
custody_context
.custody_columns_for_epoch(None, &spec)
.unwrap()
.len(),
spec.custody_requirement as usize
);
}
Expand All @@ -1101,7 +1111,10 @@ mod tests {
.expect("should initialise ordered data columns");

assert_eq!(
custody_context.custody_columns_for_epoch(None, &spec).len(),
custody_context
.custody_columns_for_epoch(None, &spec)
.unwrap()
.len(),
spec.number_of_custody_groups as usize
);
}
Expand All @@ -1127,7 +1140,10 @@ mod tests {
);

assert_eq!(
custody_context.custody_columns_for_epoch(None, &spec).len(),
custody_context
.custody_columns_for_epoch(None, &spec)
.unwrap()
.len(),
val_custody_units as usize
);
}
Expand All @@ -1147,6 +1163,7 @@ mod tests {
assert_eq!(
custody_context
.custody_columns_for_epoch(Some(test_epoch), &spec)
.unwrap()
.len(),
expected_cgc as usize
);
Expand Down Expand Up @@ -1413,6 +1430,7 @@ mod tests {
assert_eq!(
custody_context
.custody_columns_for_epoch(Some(Epoch::new(15)), &spec)
.unwrap()
.len(),
final_cgc as usize,
);
Expand Down
33 changes: 24 additions & 9 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_columns = self
.custody_context
.sampling_columns_for_epoch(epoch, &self.spec);
.sampling_columns_for_epoch(epoch, &self.spec)
.map_err(AvailabilityCheckError::CustodyContextError)?;
let verified_custody_columns = kzg_verified_columns
.into_iter()
.filter(|col| sampling_columns.contains(&col.index()))
Expand Down Expand Up @@ -315,7 +316,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_columns = self
.custody_context
.sampling_columns_for_epoch(epoch, &self.spec);
.sampling_columns_for_epoch(epoch, &self.spec)
.map_err(AvailabilityCheckError::CustodyContextError)?;
let custody_columns = data_columns
.into_iter()
.filter(|col| sampling_columns.contains(&col.index()))
Expand Down Expand Up @@ -624,7 +626,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

let columns_to_sample = self
.custody_context()
.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec);
.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec)
.map_err(AvailabilityCheckError::CustodyContextError)?;

// We only need to import and publish columns that we need to sample
// and columns that we haven't already received
Expand Down Expand Up @@ -904,7 +907,9 @@ mod test {
&spec,
);
assert_eq!(
custody_context.num_of_data_columns_to_sample(epoch, &spec),
custody_context
.num_of_data_columns_to_sample(epoch, &spec)
.unwrap(),
spec.validator_custody_requirement as usize,
"sampling size should be the minimal custody requirement == 8"
);
Expand Down Expand Up @@ -939,7 +944,9 @@ mod test {
.expect("should put rpc custody columns");

// THEN the sampling size for the end slot of the same epoch remains unchanged
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
let sampling_columns = custody_context
.sampling_columns_for_epoch(epoch, &spec)
.unwrap();
assert_eq!(
sampling_columns.len(),
spec.validator_custody_requirement as usize // 8
Expand Down Expand Up @@ -983,7 +990,9 @@ mod test {
&spec,
);
assert_eq!(
custody_context.num_of_data_columns_to_sample(epoch, &spec),
custody_context
.num_of_data_columns_to_sample(epoch, &spec)
.unwrap(),
spec.validator_custody_requirement as usize,
"sampling size should be the minimal custody requirement == 8"
);
Expand Down Expand Up @@ -1017,7 +1026,9 @@ mod test {
.expect("should put gossip custody columns");

// THEN the sampling size for the end slot of the same epoch remains unchanged
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
let sampling_columns = custody_context
.sampling_columns_for_epoch(epoch, &spec)
.unwrap();
assert_eq!(
sampling_columns.len(),
spec.validator_custody_requirement as usize // 8
Expand Down Expand Up @@ -1106,7 +1117,9 @@ mod test {
Slot::new(0),
&spec,
);
let sampling_requirement = custody_context.num_of_data_columns_to_sample(epoch, &spec);
let sampling_requirement = custody_context
.num_of_data_columns_to_sample(epoch, &spec)
.unwrap();
assert_eq!(
sampling_requirement, 65,
"sampling requirement should be 65"
Expand Down Expand Up @@ -1164,7 +1177,9 @@ mod test {
);

// Only the columns required for custody (65) should be imported into the cache
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
let sampling_columns = custody_context
.sampling_columns_for_epoch(epoch, &spec)
.unwrap();
let actual_cached: HashSet<ColumnIndex> = da_checker
.cached_data_column_indexes(&block_root)
.expect("should have cached data columns")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum Error {
blob_commitment: KzgCommitment,
block_commitment: KzgCommitment,
},
CustodyContextError(String),
Unexpected(String),
SszTypes(ssz_types::Error),
MissingBlobs,
Expand Down Expand Up @@ -44,6 +45,7 @@ impl Error {
| Error::ParentStateMissing(_)
| Error::BlockReplayError(_)
| Error::RebuildingStateCaches(_)
| Error::CustodyContextError(_)
| Error::SlotClockError => ErrorCategory::Internal,
Error::InvalidBlobs { .. }
| Error::InvalidColumn { .. }
Expand Down
Loading