6 changes: 5 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3299,10 +3299,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let data_availability_checker = self.data_availability_checker.clone();

let current_span = Span::current();
let result = self
.task_executor
.spawn_blocking_handle(
move || data_availability_checker.reconstruct_data_columns(&block_root),
move || {
let _guard = current_span.enter();
data_availability_checker.reconstruct_data_columns(&block_root)
},
"reconstruct_data_columns",
)
.ok_or(BeaconChainError::RuntimeShutdown)?
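
The change above captures the current tracing span before handing work to a blocking thread: tracing spans are thread-local, so a closure running on another thread would otherwise lose the span context. A minimal sketch of the same pattern, with a plain std::thread standing in for Lighthouse's task executor (names are illustrative):

use tracing::{Span, info, info_span};

fn span_propagation_sketch() {
    let _outer = info_span!("reconstruct_data_columns").entered();

    // Capture the span that is current on this thread...
    let current_span = Span::current();
    std::thread::spawn(move || {
        // ...and re-enter it on the worker thread so events emitted there
        // are still attributed to `reconstruct_data_columns`.
        let _guard = current_span.enter();
        info!("still inside the captured span despite the thread hop");
    })
    .join()
    .unwrap();
}
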
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -547,6 +547,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}
}

#[instrument(skip_all, level = "debug")]
pub fn reconstruct_data_columns(
&self,
block_root: &Hash256,
@@ -592,9 +592,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {

/// Check whether data column reconstruction should be attempted.
///
/// Potentially trigger reconstruction if:
/// - Our custody requirement is all columns (supernode), and we haven't got all columns
/// - We have >= 50% of columns, but not all columns
/// Potentially trigger reconstruction if all of the following are satisfied:
/// - Our custody requirement is more than 50% of the total columns
/// - We haven't received all required columns
/// - Reconstruction hasn't been started for the block
///
/// If reconstruction is required, returns `PendingComponents` which contains the
@@ -609,15 +609,25 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
return ReconstructColumnsDecision::No("block already imported");
};

// If we're sampling all columns, it means we must be custodying all columns.
let Some(epoch) = pending_components
.verified_data_columns
.first()
.map(|c| c.as_data_column().epoch())
else {
return ReconstructColumnsDecision::No("not enough columns");
};

let total_column_count = T::EthSpec::number_of_columns();
let sampling_column_count = self
.custody_context
.num_of_data_columns_to_sample(epoch, &self.spec);
let received_column_count = pending_components.verified_data_columns.len();

if pending_components.reconstruction_started {
return ReconstructColumnsDecision::No("already started");
}
if received_column_count >= total_column_count {
return ReconstructColumnsDecision::No("all columns received");
if received_column_count >= sampling_column_count {
return ReconstructColumnsDecision::No("all sampling columns received");
}
if received_column_count < total_column_count / 2 {
return ReconstructColumnsDecision::No("not enough columns");
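
Condensed, the gating above is: bail if reconstruction already started, bail if we already hold every column we sample, bail if fewer than half of all columns have arrived. A standalone sketch of that logic (the enum and function are illustrative, not the crate's API):

enum Decision {
    Reconstruct,
    No(&'static str),
}

fn reconstruct_decision(
    received: usize,
    sampling_columns: usize,
    total_columns: usize,
    already_started: bool,
) -> Decision {
    if already_started {
        return Decision::No("already started");
    }
    if received >= sampling_columns {
        // We hold everything we are required to sample; nothing to gain.
        return Decision::No("all sampling columns received");
    }
    if received < total_columns / 2 {
        // KZG column reconstruction needs at least 50% of the extended columns.
        return Decision::No("not enough columns");
    }
    Decision::Reconstruct
}

For example, with 128 total columns, a supernode sampling all 128 that has received 70 columns proceeds: 70 >= 64 and 70 < 128.
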
10 changes: 9 additions & 1 deletion beacon_node/beacon_chain/src/validator_custody.rs
@@ -130,7 +130,7 @@ pub struct CustodyContext<E: EthSpec> {
/// and enr values.
validator_custody_count: AtomicU64,
/// Is the node run as a supernode based on current cli parameters.
pub current_is_supernode: bool,
current_is_supernode: bool,
/// The persisted value for `is_supernode` based on the previous run of this node.
///
/// Note: We require this value because if a user restarts the node with a higher cli custody
@@ -307,6 +307,14 @@ impl<E: EthSpec> CustodyContext<E> {
.expect("should compute node sampling size from valid chain spec")
}

/// Returns whether the node should attempt reconstruction at a given epoch.
pub fn should_attempt_reconstruction(&self, epoch: Epoch, spec: &ChainSpec) -> bool {
let min_columns_for_reconstruction = E::number_of_columns() / 2;
// Performing reconstruction is not necessary if the sampling column count is exactly 50%,
// because the node doesn't need the remaining columns.
self.num_of_data_columns_to_sample(epoch, spec) > min_columns_for_reconstruction
}

/// Returns the ordered list of column indices that should be sampled for data availability checking at the given epoch.
///
/// # Parameters
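
A worked example for should_attempt_reconstruction, assuming E::number_of_columns() is 128 so the threshold is 64 (the helper mirrors the comparison above; it is a sketch, not the crate's code):

fn should_attempt(sampling_count: usize, total_columns: usize) -> bool {
    sampling_count > total_columns / 2
}

fn demo() {
    assert!(should_attempt(128, 128)); // supernode: reconstruction is useful
    assert!(!should_attempt(64, 128)); // exactly 50%: already holds all it needs
    assert!(!should_attempt(8, 128)); // small custody count: never reconstructs
}
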
2 changes: 2 additions & 0 deletions beacon_node/execution_layer/src/engine_api/json_structures.rs
Expand Up @@ -801,6 +801,8 @@ impl<E: EthSpec> From<JsonBlobsBundleV1<E>> for BlobsBundle<E> {
)
)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
#[serde(bound = "E: EthSpec")]
pub struct BlobAndProof<E: EthSpec> {
#[serde(with = "ssz_types::serde_utils::hex_fixed_vec")]
pub blob: Blob<E>,
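
BlobAndProof is a superstruct-generated enum with V1 and V2 variants, and #[serde(untagged)] makes serde select the variant by JSON shape rather than by an explicit tag. A simplified, self-contained sketch of that behaviour (the types below are stand-ins, not the crate's own):

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum BlobAndProofSketch {
    V1 { blob: String, proof: String },
    V2 { blob: String, proofs: Vec<String> },
}

fn demo() -> serde_json::Result<()> {
    // A single `proof` field matches V1; a `proofs` array matches V2.
    let v1: BlobAndProofSketch = serde_json::from_str(r#"{"blob":"0x01","proof":"0x02"}"#)?;
    let v2: BlobAndProofSketch =
        serde_json::from_str(r#"{"blob":"0x01","proofs":["0x02","0x03"]}"#)?;
    println!("{v1:?} / {v2:?}");
    Ok(())
}
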
@@ -6,13 +6,15 @@ use crate::engine_api::{
},
};
use crate::engines::ForkchoiceState;
use crate::json_structures::{BlobAndProof, BlobAndProofV1, BlobAndProofV2};
use eth2::types::BlobsBundle;
use kzg::{Kzg, KzgCommitment, KzgProof};
use parking_lot::Mutex;
use rand::{Rng, SeedableRng, rngs::StdRng};
use serde::{Deserialize, Serialize};
use ssz::Decode;
use ssz_types::VariableList;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
use std::collections::HashMap;
use std::sync::Arc;
use tree_hash::TreeHash;
@@ -479,6 +481,41 @@ impl<E: EthSpec> ExecutionBlockGenerator<E> {
self.blobs_bundles.get(id).cloned()
}

pub fn get_blob_and_proofs(&self, versioned_hashes: Vec<Hash256>) -> Vec<BlobAndProof<E>> {
self.blobs_bundles
.values()
// Assumes all versioned hashes are present in the same blobs bundle; short-circuits once found.
.find_map(|blobs_bundle| {
let blobs_and_proofs = blobs_bundle
.commitments
.iter()
.enumerate()
.filter(|(_, commitment)| {
let hash = kzg_commitment_to_versioned_hash(commitment);
versioned_hashes.contains(&hash)
})
.filter_map(|(blob_idx, _)| {
let is_fulu_bundle = blobs_bundle.blobs.len() < blobs_bundle.proofs.len();
let blob = blobs_bundle.blobs.get(blob_idx)?.clone();
if is_fulu_bundle {
let start = blob_idx * E::cells_per_ext_blob();
let end = start + E::cells_per_ext_blob();
let proofs = blobs_bundle.proofs.get(start..end)?.to_vec().into();
Some(BlobAndProof::V2(BlobAndProofV2 { blob, proofs }))
} else {
Some(BlobAndProof::V1(BlobAndProofV1 {
blob,
proof: *blobs_bundle.proofs.get(blob_idx)?,
}))
}
})
.collect::<Vec<_>>();

(!blobs_and_proofs.is_empty()).then_some(blobs_and_proofs)
})
.unwrap_or_default()
}
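
The V1/V2 split above hinges on bundle shape: pre-Fulu bundles carry one KZG proof per blob (blobs.len() == proofs.len()), while Fulu bundles carry cell proofs, so proofs.len() == blobs.len() * cells_per_ext_blob and the proofs for blob i occupy one contiguous slice. A small sketch of that indexing, assuming 128 cells per extended blob:

fn cell_proof_range(blob_idx: usize, cells_per_ext_blob: usize) -> std::ops::Range<usize> {
    let start = blob_idx * cells_per_ext_blob;
    start..start + cells_per_ext_blob
}

fn demo() {
    // Blob 2 in a Fulu bundle: its 128 cell proofs live at proofs[256..384].
    assert_eq!(cell_proof_range(2, 128), 256..384);
}
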

pub fn new_payload(&mut self, payload: ExecutionPayload<E>) -> PayloadStatusV1 {
let Some(parent) = self.blocks.get(&payload.parent_hash()) else {
return PayloadStatusV1 {
33 changes: 33 additions & 0 deletions beacon_node/execution_layer/src/test_utils/handle_rpc.rs
@@ -456,6 +456,39 @@ pub async fn handle_rpc<E: EthSpec>(
_ => unreachable!(),
}
}
ENGINE_GET_BLOBS_V1 | ENGINE_GET_BLOBS_V2 => {
let versioned_hashes =
get_param::<Vec<Hash256>>(params, 0).map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?;
let blobs_and_proofs_vec = ctx
.execution_block_generator
.read()
.get_blob_and_proofs(versioned_hashes);

// Validate that the method version matches the blob and proof type,
// since the generated blobs bundle depends on the fork.
if let Some(blob_and_proofs) = blobs_and_proofs_vec.first().as_ref() {
match blob_and_proofs {
BlobAndProof::V1(_) => {
if method == ENGINE_GET_BLOBS_V2 {
return Err((
format!("{} called before Fulu fork!", method),
FORK_REQUEST_MISMATCH_ERROR_CODE,
));
}
}
BlobAndProof::V2(_) => {
if method == ENGINE_GET_BLOBS_V1 {
return Err((
format!("{} called after Fulu fork!", method),
FORK_REQUEST_MISMATCH_ERROR_CODE,
));
}
}
}
}

Ok(serde_json::to_value(blobs_and_proofs_vec).unwrap())
}
ENGINE_FORKCHOICE_UPDATED_V1
| ENGINE_FORKCHOICE_UPDATED_V2
| ENGINE_FORKCHOICE_UPDATED_V3 => {
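
The mock's fork check reduces to a compatibility table: engine_getBlobsV1 pairs with single-proof (pre-Fulu) bundles and engine_getBlobsV2 with cell-proof (Fulu) bundles, and any other combination is rejected. A sketch of that pairing (the helper is illustrative, not the crate's API):

fn version_matches(method: &str, has_cell_proofs: bool) -> bool {
    matches!(
        (method, has_cell_proofs),
        ("engine_getBlobsV1", false) | ("engine_getBlobsV2", true)
    )
}
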
@@ -34,7 +34,6 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::HotColdDBError;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
use types::{
Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar,
@@ -1054,36 +1053,43 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Processed data column, waiting for other components"
);

// Instead of triggering reconstruction immediately, schedule it to be run. If
// another column arrives it either completes availability or pushes
// reconstruction back a bit.
let cloned_self = Arc::clone(self);
let block_root = *block_root;
let send_result = self.beacon_processor_send.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
QueuedColumnReconstruction {
block_root,
slot: *slot,
process_fn: Box::pin(async move {
cloned_self
.attempt_data_column_reconstruction(block_root)
.await;
}),
},
)),
});
if let Err(TrySendError::Full(WorkEvent {
work:
Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
reconstruction,
)),
..
})) = send_result
if self
.chain
.data_availability_checker
.custody_context()
.should_attempt_reconstruction(
slot.epoch(T::EthSpec::slots_per_epoch()),
&self.chain.spec,
)
{
warn!("Unable to send reconstruction to reprocessing");
// Execute it immediately instead.
reconstruction.process_fn.await;
// Instead of triggering reconstruction immediately, schedule it to be run. If
// another column arrives, it either completes availability or pushes
// reconstruction back a bit.
let cloned_self = Arc::clone(self);
let block_root = *block_root;

if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(
ReprocessQueueMessage::DelayColumnReconstruction(
QueuedColumnReconstruction {
block_root,
slot: *slot,
process_fn: Box::pin(async move {
cloned_self
.attempt_data_column_reconstruction(block_root)
.await;
}),
},
),
),
})
.is_err()
{
warn!("Unable to send reconstruction to reprocessing");
}
}
}
},
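
The net effect of this hunk: reconstruction is now gated on should_attempt_reconstruction and only ever scheduled through the reprocess queue; the old fallback of running it inline when the queue was full is gone. A compressed sketch of the new control flow (the types are illustrative):

fn maybe_schedule_reconstruction(
    should_attempt: bool,
    try_send: impl FnOnce() -> Result<(), ()>,
) {
    if !should_attempt {
        // E.g. a node sampling <= 50% of the columns gains nothing from reconstruction.
        return;
    }
    if try_send().is_err() {
        // A full queue no longer falls back to inline reconstruction; it just
        // warns, and a later column arrival may re-trigger scheduling.
        eprintln!("Unable to send reconstruction to reprocessing");
    }
}
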
39 changes: 9 additions & 30 deletions beacon_node/network/src/network_beacon_processor/mod.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, error::TrySendError};
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, instrument, trace, warn};
use types::*;

pub use sync_methods::ChainSegmentProcessId;
@@ -825,30 +825,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}

/// Attempt to reconstruct all data columns if the following conditions satisfies:
/// - Our custody requirement is all columns
/// - We have >= 50% of columns, but not all columns
///
/// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed,
/// otherwise returns `None`.
///
/// The `publish_columns` parameter controls whether reconstructed columns should be published
/// to the gossip network.
async fn attempt_data_column_reconstruction(
self: &Arc<Self>,
block_root: Hash256,
) -> Option<AvailabilityProcessingStatus> {
// Only supernodes attempt reconstruction
if !self
.chain
.data_availability_checker
.custody_context()
.current_is_supernode
{
return None;
}

/// Attempts to reconstruct all data columns if the conditions checked in
/// [`DataAvailabilityCheckerInner::check_and_set_reconstruction_started`] are satisfied.
#[instrument(level = "debug", skip_all, fields(?block_root))]
async fn attempt_data_column_reconstruction(self: &Arc<Self>, block_root: Hash256) {
let result = self.chain.reconstruct_data_columns(block_root).await;

match result {
Ok(Some((availability_processing_status, data_columns_to_publish))) => {
self.publish_data_columns_gradually(data_columns_to_publish, block_root);
@@ -864,29 +846,25 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
AvailabilityProcessingStatus::MissingComponents(_, _) => {
debug!(
result = "imported all custody columns",
block_hash = %block_root,
%block_root,
"Block components still missing block after reconstruction"
);
}
}

Some(availability_processing_status)
}
Ok(None) => {
// Reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric
trace!(
block_hash = %block_root,
%block_root,
"Reconstruction not required for block"
);
None
}
Err(e) => {
error!(
%block_root,
error = ?e,
"Error during data column reconstruction"
);
None
}
}
}
@@ -975,6 +953,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// by some nodes on the network as soon as possible. Our hope is that some columns arrive from
/// other nodes in the meantime, obviating the need for us to publish them. If no other
/// publisher exists for a column, it will eventually get published here.
#[instrument(level="debug", skip_all, fields(?block_root, data_column_count=data_columns_to_publish.len()))]
fn publish_data_columns_gradually(
self: &Arc<Self>,
mut data_columns_to_publish: DataColumnSidecarList<T::EthSpec>,