Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 36 additions & 18 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7514,7 +7514,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// This method:
/// 1. Verifies the BLS signature over the proof message
/// 2. Verifies the proof via the ProofEngine (execution engine RPC)
/// 2. Verifies the proof via the ProofEngine
/// 3. If the proof is valid, updates fork choice to mark the corresponding block as valid.
///
/// # Returns
Expand Down Expand Up @@ -7619,24 +7619,42 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Update fork choice using spawn_blocking_handle to avoid lock contention.
let chain = self.clone();
self.spawn_blocking_handle(
move || {
chain
.canonical_head
.fork_choice_write_lock()
.on_valid_execution_payload(block_root)
},
"verify_execution_proof_fork_choice_update",
)
.await??;

info!(
?block_root,
?request_root,
"Updated fork choice for verified proof"
);
let fc_result: Result<(), ForkChoiceError> = self
.spawn_blocking_handle(
move || {
chain
.canonical_head
.fork_choice_write_lock()
.on_valid_execution_payload(block_root)
},
"verify_execution_proof_fork_choice_update",
)
.await?;

match fc_result {
Ok(()) => {
info!(
?block_root,
?request_root,
"Updated fork choice for verified proof"
);
}
// There is a chance that a race condition occurs where the block has not been
// imported into fork choice yet. This is a benign condition that can be ignored
// caused by proof verification time < block execution time.
Err(ForkChoiceError::FailedToProcessValidExecutionPayload(ref msg))
if msg.contains("NodeUnknown") =>
{
warn!(
?block_root,
?request_root,
"Proof valid but block not yet in fork choice, skipping fc update"
);
}
Err(e) => return Err(Error::ForkChoiceError(e)),
}

// Look up the slot so callers can update local execution proof status.
// Look up the slot so caller can update local execution proof status.
let slot = self
.store
.get_blinded_block(&block_root)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub fn register_mock_proof_engine<E: EthSpec>(
requests: stored.requests.clone(),
event_tx: stored.event_tx.clone(),
call_tx: stored.call_tx.clone(),
callback_delay_ms: stored.callback_delay_ms,
proof_generation_delay: stored.proof_generation_delay,
_phantom: PhantomData,
};
MOCK_REGISTRY.lock().insert(index, stored);
Expand All @@ -138,7 +138,7 @@ pub fn get_mock_proof_engine<E: EthSpec>(index: usize) -> Option<MockProofNodeCl
requests: stored.requests.clone(),
event_tx: stored.event_tx.clone(),
call_tx: stored.call_tx.clone(),
callback_delay_ms: stored.callback_delay_ms,
proof_generation_delay: stored.proof_generation_delay,
_phantom: PhantomData,
})
}
Expand Down Expand Up @@ -193,7 +193,7 @@ pub struct MockProofNodeClient<E: EthSpec> {
/// Broadcast channel for method-invocation events.
call_tx: broadcast::Sender<MockClientEvent>,
/// Delay in milliseconds before broadcasting proof complete events.
callback_delay_ms: u64,
proof_generation_delay: u64,
_phantom: PhantomData<E>,
}

Expand All @@ -203,7 +203,7 @@ impl<E: EthSpec> Clone for MockProofNodeClient<E> {
requests: self.requests.clone(),
event_tx: self.event_tx.clone(),
call_tx: self.call_tx.clone(),
callback_delay_ms: self.callback_delay_ms,
proof_generation_delay: self.proof_generation_delay,
_phantom: PhantomData,
}
}
Expand All @@ -221,7 +221,7 @@ impl<E: EthSpec> MockProofNodeClient<E> {
requests: Arc::new(Mutex::new(Vec::new())),
event_tx,
call_tx,
callback_delay_ms,
proof_generation_delay: callback_delay_ms,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -266,7 +266,7 @@ impl<E: EthSpec> ProofNodeClient for MockProofNodeClient<E> {
});

let event_tx = self.event_tx.clone();
let delay = self.callback_delay_ms;
let delay = self.proof_generation_delay;
let proof_types = proof_attributes.proof_types.clone();

