From e916550a0acef41a26242d86b33fbc0eb39855d9 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Sep 2025 12:10:08 +1000 Subject: [PATCH 1/7] Add tracing span to reconstruction. --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 +++++- beacon_node/network/src/network_beacon_processor/mod.rs | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6e11b666102..1a0f24b3732 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3299,10 +3299,14 @@ impl BeaconChain { let data_availability_checker = self.data_availability_checker.clone(); + let current_span = Span::current(); let result = self .task_executor .spawn_blocking_handle( - move || data_availability_checker.reconstruct_data_columns(&block_root), + move || { + let _guard = current_span.enter(); + data_availability_checker.reconstruct_data_columns(&block_root) + }, "reconstruct_data_columns", ) .ok_or(BeaconChainError::RuntimeShutdown)? diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 73349cd4314..07ea1c68b41 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc::{self, error::TrySendError}; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, instrument, trace, warn}; use types::*; pub use sync_methods::ChainSegmentProcessId; @@ -834,6 +834,7 @@ impl NetworkBeaconProcessor { /// /// The `publish_columns` parameter controls whether reconstructed columns should be published /// to the gossip network. 
+ #[instrument(level = "debug", skip_all, fields(?block_root))] async fn attempt_data_column_reconstruction( self: &Arc, block_root: Hash256, @@ -850,6 +851,7 @@ impl NetworkBeaconProcessor { } let result = self.chain.reconstruct_data_columns(block_root).await; + match result { Ok(Some((availability_processing_status, data_columns_to_publish))) => { if publish_columns { @@ -978,6 +980,7 @@ impl NetworkBeaconProcessor { /// by some nodes on the network as soon as possible. Our hope is that some columns arrive from /// other nodes in the meantime, obviating the need for us to publish them. If no other /// publisher exists for a column, it will eventually get published here. + #[instrument(level="debug", skip_all, fields(?block_root, data_column_count=data_columns_to_publish.len()))] fn publish_data_columns_gradually( self: &Arc, mut data_columns_to_publish: DataColumnSidecarList, From a62f4e5c14d15eab825a6052933ae6a4651d6d84 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Sep 2025 14:02:08 +1000 Subject: [PATCH 2/7] Add missing span --- beacon_node/beacon_chain/src/data_availability_checker.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 9225ed6b47b..307dc0e227a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -547,6 +547,7 @@ impl DataAvailabilityChecker { } } + #[instrument(skip_all, level = "debug")] pub fn reconstruct_data_columns( &self, block_root: &Hash256, From b2c4c1d19b814a6797027996d10e64efec27fa1d Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Sep 2025 14:23:19 +1000 Subject: [PATCH 3/7] Remove column reconstruction when processing rpc requests (#8051) commit a209f724552d2ab2baa5a84f71ba8e81969b0d4e Author: Eitan Seri- Levi Date: Mon Sep 15 20:18:40 2025 -0700 remove publish flag commit 
a4044a9b173620669b6a31c3681381d5b779af93 Author: Eitan Seri- Levi Date: Mon Sep 15 20:14:34 2025 -0700 remove publish flag commit eab76933e3208a41886b2aa99157394efd0ee6b0 Author: Eitan Seri- Levi Date: Mon Sep 15 18:56:19 2025 -0700 linting commit a3e0aecb46c27e3189364777883c844d29a18212 Author: Eitan Seri- Levi Date: Mon Sep 15 18:53:23 2025 -0700 Delete reconstruction when processing column rpc requests --- .../src/network_beacon_processor/gossip_methods.rs | 2 +- .../network/src/network_beacon_processor/mod.rs | 5 +---- .../src/network_beacon_processor/sync_methods.rs | 13 +------------ 3 files changed, 3 insertions(+), 17 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 1f1a3427e78..bc44db40e9e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1067,7 +1067,7 @@ impl NetworkBeaconProcessor { slot: *slot, process_fn: Box::pin(async move { cloned_self - .attempt_data_column_reconstruction(block_root, true) + .attempt_data_column_reconstruction(block_root) .await; }), }, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 07ea1c68b41..eb1c5a92033 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -838,7 +838,6 @@ impl NetworkBeaconProcessor { async fn attempt_data_column_reconstruction( self: &Arc, block_root: Hash256, - publish_columns: bool, ) -> Option { // Only supernodes attempt reconstruction if !self @@ -854,9 +853,7 @@ impl NetworkBeaconProcessor { match result { Ok(Some((availability_processing_status, data_columns_to_publish))) => { - if publish_columns { - self.publish_data_columns_gradually(data_columns_to_publish, block_root); - } + 
self.publish_data_columns_gradually(data_columns_to_publish, block_root); match &availability_processing_status { AvailabilityProcessingStatus::Imported(hash) => { debug!( diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index f24495cc54c..edeed7e98cf 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -383,7 +383,7 @@ impl NetworkBeaconProcessor { "RPC custody data columns received" ); - let mut result = self + let result = self .chain .process_rpc_custody_columns(custody_columns) .await; @@ -404,17 +404,6 @@ impl NetworkBeaconProcessor { block_hash = %block_root, "Missing components over rpc" ); - // Attempt reconstruction here before notifying sync, to avoid sending out more requests - // that we may no longer need. - // We don't publish columns reconstructed from rpc columns to the gossip network, - // as these are likely historic columns. - let publish_columns = false; - if let Some(availability) = self - .attempt_data_column_reconstruction(block_root, publish_columns) - .await - { - result = Ok(availability) - } } }, Err(BlockError::DuplicateFullyImported(_)) => { From 7b67ee3db93859f8f898d583efced97b39f0f300 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Sep 2025 14:53:42 +1000 Subject: [PATCH 4/7] Never trigger reconstruction immediately if queue is full. Updated the trigger to attempt reconstruction if column count is >= 50%. 
--- .../beacon_chain/src/validator_custody.rs | 8 ++- .../gossip_methods.rs | 66 ++++++++++--------- .../src/network_beacon_processor/mod.rs | 23 +------ 3 files changed, 46 insertions(+), 51 deletions(-) diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs index 1c89624f9d7..4634fbde5e3 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -130,7 +130,7 @@ pub struct CustodyContext { /// and enr values. validator_custody_count: AtomicU64, /// Is the node run as a supernode based on current cli parameters. - pub current_is_supernode: bool, + current_is_supernode: bool, /// The persisted value for `is_supernode` based on the previous run of this node. /// /// Note: We require this value because if a user restarts the node with a higher cli custody @@ -307,6 +307,12 @@ impl CustodyContext { .expect("should compute node sampling size from valid chain spec") } + /// Returns whether the node should attempt reconstruction at a given epoch. + pub fn should_attempt_reconstruction(&self, epoch: Epoch, spec: &ChainSpec) -> bool { + let min_columns_for_reconstruction = E::number_of_columns() / 2; + self.num_of_data_columns_to_sample(epoch, spec) >= min_columns_for_reconstruction + } + /// Returns the ordered list of column indices that should be sampled for data availability checking at the given epoch. 
/// /// # Parameters diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index bc44db40e9e..e7c79e38af6 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -34,7 +34,6 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; -use tokio::sync::mpsc::error::TrySendError; use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn}; use types::{ Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar, @@ -1054,36 +1053,43 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - // Instead of triggering reconstruction immediately, schedule it to be run. If - // another column arrives it either completes availability or pushes - // reconstruction back a bit. - let cloned_self = Arc::clone(self); - let block_root = *block_root; - let send_result = self.beacon_processor_send.try_send(WorkEvent { - drop_during_sync: false, - work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( - QueuedColumnReconstruction { - block_root, - slot: *slot, - process_fn: Box::pin(async move { - cloned_self - .attempt_data_column_reconstruction(block_root) - .await; - }), - }, - )), - }); - if let Err(TrySendError::Full(WorkEvent { - work: - Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( - reconstruction, - )), - .. - })) = send_result + if !self + .chain + .data_availability_checker + .custody_context() + .should_attempt_reconstruction( + slot.epoch(T::EthSpec::slots_per_epoch()), + &self.chain.spec, + ) { - warn!("Unable to send reconstruction to reprocessing"); - // Execute it immediately instead. 
- reconstruction.process_fn.await; + // Instead of triggering reconstruction immediately, schedule it to be run. If + // another column arrives, it either completes availability or pushes + // reconstruction back a bit. + let cloned_self = Arc::clone(self); + let block_root = *block_root; + + if self + .beacon_processor_send + .try_send(WorkEvent { + drop_during_sync: false, + work: Work::Reprocess( + ReprocessQueueMessage::DelayColumnReconstruction( + QueuedColumnReconstruction { + block_root, + slot: *slot, + process_fn: Box::pin(async move { + cloned_self + .attempt_data_column_reconstruction(block_root) + .await; + }), + }, + ), + ), + }) + .is_err() + { + warn!("Unable to send reconstruction to reprocessing"); + } } } }, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index eb1c5a92033..f001fbbad23 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -835,20 +835,7 @@ impl NetworkBeaconProcessor { /// The `publish_columns` parameter controls whether reconstructed columns should be published /// to the gossip network. 
#[instrument(level = "debug", skip_all, fields(?block_root))] - async fn attempt_data_column_reconstruction( - self: &Arc, - block_root: Hash256, - ) -> Option { - // Only supernodes attempt reconstruction - if !self - .chain - .data_availability_checker - .custody_context() - .current_is_supernode - { - return None; - } - + async fn attempt_data_column_reconstruction(self: &Arc, block_root: Hash256) { let result = self.chain.reconstruct_data_columns(block_root).await; match result { @@ -866,21 +853,18 @@ impl NetworkBeaconProcessor { AvailabilityProcessingStatus::MissingComponents(_, _) => { debug!( result = "imported all custody columns", - block_hash = %block_root, + %block_root, "Block components still missing block after reconstruction" ); } } - - Some(availability_processing_status) } Ok(None) => { // reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric trace!( - block_hash = %block_root, + %block_root, "Reconstruction not required for block" ); - None } Err(e) => { error!( @@ -888,7 +872,6 @@ impl NetworkBeaconProcessor { error = ?e, "Error during data column reconstruction" ); - None } } } From 580e3e6def298b1487e1ba99308fb51d43ad1517 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Sep 2025 15:13:29 +1000 Subject: [PATCH 5/7] Update docs and ensure reconstruction is triggered correctly. 
--- .../overflow_lru_cache.rs | 22 ++++++++++++++----- .../src/network_beacon_processor/mod.rs | 11 ++-------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 9de63f61261..6afb680ddb8 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -592,9 +592,9 @@ impl DataAvailabilityCheckerInner { /// Check whether data column reconstruction should be attempted. /// - /// Potentially trigger reconstruction if: - /// - Our custody requirement is all columns (supernode), and we haven't got all columns - /// - We have >= 50% of columns, but not all columns + /// Potentially trigger reconstruction if all the following satisfy: + /// - Our custody requirement is more than 50% of total columns, + /// - We haven't received all required columns /// - Reconstruction hasn't been started for the block /// /// If reconstruction is required, returns `PendingComponents` which contains the @@ -609,15 +609,25 @@ impl DataAvailabilityCheckerInner { return ReconstructColumnsDecision::No("block already imported"); }; - // If we're sampling all columns, it means we must be custodying all columns. 
+ let Some(epoch) = pending_components + .verified_data_columns + .first() + .map(|c| c.as_data_column().epoch()) + else { + return ReconstructColumnsDecision::No("not enough columns"); + }; + let total_column_count = T::EthSpec::number_of_columns(); + let sampling_column_count = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); let received_column_count = pending_components.verified_data_columns.len(); if pending_components.reconstruction_started { return ReconstructColumnsDecision::No("already started"); } - if received_column_count >= total_column_count { - return ReconstructColumnsDecision::No("all columns received"); + if received_column_count >= sampling_column_count { + return ReconstructColumnsDecision::No("all sampling columns received"); } if received_column_count < total_column_count / 2 { return ReconstructColumnsDecision::No("not enough columns"); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f001fbbad23..691c06f2687 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -825,15 +825,8 @@ impl NetworkBeaconProcessor { } } - /// Attempt to reconstruct all data columns if the following conditions satisfies: - /// - Our custody requirement is all columns - /// - We have >= 50% of columns, but not all columns - /// - /// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed, - /// otherwise returns `None`. - /// - /// The `publish_columns` parameter controls whether reconstructed columns should be published - /// to the gossip network. + /// Attempts to reconstruct all data columns if the conditions checked in + /// [`DataAvailabilityCheckerInner::check_and_set_reconstruction_started`] are satisfied. 
#[instrument(level = "debug", skip_all, fields(?block_root))] async fn attempt_data_column_reconstruction(self: &Arc, block_root: Hash256) { let result = self.chain.reconstruct_data_columns(block_root).await; From b62a61908f0537774ea9b44c1ed9f73eb98ecbbb Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Sep 2025 16:05:21 +1000 Subject: [PATCH 6/7] Fix test and update trigger condition. --- beacon_node/beacon_chain/src/validator_custody.rs | 4 +++- .../network/src/network_beacon_processor/gossip_methods.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs index 4634fbde5e3..3ab76828c9c 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -310,7 +310,9 @@ impl CustodyContext { /// Returns whether the node should attempt reconstruction at a given epoch. pub fn should_attempt_reconstruction(&self, epoch: Epoch, spec: &ChainSpec) -> bool { let min_columns_for_reconstruction = E::number_of_columns() / 2; - self.num_of_data_columns_to_sample(epoch, spec) >= min_columns_for_reconstruction + // performing reconstruction is not necessary if sampling column count is exactly 50%, + // because the node doesn't need the remaining columns. + self.num_of_data_columns_to_sample(epoch, spec) > min_columns_for_reconstruction } /// Returns the ordered list of column indices that should be sampled for data availability checking at the given epoch. 
diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index e7c79e38af6..b3d717142f5 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1053,7 +1053,7 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - if !self + if self .chain .data_availability_checker .custody_context() From 800c1802883a6e7d0645d267ae946a53f42bfb1b Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 16 Sep 2025 17:10:45 +1000 Subject: [PATCH 7/7] Remove reconstruction work assertion on non supernode test. --- beacon_node/network/src/network_beacon_processor/tests.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 2935c2d2132..d3a93d48637 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1009,10 +1009,6 @@ async fn import_gossip_block_acceptably_early() { rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) .await; } - if num_data_columns > 0 { - rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction]) - .await; - } // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock // and check the head in the time between the block arrived early and when its due for