tokio::spawn(async move {
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub const DEFAULT_TCP_PORT: u16 = 9000u16;
pub const DEFAULT_DISC_PORT: u16 = 9000u16;
pub const DEFAULT_QUIC_PORT: u16 = 9001u16;
pub const DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD: usize = 1000usize;
pub const DEFAULT_PROOF_SYNC_ACTIVATION_SLOTS: u64 = 10;

pub struct GossipsubConfigParams {
pub message_domain_valid_snappy: [u8; 4],
Expand Down Expand Up @@ -129,6 +130,11 @@ pub struct Config {
/// Set to `true` only when `--proof-engine-endpoint` is configured.
pub enable_execution_proof: bool,

/// Number of slot ticks to wait after range sync completes before issuing
/// `ExecutionProofsByRange` requests. Gives the beacon processor time to finish
/// calling `notify_new_payload` for all imported blocks before proofs are requested.
pub proof_sync_activation_slots: u64,

/// Configuration for the outbound rate limiter (requests made by this node).
pub outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,

Expand Down Expand Up @@ -364,6 +370,7 @@ impl Default for Config {
metrics_enabled: false,
enable_light_client_server: true,
enable_execution_proof: false,
proof_sync_activation_slots: DEFAULT_PROOF_SYNC_ACTIVATION_SLOTS,
outbound_rate_limiter_config: None,
invalid_block_storage: None,
inbound_rate_limiter_config: None,
Expand Down
7 changes: 6 additions & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use lighthouse_network::Enr;
use lighthouse_network::identity::Keypair;
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::RequestType;
use lighthouse_network::rpc::methods::RpcResponse;
use lighthouse_network::rpc::methods::{ExecutionProofStatus, RpcResponse};
use lighthouse_network::service::Network;
use lighthouse_network::types::GossipKind;
use lighthouse_network::{
Expand Down Expand Up @@ -291,6 +291,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
)
.await?;

network_globals.set_local_execution_proof_status(ExecutionProofStatus {
slot: 0,
block_root: beacon_chain.genesis_block_root,
});

// Repopulate the DHT with stored ENR's if discovery is not disabled.
if !config.disable_discovery {
let enrs_to_load = load_dht::<T::EthSpec, T::HotStore, T::ColdStore>(store.clone());
Expand Down
13 changes: 12 additions & 1 deletion beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
notified_unknown_roots: LRUTimeCache::new(Duration::from_secs(
NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS,
)),
proof_sync: ProofSync::new(beacon_chain.clone()),
proof_sync: ProofSync::new(
beacon_chain.clone(),
network_globals.config.proof_sync_activation_slots,
),
}
}

Expand Down Expand Up @@ -416,6 +419,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
#[cfg(test)]
pub(crate) fn start_proof_sync(&mut self) {
self.proof_sync.start(&mut self.network);
// Advance through the Waiting countdown so callers immediately see Syncing state,
// matching pre-Waiting behaviour in unit tests.
while matches!(
self.proof_sync.state(),
super::proof_sync::ProofSyncState::Waiting(_)
) {
self.proof_sync.poll(&mut self.network);
}
}

fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
Expand Down
67 changes: 48 additions & 19 deletions beacon_node/network/src/sync/proof_sync.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! ProofSync: catch-up mechanism for EIP-8025 execution proofs.
//!
//! Operates in two states: `Idle` (range sync active, no proof work) and `Syncing`
//! (proof catchup active). In `Syncing`, each poll computes the slot gap between the
//! finalized epoch and the current head and chooses the most efficient strategy:
//! a bulk `ExecutionProofsByRange` request for large gaps, or targeted
//! Operates in three states: `Idle` (range sync active, no proof work), `Waiting(n)`
//! (counting down n slot ticks after range sync completes before activating), and
//! `Syncing` (proof catchup active). In `Syncing`, each poll computes the slot gap
//! between the finalized epoch and the current head and chooses the most efficient
//! strategy: a bulk `ExecutionProofsByRange` request for large gaps, or targeted
//! `ExecutionProofsByRoot` requests when the gap is small.

use super::network_context::{CachedExecutionProofStatus, SyncNetworkContext};
Expand Down Expand Up @@ -42,18 +43,23 @@ const DEFAULT_MAX_CONCURRENT: usize = 4;
pub enum ProofSyncState {
/// Range sync is active; proof sync is paused.
Idle,
/// Waiting for the beacon processor to finish importing range sync blocks.
/// The inner value counts down remaining slot ticks before activation.
Waiting(u64),
/// Proof sync is active. Each poll chooses between a range request (large slot gap)
/// or by-root fill requests (small gap) based on current chain state.
Syncing,
}

/// Proof sync subsystem for EIP-8025.
///
/// Operates as a two-state machine: `Idle` while range sync is active, `Syncing`
/// otherwise. In `Syncing`, each poll computes the slot gap between the max(finalized
/// epoch, local verified head) - peer verified head to determine the most efficient request strategy.
/// In-flight by-root and range responses are always processed regardless of state
/// transitions — the proofs are valid independent of sync progress.
/// Operates as a three-state machine: `Idle` while range sync is active, `Waiting(n)`
/// after range sync completes (counting down n slot ticks to let the beacon processor
/// finish importing blocks), and `Syncing` once active. In `Syncing`, each poll computes
/// the slot gap between the max(finalized epoch, local verified head) and peer verified
/// head to determine the most efficient request strategy. In-flight by-root and range
/// responses are always processed regardless of state transitions — the proofs are valid
/// independent of sync progress.
pub struct ProofSync<T: BeaconChainTypes> {
/// The beacon chain.
chain: Arc<BeaconChain<T>>,
Expand All @@ -71,13 +77,16 @@ pub struct ProofSync<T: BeaconChainTypes> {
peer_statuses: HashMap<PeerId, CachedExecutionProofStatus>,
/// In-flight `ExecutionProofStatus` request IDs, keyed by peer.
status_in_flight: HashMap<PeerId, ExecutionProofStatusRequestId>,
/// Number of slot ticks to wait after `start()` or a range response before issuing
/// the next `ExecutionProofsByRange` request.
activation_slots: u64,
/// Injected missing-proof list for unit testing by-root behaviour.
#[cfg(test)]
pub test_missing_proofs: Option<Vec<MissingProofInfo>>,
}

impl<T: BeaconChainTypes> ProofSync<T> {
pub fn new(chain: Arc<BeaconChain<T>>) -> Self {
pub fn new(chain: Arc<BeaconChain<T>>, activation_slots: u64) -> Self {
Self {
state: ProofSyncState::Idle,
range_request: None,
Expand All @@ -87,6 +96,7 @@ impl<T: BeaconChainTypes> ProofSync<T> {
max_concurrent: DEFAULT_MAX_CONCURRENT,
peer_statuses: HashMap::default(),
status_in_flight: HashMap::default(),
activation_slots,
#[cfg(test)]
test_missing_proofs: None,
}
Expand Down Expand Up @@ -125,11 +135,16 @@ impl<T: BeaconChainTypes> ProofSync<T> {

/// Called by `SyncManager` when range sync completes.
///
/// Kicks off peer status refreshes and transitions to `Syncing`.
/// Kicks off peer status refreshes and transitions to `Waiting`, which counts down
/// slot ticks before activating. This delay allows the beacon processor to finish
/// importing range sync blocks before proof requests go out.
pub fn start(&mut self, cx: &mut SyncNetworkContext<T>) {
debug!("ProofSync: starting");
debug!(
activation_slots = self.activation_slots,
"ProofSync: starting, waiting before activation"
);
self.refresh_peer_statuses(cx);
self.state = ProofSyncState::Syncing;
self.state = ProofSyncState::Waiting(self.activation_slots);
}

/// Called by `SyncManager` when range sync re-enters.
Expand All @@ -143,12 +158,22 @@ impl<T: BeaconChainTypes> ProofSync<T> {

/// Drive one polling cycle.
///
/// In `Syncing`, computes the slot gap and dispatches either a range request
/// (gap > `RANGE_REQUEST_THRESHOLD`) or by-root fill requests (gap ≤ threshold).
/// Waits if a range request is already in-flight or peer status polls are pending.
/// In `Waiting`, counts down the activation delay. In `Syncing`, computes the slot
/// gap and dispatches either a range request (gap > `RANGE_REQUEST_THRESHOLD`) or
/// by-root fill requests (gap ≤ threshold). Waits if a range request is already
/// in-flight or peer status polls are pending.
pub fn poll(&mut self, cx: &mut SyncNetworkContext<T>) {
if matches!(self.state, ProofSyncState::Idle) {
return;
match self.state {
ProofSyncState::Idle => return,
ProofSyncState::Waiting(0) => {
debug!("ProofSync: activation delay elapsed, transitioning to Syncing");
self.state = ProofSyncState::Syncing;
}
ProofSyncState::Waiting(ref mut n) => {
*n -= 1;
return;
}
ProofSyncState::Syncing => {}
}

// If a range request is already in-flight, wait for it to drain.
Expand Down Expand Up @@ -222,10 +247,14 @@ impl<T: BeaconChainTypes> ProofSync<T> {
}

/// Called when an `ExecutionProofsByRange` RPC stream terminates (response `None`).
///
/// Transitions back to `Waiting` to give the proof engine time to process the
/// received proofs before the next request is issued.
pub fn on_range_request_terminated(&mut self, id: &ExecutionProofsByRangeRequestId) {
if self.range_request.as_ref().map(|r| &r.id) == Some(id) {
debug!("ProofSync: range stream complete");
debug!("ProofSync: range stream complete, cooling down before next request");
self.range_request = None;
self.state = ProofSyncState::Waiting(self.activation_slots);
}
}

Expand Down
11 changes: 8 additions & 3 deletions beacon_node/network/src/sync/tests/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,9 +862,14 @@ fn test_proof_sync_range_termination_enters_fill_mode() {
assert!(rig.sync_manager.proof_sync().range_request().is_some());

rig.terminate_execution_proofs_by_range(req_id, peer_id);
assert_eq!(
rig.sync_manager.proof_sync().state(),
ProofSyncState::Syncing
// Termination transitions to Waiting to give the proof engine time to process
// received proofs before the next range request is issued.
assert!(
matches!(
rig.sync_manager.proof_sync().state(),
ProofSyncState::Waiting(_)
),
"Range termination should enter Waiting state"
);
assert!(
rig.sync_manager.proof_sync().range_request().is_none(),
Expand Down
2 changes: 1 addition & 1 deletion testing/simulator/src/local_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ impl<E: EthSpec> LocalNetwork<E> {
beacon_config.network.enable_execution_proof = true;
let bn_idx = self.beacon_nodes.read().len();
let _: execution_layer::test_utils::MockProofNodeClient<E> =
execution_layer::test_utils::register_mock_proof_engine(bn_idx, 0);
execution_layer::test_utils::register_mock_proof_engine(bn_idx, 400);
let mock_url =
SensitiveUrl::parse(&execution_layer::test_utils::mock_proof_engine_url(bn_idx))
.expect("mock URL is valid");
Expand Down
Loading