From 78bbb5f5cc1bb49d66168e7a424485412c4be137 Mon Sep 17 00:00:00 2001 From: Nova Date: Tue, 17 Mar 2026 15:27:10 +0000 Subject: [PATCH 1/6] feat: validator proof resigning --- beacon_node/beacon_chain/src/events.rs | 19 +- beacon_node/http_api/src/eip8025.rs | 18 ++ beacon_node/http_api/src/lib.rs | 3 + .../gossip_methods.rs | 40 +++- common/eth2/src/types.rs | 25 +++ .../validator_services/src/proof_service.rs | 182 ++++++++++++++++-- 6 files changed, 271 insertions(+), 16 deletions(-) diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 5adce3f8dd2..f3464e29041 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -1,4 +1,6 @@ -pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead}; +pub use eth2::types::{ + EventKind, SseBlock, SseExecutionProofValidated, SseFinalizedCheckpoint, SseHead, +}; use tokio::sync::broadcast; use tokio::sync::broadcast::{Receiver, Sender, error::SendError}; use tracing::trace; @@ -27,6 +29,7 @@ pub struct ServerSentEventHandler { attester_slashing_tx: Sender>, bls_to_execution_change_tx: Sender>, block_gossip_tx: Sender>, + execution_proof_validated_tx: Sender>, } impl ServerSentEventHandler { @@ -55,6 +58,7 @@ impl ServerSentEventHandler { let (attester_slashing_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); let (block_gossip_tx, _) = broadcast::channel(capacity); + let (execution_proof_validated_tx, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -77,6 +81,7 @@ impl ServerSentEventHandler { attester_slashing_tx, bls_to_execution_change_tx, block_gossip_tx, + execution_proof_validated_tx, } } @@ -169,6 +174,10 @@ impl ServerSentEventHandler { .block_gossip_tx .send(kind) .map(|count| log_count("block gossip", count)), + EventKind::ExecutionProofValidated(_) => self + .execution_proof_validated_tx + .send(kind) + .map(|count| log_count("execution_proof_validated", count)), }; if let Err(SendError(event)) = result { trace!(?event, "No receivers registered to listen for event"); @@ -326,4 +335,12 @@ impl ServerSentEventHandler { pub fn has_block_gossip_subscribers(&self) -> bool { self.block_gossip_tx.receiver_count() > 0 } + + pub fn subscribe_execution_proof_validated(&self) -> Receiver> { + self.execution_proof_validated_tx.subscribe() + } + + pub fn has_execution_proof_validated_subscribers(&self) -> bool { + self.execution_proof_validated_tx.receiver_count() > 0 + } } diff --git a/beacon_node/http_api/src/eip8025.rs b/beacon_node/http_api/src/eip8025.rs index a5a2dbea1df..7f56af10c6e 100644 --- a/beacon_node/http_api/src/eip8025.rs +++ b/beacon_node/http_api/src/eip8025.rs @@ -6,6 +6,7 @@ use crate::block_id::BlockId; use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2::types::{EventKind, SseExecutionProofValidated}; use execution_layer::eip8025::ProofEngine; use lighthouse_network::rpc::methods::ExecutionProofStatus; use lighthouse_network::{NetworkGlobals, PubsubMessage}; @@ -149,6 +150,23 @@ pub async fn submit_execution_proofs( // Invalid, Syncing, and NotSupported proofs must not be gossiped. match status { ProofStatus::Valid | ProofStatus::Accepted => { + // Emit SSE event for validator proof resigning + if let Some(event_handler) = chain.event_handler.as_ref() { + if event_handler.has_execution_proof_validated_subscribers() { + event_handler.register(EventKind::ExecutionProofValidated( + SseExecutionProofValidated { + execution_proof: signed_proof.message.clone(), + slot: verified_block + .map(|(_, s)| s.as_u64()) + .unwrap_or(0), + block_root: verified_block + .map(|(r, _)| r) + .unwrap_or_default(), + }, + )); + } + } + if let Err(e) = network_send.send(NetworkMessage::Publish { messages: vec![PubsubMessage::ExecutionProof(Box::new(signed_proof))], }) { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fd081b9cfb8..4319d90e83a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3235,6 +3235,9 @@ pub fn serve( api_types::EventTopic::BlockGossip => { event_handler.subscribe_block_gossip() } + api_types::EventTopic::ExecutionProofValidated => { + event_handler.subscribe_execution_proof_validated() + } }; receivers.push( diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 56e3f7b42be..3c6bc0b9f9b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -36,6 +36,7 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn}; +use beacon_chain::events::{EventKind, SseExecutionProofValidated}; use types::ProofStatus; use types::{ Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar, @@ -1877,6 +1878,9 @@ impl NetworkBeaconProcessor { let proof_type = execution_proof.proof_type(); let validator_index = execution_proof.validator_index(); + // Clone the proof message before verification moves ownership + let proof_message = execution_proof.message.clone(); + // Verify the execution proof. let verification_result = self.chain.verify_execution_proof(execution_proof).await; @@ -1910,6 +1914,23 @@ impl NetworkBeaconProcessor { }); } self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + // Emit SSE event for validator proof resigning + if let Some(event_handler) = self.chain.event_handler.as_ref() { + if event_handler.has_execution_proof_validated_subscribers() { + event_handler.register(EventKind::ExecutionProofValidated( + SseExecutionProofValidated { + execution_proof: proof_message.clone(), + slot: verified_block + .map(|(_, s)| s.as_u64()) + .unwrap_or(0), + block_root: verified_block + .map(|(r, _)| r) + .unwrap_or_default(), + }, + )); + } + } } Ok((ProofStatus::Invalid, _)) => { debug!( @@ -1920,7 +1941,7 @@ impl NetworkBeaconProcessor { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); self.gossip_penalize_peer(peer_id, PeerAction::Fatal, "invalid_execution_proof"); } - Ok((ProofStatus::Accepted, _)) => { + Ok((ProofStatus::Accepted, verified_block)) => { debug!( ?request_root, validator_index, @@ -1928,6 +1949,23 @@ impl NetworkBeaconProcessor { "Execution proof is accepted but not fully verified" ); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + // Emit SSE event for validator proof resigning + if let Some(event_handler) = self.chain.event_handler.as_ref() { + if event_handler.has_execution_proof_validated_subscribers() { + event_handler.register(EventKind::ExecutionProofValidated( + SseExecutionProofValidated { + execution_proof: proof_message, + slot: verified_block + .map(|(_, s)| s.as_u64()) + .unwrap_or(0), + block_root: verified_block + .map(|(r, _)| r) + .unwrap_or_default(), + }, + )); + } + } } Ok((ProofStatus::Syncing, _)) => { debug!( diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 52fcee1184d..c7ebd474397 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1206,6 +1206,18 @@ impl<'de> ContextDeserialize<'de, ForkName> for SseExtendedPayloadAttributes { } } +/// SSE event payload for a validated execution proof (EIP-8025). +/// +/// Emitted by the beacon node when an `ExecutionProof` passes verification, +/// allowing validator clients to resign the proof with their own key. +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseExecutionProofValidated { + pub execution_proof: ExecutionProof, + #[serde(with = "serde_utils::quoted_u64")] + pub slot: u64, + pub block_root: Hash256, +} + #[derive(PartialEq, Debug, Serialize, Clone)] #[serde(bound = "E: EthSpec", untagged)] pub enum EventKind { @@ -1230,6 +1242,7 @@ pub enum EventKind { AttesterSlashing(Box>), BlsToExecutionChange(Box), BlockGossip(Box), + ExecutionProofValidated(SseExecutionProofValidated), } impl EventKind { @@ -1256,6 +1269,7 @@ impl EventKind { EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", EventKind::BlockGossip(_) => "block_gossip", + EventKind::ExecutionProofValidated(_) => "execution_proof_validated", } } @@ -1352,6 +1366,14 @@ impl EventKind { "block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)), )?)), + "execution_proof_validated" => Ok(EventKind::ExecutionProofValidated( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!( + "Execution Proof Validated: {:?}", + e + )) + })?, + )), _ => Err(ServerError::InvalidServerSentEvent( "Could not parse event tag".to_string(), )), @@ -1390,6 +1412,7 @@ pub enum EventTopic { ProposerSlashing, BlsToExecutionChange, BlockGossip, + ExecutionProofValidated, } impl FromStr for EventTopic { @@ -1418,6 +1441,7 @@ impl FromStr for EventTopic { "proposer_slashing" => Ok(EventTopic::ProposerSlashing), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), "block_gossip" => Ok(EventTopic::BlockGossip), + "execution_proof_validated" => Ok(EventTopic::ExecutionProofValidated), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -1447,6 +1471,7 @@ impl fmt::Display for EventTopic { EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), EventTopic::BlockGossip => write!(f, "block_gossip"), + EventTopic::ExecutionProofValidated => write!(f, "execution_proof_validated"), } } } diff --git a/validator_client/validator_services/src/proof_service.rs b/validator_client/validator_services/src/proof_service.rs index 679df7cffe5..eb075daf84e 100644 --- a/validator_client/validator_services/src/proof_service.rs +++ b/validator_client/validator_services/src/proof_service.rs @@ -1,28 +1,32 @@ //! EIP-8025 Execution Proof Service //! -//! This service handles both proactive and reactive execution proof workflows: +//! This service handles proactive, reactive, and resigning execution proof workflows: //! //! 1. **Proactive Mode**: Monitors beacon chain for new blocks via SSE and requests //! proofs from the configured proof engine //! 2. **Reactive Mode**: Receives proof requests from HTTP API (proof engine callbacks) //! and signs/submits them to the beacon chain -//! -//! The service bridges the gap between external proof engines, validator keys, and -//! beacon nodes, providing a complete end-to-end execution proof flow. +//! 3. **Resigning Mode**: Subscribes to `execution_proof_validated` SSE events and +//! resigns validated proofs with each local validator's key use beacon_node_fallback::BeaconNodeFallback; use bls::PublicKey; -use eth2::types::EventTopic; +use eth2::types::{EventKind, EventTopic, SseExecutionProofValidated}; use execution_layer::NewPayloadRequest; use execution_layer::eip8025::{HttpProofEngine, ProofEngine}; use futures::StreamExt; use slot_clock::SlotClock; +use std::collections::HashMap; use std::sync::Arc; +use std::time::{Duration, Instant}; use task_executor::TaskExecutor; +use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; use types::execution::eip8025::ProofAttributes; -use types::{BeaconBlock, Epoch, EthSpec, ExecutionProof}; -use validator_store::ValidatorStore; +use types::{BeaconBlock, Epoch, EthSpec, ExecutionProof, Hash256}; +use validator_store::{DoppelgangerStatus, ValidatorStore}; + +use bls::PublicKeyBytes; /// Background service for execution proof handling pub struct ProofService { @@ -36,6 +40,8 @@ struct Inner { slot_clock: T, executor: TaskExecutor, proof_types: Vec, + /// Tracks (validator_pubkey, new_payload_request_root) to prevent resigning loops. + resigned_proofs: RwLock>, } impl ProofService { @@ -60,22 +66,26 @@ impl ProofService) -> Result<(), String> { - // Only start monitoring if proof engine is configured + // Proactive: monitor blocks for proof requests let inner = self.inner.clone(); - let service_fut = async move { - inner.monitor_blocks_task().await; - }; self.inner .executor - .spawn(service_fut, "proof_service_monitor"); + .spawn(async move { inner.monitor_blocks_task().await }, "proof_service_monitor"); - info!("Proof service started - monitoring for new blocks"); + // Resigning: monitor validated proofs and resign with local validator keys + let inner2 = self.inner.clone(); + self.inner + .executor + .spawn(async move { inner2.monitor_validated_proofs_task().await }, "proof_service_resigning"); + + info!("Proof service started - monitoring for new blocks and validated proofs"); Ok(()) } @@ -147,6 +157,47 @@ impl Inner { } } + /// Resigning: Monitor validated proofs and resign with local validator keys + async fn monitor_validated_proofs_task(self: Arc) { + info!("Starting proof resigning service via SSE"); + + loop { + match self.subscribe_to_validated_proofs().await { + Ok(mut stream) => { + info!("Subscribed to execution_proof_validated events"); + + while let Some(event_result) = stream.next().await { + match event_result { + Ok(EventKind::ExecutionProofValidated(proof_event)) => { + self.handle_validated_proof(proof_event).await; + } + Ok(_) => { + debug!("Received non-proof event in validated proof stream"); + } + Err(e) => { + warn!( + error = %e, + "Error receiving proof event, will reconnect" + ); + break; + } + } + } + + warn!("Validated proof event stream ended, reconnecting..."); + } + Err(e) => { + error!( + error = %e, + "Failed to subscribe to proof events, retrying..." + ); + } + } + + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + /// Helper method to establish SSE subscription with beacon node fallback async fn subscribe_to_blocks( &self, @@ -162,6 +213,22 @@ impl Inner { .map_err(|e| format!("All beacon nodes failed to provide event stream: {}", e)) } + /// Helper method to establish SSE subscription for validated proof events + async fn subscribe_to_validated_proofs( + &self, + ) -> Result< + impl futures::Stream, eth2::Error>>, + String, + > { + self.beacon_nodes + .first_success(|node| async move { + node.get_events::(&[EventTopic::ExecutionProofValidated]) + .await + }) + .await + .map_err(|e| format!("All beacon nodes failed to provide event stream: {}", e)) + } + /// Handle a new block event by requesting proofs from proof engine async fn handle_block_event(&self, block: &BeaconBlock, slot: types::Slot) { let block_root = block.canonical_root(); @@ -213,6 +280,93 @@ impl Inner { } } + /// Handle a validated proof event by resigning with each local validator key + async fn handle_validated_proof(&self, event: SseExecutionProofValidated) { + let execution_proof = event.execution_proof; + let request_root = execution_proof.public_input.new_payload_request_root; + + let epoch = self + .slot_clock + .now() + .map(|slot| slot.epoch(S::E::slots_per_epoch())) + .unwrap_or(Epoch::new(0)); + + // Get all validator pubkeys (non-slashable — bypass doppelganger) + let all_pubkeys: Vec = + self.validator_store.voting_pubkeys(DoppelgangerStatus::ignored); + + for pubkey in all_pubkeys { + // Dedup: skip if this validator already resigned this proof + let dedup_key = (pubkey, request_root); + { + let cache = self.resigned_proofs.read().await; + if cache.contains_key(&dedup_key) { + debug!( + ?pubkey, + ?request_root, + "Skipping already-resigned proof" + ); + continue; + } + } + + // Sign the proof with this validator's key + match self + .validator_store + .sign_execution_proof(pubkey, execution_proof.clone(), epoch) + .await + { + Ok(signed_proof) => { + let signed_proof_clone = signed_proof.clone(); + match self + .beacon_nodes + .first_success(move |node| { + let proof = signed_proof_clone.clone(); + async move { node.post_beacon_execution_proofs(&[proof]).await } + }) + .await + { + Ok(_) => { + info!( + ?pubkey, + ?request_root, + "Resigned proof submitted" + ); + self.resigned_proofs + .write() + .await + .insert(dedup_key, Instant::now()); + } + Err(e) => { + warn!( + ?pubkey, + error = %e, + "Failed to submit resigned proof" + ); + } + } + } + Err(e) => { + warn!( + ?pubkey, + error = ?e, + "Failed to sign proof for validator" + ); + } + } + } + + // Periodic cache pruning (entries older than ~2 epochs ≈ 12.8 min) + self.prune_resigned_cache().await; + } + + /// Remove expired entries from the dedup cache + async fn prune_resigned_cache(&self) { + let cutoff = Instant::now() - Duration::from_secs(768); + let mut cache = self.resigned_proofs.write().await; + cache.retain(|_, timestamp| *timestamp > cutoff); + } + /// Reactive: Sign and submit proof (called by HTTP API) async fn sign_and_submit_proof( &self, From ba18da6a598c85ab44f2b4162bfc4ee64429b45a Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 17 Mar 2026 20:23:30 +0100 Subject: [PATCH 2/6] refactor --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/http_api/src/eip8025.rs | 18 -- .../gossip_methods.rs | 60 +++---- common/eth2/src/types.rs | 3 +- .../validator_services/src/proof_service.rs | 167 +++++------------- 5 files changed, 62 insertions(+), 188 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b4610605aac..349bdfdcbe2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7567,7 +7567,7 @@ impl BeaconChain { // Step 3: Update the fork choice if the proof engine returns valid. // The proof engine returns valid if the proof is valid and the criteria for the associated block root to be considered valid are met. // The proof engine returns ACCEPTED if the proof is valid but block validity criteria are not met. - if verification_result.is_valid() { + if verification_result.is_valid() || verification_result.is_accepted() { let request_root = signed_proof.request_root(); // Look up the beacon block root from request root diff --git a/beacon_node/http_api/src/eip8025.rs b/beacon_node/http_api/src/eip8025.rs index 7f56af10c6e..a5a2dbea1df 100644 --- a/beacon_node/http_api/src/eip8025.rs +++ b/beacon_node/http_api/src/eip8025.rs @@ -6,7 +6,6 @@ use crate::block_id::BlockId; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2::types::{EventKind, SseExecutionProofValidated}; use execution_layer::eip8025::ProofEngine; use lighthouse_network::rpc::methods::ExecutionProofStatus; use lighthouse_network::{NetworkGlobals, PubsubMessage}; @@ -150,23 +149,6 @@ pub async fn submit_execution_proofs( // Invalid, Syncing, and NotSupported proofs must not be gossiped. match status { ProofStatus::Valid | ProofStatus::Accepted => { - // Emit SSE event for validator proof resigning - if let Some(event_handler) = chain.event_handler.as_ref() { - if event_handler.has_execution_proof_validated_subscribers() { - event_handler.register(EventKind::ExecutionProofValidated( - SseExecutionProofValidated { - execution_proof: signed_proof.message.clone(), - slot: verified_block - .map(|(_, s)| s.as_u64()) - .unwrap_or(0), - block_root: verified_block - .map(|(r, _)| r) - .unwrap_or_default(), - }, - )); - } - } - if let Err(e) = network_send.send(NetworkMessage::Publish { messages: vec![PubsubMessage::ExecutionProof(Box::new(signed_proof))], }) { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 3c6bc0b9f9b..a01cf45ba0d 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -7,6 +7,7 @@ use crate::{ use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use beacon_chain::events::{EventKind, SseExecutionProofValidated}; use beacon_chain::store::Error; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, @@ -36,7 +37,6 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn}; -use beacon_chain::events::{EventKind, SseExecutionProofValidated}; use types::ProofStatus; use types::{ Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, DataColumnSidecar, @@ -1878,12 +1878,26 @@ impl NetworkBeaconProcessor { let proof_type = execution_proof.proof_type(); let validator_index = execution_proof.validator_index(); - // Clone the proof message before verification moves ownership - let proof_message = execution_proof.message.clone(); + // Extract the inner proof before moving execution_proof into verification. + let execution_proof_message = execution_proof.message.clone(); // Verify the execution proof. let verification_result = self.chain.verify_execution_proof(execution_proof).await; + if let Ok((proof_status, block)) = &verification_result + && (proof_status.is_valid() || proof_status.is_accepted()) + && let Some(event_handler) = self.chain.event_handler.as_ref() + && event_handler.has_execution_proof_validated_subscribers() + && let Some((_block_root, slot)) = block + { + event_handler.register(EventKind::ExecutionProofValidated( + SseExecutionProofValidated { + execution_proof: execution_proof_message, + epoch: slot.epoch(T::EthSpec::slots_per_epoch()).as_u64(), + }, + )); + } + match verification_result { // TODO: split our error types and penalize accordingly Err(e) => { @@ -1913,24 +1927,7 @@ impl NetworkBeaconProcessor { block_root, }); } - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); - - // Emit SSE event for validator proof resigning - if let Some(event_handler) = self.chain.event_handler.as_ref() { - if event_handler.has_execution_proof_validated_subscribers() { - event_handler.register(EventKind::ExecutionProofValidated( - SseExecutionProofValidated { - execution_proof: proof_message.clone(), - slot: verified_block - .map(|(_, s)| s.as_u64()) - .unwrap_or(0), - block_root: verified_block - .map(|(r, _)| r) - .unwrap_or_default(), - }, - )); - } - } + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } Ok((ProofStatus::Invalid, _)) => { debug!( @@ -1941,31 +1938,14 @@ impl NetworkBeaconProcessor { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); self.gossip_penalize_peer(peer_id, PeerAction::Fatal, "invalid_execution_proof"); } - Ok((ProofStatus::Accepted, verified_block)) => { + Ok((ProofStatus::Accepted, _)) => { debug!( ?request_root, validator_index, proof_type, "Execution proof is accepted but not fully verified" ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); - - // Emit SSE event for validator proof resigning - if let Some(event_handler) = self.chain.event_handler.as_ref() { - if event_handler.has_execution_proof_validated_subscribers() { - event_handler.register(EventKind::ExecutionProofValidated( - SseExecutionProofValidated { - execution_proof: proof_message, - slot: verified_block - .map(|(_, s)| s.as_u64()) - .unwrap_or(0), - block_root: verified_block - .map(|(r, _)| r) - .unwrap_or_default(), - }, - )); - } - } + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } Ok((ProofStatus::Syncing, _)) => { debug!( diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index c7ebd474397..5ec2887d2c3 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1214,8 +1214,7 @@ impl<'de> ContextDeserialize<'de, ForkName> for SseExtendedPayloadAttributes { pub struct SseExecutionProofValidated { pub execution_proof: ExecutionProof, #[serde(with = "serde_utils::quoted_u64")] - pub slot: u64, - pub block_root: Hash256, + pub epoch: u64, } #[derive(PartialEq, Debug, Serialize, Clone)] diff --git a/validator_client/validator_services/src/proof_service.rs b/validator_client/validator_services/src/proof_service.rs index eb075daf84e..c7b15593466 100644 --- a/validator_client/validator_services/src/proof_service.rs +++ b/validator_client/validator_services/src/proof_service.rs @@ -1,13 +1,6 @@ //! EIP-8025 Execution Proof Service //! -//! This service handles proactive, reactive, and resigning execution proof workflows: -//! -//! 1. **Proactive Mode**: Monitors beacon chain for new blocks via SSE and requests -//! proofs from the configured proof engine -//! 2. **Reactive Mode**: Receives proof requests from HTTP API (proof engine callbacks) -//! and signs/submits them to the beacon chain -//! 3. **Resigning Mode**: Subscribes to `execution_proof_validated` SSE events and -//! resigns validated proofs with each local validator's key +//! This service handles execution proof requests, signing and resigning execution proof workflows: use beacon_node_fallback::BeaconNodeFallback; use bls::PublicKey; @@ -73,20 +66,12 @@ impl ProofService) -> Result<(), String> { - // Proactive: monitor blocks for proof requests let inner = self.inner.clone(); - self.inner - .executor - .spawn(async move { inner.monitor_blocks_task().await }, "proof_service_monitor"); - - // Resigning: monitor validated proofs and resign with local validator keys - let inner2 = self.inner.clone(); - self.inner - .executor - .spawn(async move { inner2.monitor_validated_proofs_task().await }, "proof_service_resigning"); - + self.inner.executor.spawn( + async move { inner.monitor_events_task().await }, + "proof_service_monitor", + ); info!("Proof service started - monitoring for new blocks and validated proofs"); - Ok(()) } @@ -107,20 +92,37 @@ impl ProofService Inner { - /// Proactive: Monitor beacon node for new blocks and request proofs - async fn monitor_blocks_task(self: Arc) { - info!("Starting proof service block monitoring via SSE"); + /// Subscribe to both `BlockFull` and `ExecutionProofValidated` events via a single SSE stream. + async fn subscribe_to_events( + &self, + ) -> Result< + impl futures::Stream, eth2::Error>>, + String, + > { + self.beacon_nodes + .first_success(|node| async move { + node.get_events::(&[ + EventTopic::BlockFull, + EventTopic::ExecutionProofValidated, + ]) + .await + }) + .await + .map_err(|e| format!("All beacon nodes failed to provide event stream: {}", e)) + } + + /// Monitor block and validated-proof events over a single SSE connection. + async fn monitor_events_task(self: Arc) { + info!("Starting proof service event monitoring via SSE"); loop { - // Attempt to subscribe to block events from beacon node - match self.subscribe_to_blocks().await { + match self.subscribe_to_events().await { Ok(mut stream) => { - info!("Successfully subscribed to block events"); + info!("Successfully subscribed to block and execution proof events"); - // Process events from the stream while let Some(event_result) = stream.next().await { match event_result { - Ok(eth2::types::EventKind::BlockFull(block_event)) => { + Ok(EventKind::BlockFull(block_event)) => { let block = block_event.data; if block.execution_optimistic { debug!( @@ -130,67 +132,21 @@ impl Inner { } self.handle_block_event(&block.block, block.slot).await; } - Ok(_) => { - // Ignore other event types (shouldn't happen with our topic filter) - debug!("Received non-block event in block_full stream"); - } - Err(e) => { - warn!( - error = %e, - "Error receiving block event, will reconnect" - ); - break; // Break inner loop to reconnect - } - } - } - - // Stream ended or errored - reconnect - warn!("Block event stream ended, reconnecting..."); - } - Err(e) => { - error!( - error = %e, - "Failed to subscribe to block events, retrying..." - ); - } - } - } - } - - /// Resigning: Monitor validated proofs and resign with local validator keys - async fn monitor_validated_proofs_task(self: Arc) { - info!("Starting proof resigning service via SSE"); - - loop { - match self.subscribe_to_validated_proofs().await { - Ok(mut stream) => { - info!("Subscribed to execution_proof_validated events"); - - while let Some(event_result) = stream.next().await { - match event_result { Ok(EventKind::ExecutionProofValidated(proof_event)) => { self.handle_validated_proof(proof_event).await; } - Ok(_) => { - debug!("Received non-proof event in validated proof stream"); - } + Ok(_) => {} Err(e) => { - warn!( - error = %e, - "Error receiving proof event, will reconnect" - ); + warn!(error = %e, "Error receiving event, will reconnect"); break; } } } - warn!("Validated proof event stream ended, reconnecting..."); + warn!("Event stream ended, reconnecting..."); } Err(e) => { - error!( - error = %e, - "Failed to subscribe to proof events, retrying..." - ); + error!(error = %e, "Failed to subscribe to events, retrying..."); } } @@ -198,37 +154,6 @@ impl Inner { } } - /// Helper method to establish SSE subscription with beacon node fallback - async fn subscribe_to_blocks( - &self, - ) -> Result< - impl futures::Stream, eth2::Error>>, - String, - > { - self.beacon_nodes - .first_success( - |node| async move { node.get_events::(&[EventTopic::BlockFull]).await }, - ) - .await - .map_err(|e| format!("All beacon nodes failed to provide event stream: {}", e)) - } - - /// Helper method to establish SSE subscription for validated proof events - async fn subscribe_to_validated_proofs( - &self, - ) -> Result< - impl futures::Stream, eth2::Error>>, - String, - > { - self.beacon_nodes - .first_success(|node| async move { - node.get_events::(&[EventTopic::ExecutionProofValidated]) - .await - }) - .await - .map_err(|e| format!("All beacon nodes failed to provide event stream: {}", e)) - } - /// Handle a new block event by requesting proofs from proof engine async fn handle_block_event(&self, block: &BeaconBlock, slot: types::Slot) { let block_root = block.canonical_root(); @@ -284,16 +209,12 @@ impl Inner { async fn handle_validated_proof(&self, event: SseExecutionProofValidated) { let execution_proof = event.execution_proof; let request_root = execution_proof.public_input.new_payload_request_root; - - let epoch = self - .slot_clock - .now() - .map(|slot| slot.epoch(S::E::slots_per_epoch())) - .unwrap_or(Epoch::new(0)); + let epoch = Epoch::new(event.epoch); // Get all validator pubkeys (non-slashable — bypass doppelganger) - let all_pubkeys: Vec = - self.validator_store.voting_pubkeys(DoppelgangerStatus::ignored); + let all_pubkeys: Vec = self + .validator_store + .voting_pubkeys(DoppelgangerStatus::ignored); for pubkey in all_pubkeys { // Dedup: skip if this validator already resigned this proof @@ -301,11 +222,7 @@ impl Inner { { let cache = self.resigned_proofs.read().await; if cache.contains_key(&dedup_key) { - debug!( - ?pubkey, - ?request_root, - "Skipping already-resigned proof" - ); + debug!(?pubkey, ?request_root, "Skipping already-resigned proof"); continue; } } @@ -327,11 +244,7 @@ impl Inner { .await { Ok(_) => { - info!( - ?pubkey, - ?request_root, - "Resigned proof submitted" - ); + info!(?pubkey, ?request_root, "Resigned proof submitted"); self.resigned_proofs .write() .await From ecfbdee43079e08330a89261caa4126770ef8654 Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 17 Mar 2026 20:37:52 +0100 Subject: [PATCH 3/6] clean up --- beacon_node/beacon_chain/src/events.rs | 2 +- .../validator_services/src/proof_service.rs | 107 ++++++------------ 2 files changed, 35 insertions(+), 74 deletions(-) diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index f3464e29041..294ad767ad5 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -177,7 +177,7 @@ impl ServerSentEventHandler { EventKind::ExecutionProofValidated(_) => self .execution_proof_validated_tx .send(kind) - .map(|count| log_count("execution_proof_validated", count)), + .map(|count| log_count("execution proof validated", count)), }; if let Err(SendError(event)) = result { trace!(?event, "No receivers registered to listen for event"); diff --git a/validator_client/validator_services/src/proof_service.rs b/validator_client/validator_services/src/proof_service.rs index c7b15593466..1c2796ae75d 100644 --- a/validator_client/validator_services/src/proof_service.rs +++ b/validator_client/validator_services/src/proof_service.rs @@ -1,6 +1,6 @@ //! EIP-8025 Execution Proof Service //! -//! This service handles execution proof requests, signing and resigning execution proof workflows: +//! This service handles execution proof requests, signing and resigning workflows. use beacon_node_fallback::BeaconNodeFallback; use bls::PublicKey; @@ -9,18 +9,14 @@ use execution_layer::NewPayloadRequest; use execution_layer::eip8025::{HttpProofEngine, ProofEngine}; use futures::StreamExt; use slot_clock::SlotClock; -use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use task_executor::TaskExecutor; -use tokio::sync::RwLock; use tracing::{debug, error, info, warn}; use types::execution::eip8025::ProofAttributes; -use types::{BeaconBlock, Epoch, EthSpec, ExecutionProof, Hash256}; +use types::{BeaconBlock, Epoch, EthSpec, ExecutionProof}; use validator_store::{DoppelgangerStatus, ValidatorStore}; -use bls::PublicKeyBytes; - /// Background service for execution proof handling pub struct ProofService { inner: Arc>, @@ -33,8 +29,6 @@ struct Inner { slot_clock: T, executor: TaskExecutor, proof_types: Vec, - /// Tracks (validator_pubkey, new_payload_request_root) to prevent resigning loops. - resigned_proofs: RwLock>, } impl ProofService { @@ -59,7 +53,6 @@ impl ProofService Inner { } } - /// Handle a validated proof event by resigning with each local validator key + /// Handle a validated proof event by resigning with the first local validator key async fn handle_validated_proof(&self, event: SseExecutionProofValidated) { let execution_proof = event.execution_proof; - let request_root = execution_proof.public_input.new_payload_request_root; let epoch = Epoch::new(event.epoch); - // Get all validator pubkeys (non-slashable — bypass doppelganger) - let all_pubkeys: Vec = self + let Some(pubkey) = self .validator_store - .voting_pubkeys(DoppelgangerStatus::ignored); - - for pubkey in all_pubkeys { - // Dedup: skip if this validator already resigned this proof - let dedup_key = (pubkey, request_root); - { - let cache = self.resigned_proofs.read().await; - if cache.contains_key(&dedup_key) { - debug!(?pubkey, ?request_root, "Skipping already-resigned proof"); - continue; - } - } + .voting_pubkeys::, _>(DoppelgangerStatus::ignored) + .first() + .cloned() + else { + warn!("No local validators available to resign proof"); + return; + }; - // Sign the proof with this validator's key - match self - .validator_store - .sign_execution_proof(pubkey, execution_proof.clone(), epoch) - .await - { - Ok(signed_proof) => { - let signed_proof_clone = signed_proof.clone(); - match self - .beacon_nodes - .first_success(move |node| { - let proof = signed_proof_clone.clone(); - async move { node.post_beacon_execution_proofs(&[proof]).await } - }) - .await - { - Ok(_) => { - info!(?pubkey, ?request_root, "Resigned proof submitted"); - self.resigned_proofs - .write() - .await - .insert(dedup_key, Instant::now()); - } - Err(e) => { - warn!( - ?pubkey, - error = %e, - "Failed to submit resigned proof" - ); - } + match self + .validator_store + .sign_execution_proof(pubkey, execution_proof, epoch) + .await + { + Ok(signed_proof) => { + match self + .beacon_nodes + .first_success(move |node| { + let proof = signed_proof.clone(); + async move { node.post_beacon_execution_proofs(&[proof]).await } + }) + .await + { + Ok(_) => { + info!(?pubkey, "Resigned proof submitted"); + } + Err(e) => { + warn!(?pubkey, error = %e, "Failed to submit resigned proof"); } - } - Err(e) => { - warn!( - ?pubkey, - error = ?e, - "Failed to sign proof for validator" - ); } } + Err(e) => { + warn!(?pubkey, error = ?e, "Failed to sign proof for validator"); + } } - - // Periodic cache pruning (entries older than ~2 epochs ≈ 12.8 min) - self.prune_resigned_cache().await; - } - - /// Remove expired entries from the dedup cache - async fn prune_resigned_cache(&self) { - let cutoff = Instant::now() - Duration::from_secs(768); - let mut cache = self.resigned_proofs.write().await; - cache.retain(|_, timestamp| *timestamp > cutoff); } /// Reactive: Sign and submit proof (called by HTTP API) From 0e1f562508e7c966c65028e83d0f540da4bcbece Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 17 Mar 2026 21:11:58 +0100 Subject: [PATCH 4/6] deprecate SseBlockFull --- beacon_node/beacon_chain/src/beacon_chain.rs | 21 +----- beacon_node/beacon_chain/src/events.rs | 15 ---- beacon_node/http_api/src/lib.rs | 3 - common/eth2/src/types.rs | 67 ------------------ .../validator_services/src/proof_service.rs | 70 ++++++++++--------- 5 files changed, 38 insertions(+), 138 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 349bdfdcbe2..32ad072c9e8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -77,8 +77,7 @@ use crate::{ use bls::{PublicKey, PublicKeyBytes, Signature}; use eth2::beacon_response::ForkVersionedResponse; use eth2::types::{ - EventKind, SseBlobSidecar, SseBlock, SseBlockFull, SseDataColumnSidecar, - SseExtendedPayloadAttributes, + EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes, }; use execution_layer::{ BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer, @@ -4316,9 +4315,6 @@ impl BeaconChain { payload_verification_status: PayloadVerificationStatus, current_slot: Slot, ) { - // TODO: Optimise this so we don't have to clone. - let beacon_block = Arc::unwrap_or_clone(signed_block.clone()); - let (beacon_block, _) = beacon_block.deconstruct(); let block = signed_block.message(); // Only present some metrics for blocks from the previous epoch or later. @@ -4361,21 +4357,6 @@ impl BeaconChain { execution_optimistic: payload_verification_status.is_optimistic(), })); } - - // Emit BlockFull event if there are block_full subscribers - if event_handler.has_block_full_subscribers() { - let slot = block.slot(); - // Convert BeaconBlockRef to owned BeaconBlock for the event - event_handler.register(EventKind::BlockFull(Box::new(ForkVersionedResponse { - data: SseBlockFull { - slot, - block: beacon_block, - execution_optimistic: payload_verification_status.is_optimistic(), - }, - metadata: Default::default(), - version: self.spec.fork_name_at_slot::(slot), - }))); - } } // Do not trigger light_client server update producer for old blocks, to extra work diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 294ad767ad5..4684db96ba9 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -12,7 +12,6 @@ pub struct ServerSentEventHandler { attestation_tx: Sender>, single_attestation_tx: Sender>, block_tx: Sender>, - block_full_tx: Sender>, blob_sidecar_tx: Sender>, data_column_sidecar_tx: Sender>, finalized_tx: Sender>, @@ -41,7 +40,6 @@ impl ServerSentEventHandler { let (attestation_tx, _) = broadcast::channel(capacity); let (single_attestation_tx, _) = broadcast::channel(capacity); let (block_tx, _) = broadcast::channel(capacity); - let (block_full_tx, _) = broadcast::channel(capacity); let (blob_sidecar_tx, _) = broadcast::channel(capacity); let (data_column_sidecar_tx, _) = broadcast::channel(capacity); let (finalized_tx, _) = broadcast::channel(capacity); @@ -64,7 +62,6 @@ impl ServerSentEventHandler { attestation_tx, single_attestation_tx, block_tx, - block_full_tx, blob_sidecar_tx, data_column_sidecar_tx, finalized_tx, @@ -106,10 +103,6 @@ impl ServerSentEventHandler { .block_tx .send(kind) .map(|count| log_count("block", count)), - EventKind::BlockFull(_) => self - .block_full_tx - .send(kind) - .map(|count| log_count("block_full", count)), EventKind::BlobSidecar(_) => self .blob_sidecar_tx .send(kind) @@ -196,10 +189,6 @@ impl ServerSentEventHandler { self.block_tx.subscribe() } - pub fn subscribe_block_full(&self) -> Receiver> { - self.block_full_tx.subscribe() - } - pub fn subscribe_blob_sidecar(&self) -> Receiver> { self.blob_sidecar_tx.subscribe() } @@ -276,10 +265,6 @@ impl ServerSentEventHandler { self.block_tx.receiver_count() > 0 } - pub fn has_block_full_subscribers(&self) -> bool { - self.block_full_tx.receiver_count() > 0 - } - pub fn has_blob_sidecar_subscribers(&self) -> bool { self.blob_sidecar_tx.receiver_count() > 0 } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4319d90e83a..b84d0068cf8 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3181,9 +3181,6 @@ pub fn serve( let receiver = match topic { api_types::EventTopic::Head => event_handler.subscribe_head(), api_types::EventTopic::Block => event_handler.subscribe_block(), - api_types::EventTopic::BlockFull => { - event_handler.subscribe_block_full() - } api_types::EventTopic::BlobSidecar => { event_handler.subscribe_blob_sidecar() } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 5ec2887d2c3..2db5967d846 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -980,38 +980,6 @@ pub struct SseBlock { pub execution_optimistic: bool, } -#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] -#[serde(bound = "E: EthSpec")] -pub struct SseBlockFull { - pub slot: Slot, - pub block: BeaconBlock, - pub execution_optimistic: bool, -} - -#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] -struct SseBlockFullGeneric { - pub slot: Slot, - pub block: T, - pub execution_optimistic: bool, -} - -type VersionedSseBlockFull = ForkVersionedResponse>; - -impl<'de, E: EthSpec> ContextDeserialize<'de, ForkName> for SseBlockFull { - fn context_deserialize(deserializer: D, context: ForkName) -> Result - where - D: Deserializer<'de>, - { - let helper = SseBlockFullGeneric::::deserialize(deserializer)?; - Ok(SseBlockFull { - slot: helper.slot, - block: BeaconBlock::context_deserialize(helper.block, context) - .map_err(serde::de::Error::custom)?, - execution_optimistic: helper.execution_optimistic, - }) - } -} - #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseBlobSidecar { pub block_root: Hash256, @@ -1223,7 +1191,6 @@ pub enum EventKind { Attestation(Box>), SingleAttestation(Box), Block(SseBlock), - BlockFull(Box>), BlobSidecar(SseBlobSidecar), DataColumnSidecar(SseDataColumnSidecar), FinalizedCheckpoint(SseFinalizedCheckpoint), @@ -1249,7 +1216,6 @@ impl EventKind { match self { EventKind::Head(_) => "head", EventKind::Block(_) => "block", - EventKind::BlockFull(_) => "block_full", EventKind::BlobSidecar(_) => "blob_sidecar", EventKind::DataColumnSidecar(_) => "data_column_sidecar", EventKind::Attestation(_) => "attestation", @@ -1285,9 +1251,6 @@ impl EventKind { "block" => Ok(EventKind::Block(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)), )?)), - "block_full" => Ok(EventKind::BlockFull(serde_json::from_str(data).map_err( - |e| ServerError::InvalidServerSentEvent(format!("Block Full: {:?}", e)), - )?)), "blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)), )?)), @@ -1392,7 +1355,6 @@ pub struct EventQuery { pub enum EventTopic { Head, Block, - BlockFull, BlobSidecar, DataColumnSidecar, Attestation, @@ -1421,7 +1383,6 @@ impl FromStr for EventTopic { match s { "head" => Ok(EventTopic::Head), "block" => Ok(EventTopic::Block), - "block_full" => Ok(EventTopic::BlockFull), "blob_sidecar" => Ok(EventTopic::BlobSidecar), "data_column_sidecar" => Ok(EventTopic::DataColumnSidecar), "attestation" => Ok(EventTopic::Attestation), @@ -1451,7 +1412,6 @@ impl fmt::Display for EventTopic { match self { EventTopic::Head => write!(f, "head"), EventTopic::Block => write!(f, "block"), - EventTopic::BlockFull => write!(f, "block_full"), EventTopic::BlobSidecar => write!(f, "blob_sidecar"), EventTopic::DataColumnSidecar => write!(f, "data_column_sidecar"), EventTopic::Attestation => write!(f, "attestation"), @@ -2589,31 +2549,4 @@ mod test { let roundtrip = O::context_deserialize::(deserializer, fork_name).unwrap(); assert_eq!(original, roundtrip); } - - #[test] - fn test_versioned_sse_block_full_round_trip() { - let rng = &mut XorShiftRng::from_seed([42; 16]); - for fork_name in ForkName::list_all() { - let beacon_block = map_fork_name!(fork_name, BeaconBlock, <_>::random_for_test(rng)); - let slot = Slot::random_for_test(rng); - - let versioned_response = VersionedSseBlockFull:: { - version: fork_name, - metadata: EmptyMetadata {}, - data: SseBlockFull { - slot, - block: beacon_block, - execution_optimistic: true, - }, - }; - - let json_str = serde_json::to_string(&versioned_response).unwrap(); - let deserialized: VersionedSseBlockFull = - serde_json::from_str(&json_str).unwrap(); - - assert_eq!(versioned_response, deserialized); - assert!(deserialized.data.execution_optimistic); - println!("fork_name: {:?} PASSED", fork_name); - } - } } diff --git a/validator_client/validator_services/src/proof_service.rs b/validator_client/validator_services/src/proof_service.rs index 1c2796ae75d..1febe1a1e79 100644 --- a/validator_client/validator_services/src/proof_service.rs +++ b/validator_client/validator_services/src/proof_service.rs @@ -4,7 +4,7 @@ use beacon_node_fallback::BeaconNodeFallback; use bls::PublicKey; -use eth2::types::{EventKind, EventTopic, SseExecutionProofValidated}; +use eth2::types::{BlockId, EventKind, EventTopic, SseExecutionProofValidated}; use execution_layer::NewPayloadRequest; use execution_layer::eip8025::{HttpProofEngine, ProofEngine}; use futures::StreamExt; @@ -14,7 +14,7 @@ use std::time::Duration; use task_executor::TaskExecutor; use tracing::{debug, error, info, warn}; use types::execution::eip8025::ProofAttributes; -use types::{BeaconBlock, Epoch, EthSpec, ExecutionProof}; +use types::{Epoch, EthSpec, ExecutionProof, Hash256}; use validator_store::{DoppelgangerStatus, ValidatorStore}; /// Background service for execution proof handling @@ -85,7 +85,7 @@ impl ProofService Inner { - /// Subscribe to both `BlockFull` and `ExecutionProofValidated` events via a single SSE stream. + /// Subscribe to both `Block` and `ExecutionProofValidated` events via a single SSE stream. async fn subscribe_to_events( &self, ) -> Result< @@ -94,11 +94,8 @@ impl Inner { > { self.beacon_nodes .first_success(|node| async move { - node.get_events::(&[ - EventTopic::BlockFull, - EventTopic::ExecutionProofValidated, - ]) - .await + node.get_events::(&[EventTopic::Block, EventTopic::ExecutionProofValidated]) + .await }) .await .map_err(|e| format!("All beacon nodes failed to provide event stream: {}", e)) @@ -115,15 +112,16 @@ impl Inner { while let Some(event_result) = stream.next().await { match event_result { - Ok(EventKind::BlockFull(block_event)) => { - let block = block_event.data; - if block.execution_optimistic { + Ok(EventKind::Block(sse_block)) => { + if sse_block.execution_optimistic { debug!( - slot = block.slot.as_u64(), - "Received execution optimistic block event" + slot = sse_block.slot.as_u64(), + "Skipping execution optimistic block" ); + continue; } - self.handle_block_event(&block.block, block.slot).await; + self.handle_block_event(sse_block.block, sse_block.slot) + .await; } Ok(EventKind::ExecutionProofValidated(proof_event)) => { self.handle_validated_proof(proof_event).await; @@ -147,35 +145,45 @@ impl Inner { } } - /// Handle a new block event by requesting proofs from proof engine - async fn handle_block_event(&self, block: &BeaconBlock, slot: types::Slot) { - let block_root = block.canonical_root(); - + /// Handle a new block event by fetching the full block via RPC then requesting proofs + async fn handle_block_event(&self, block_root: Hash256, slot: types::Slot) { info!( slot = slot.as_u64(), block = %block_root, - "New block detected, requesting proofs from proof engine" + "New block detected, fetching full block via RPC" ); - // Construct NewPayloadRequest from beacon block - let new_payload_request = match NewPayloadRequest::try_from(block.to_ref()) { + let signed_block = match self + .beacon_nodes + .first_success(|node| async move { + node.get_beacon_blocks::(BlockId::Root(block_root)) + .await + }) + .await + { + Ok(Some(response)) => response.data().clone(), + Ok(None) => { + warn!(block = %block_root, "Block not found on beacon node"); + return; + } + Err(e) => { + error!(block = %block_root, error = %e, "Failed to fetch block via RPC"); + return; + } + }; + + let new_payload_request = match NewPayloadRequest::try_from(signed_block.message()) { Ok(req) => req, Err(e) => { - error!( - error = ?e, - block = %block_root, - "Failed to construct NewPayloadRequest from block" - ); + error!(block = %block_root, error = ?e, "Failed to construct NewPayloadRequest"); return; } }; - // Use configured proof types let proof_attributes = ProofAttributes { proof_types: self.proof_types.clone(), }; - // Request proofs from proof engine - HttpProofEngine handles JSON serialization match self .proof_engine .request_proofs(new_payload_request, proof_attributes) @@ -189,11 +197,7 @@ impl Inner { ); } Err(e) => { - error!( - error = ?e, - block = %block_root, - "Failed to request proofs from proof engine" - ); + error!(block = %block_root, error = ?e, "Failed to request proofs from proof engine"); } } } From 9b262924e176f8bc763848bc196d6f4e44aec6d9 Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 17 Mar 2026 21:19:38 +0100 Subject: [PATCH 5/6] gossip behaviour --- .../src/network_beacon_processor/gossip_methods.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index a01cf45ba0d..f2fe214816b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1884,7 +1884,9 @@ impl NetworkBeaconProcessor { // Verify the execution proof. let verification_result = self.chain.verify_execution_proof(execution_proof).await; - if let Ok((proof_status, block)) = &verification_result + // If we have a execution proof subscriber we assume a validator will resign the proof and therefore we do not propagate this proof to peers. + // We will wait for the validator to sign and submit the proof for gossip. + let _gossip_behaviour = if let Ok((proof_status, block)) = &verification_result && (proof_status.is_valid() || proof_status.is_accepted()) && let Some(event_handler) = self.chain.event_handler.as_ref() && event_handler.has_execution_proof_validated_subscribers() @@ -1896,7 +1898,10 @@ impl NetworkBeaconProcessor { epoch: slot.epoch(T::EthSpec::slots_per_epoch()).as_u64(), }, )); - } + MessageAcceptance::Ignore + } else { + MessageAcceptance::Accept + }; match verification_result { // TODO: split our error types and penalize accordingly @@ -1927,7 +1932,7 @@ impl NetworkBeaconProcessor { block_root, }); } - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + self.propagate_validation_result(message_id, peer_id, _gossip_behaviour); } Ok((ProofStatus::Invalid, _)) => { debug!( @@ -1945,7 +1950,7 @@ impl NetworkBeaconProcessor { proof_type, "Execution proof is accepted but not fully verified" ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + self.propagate_validation_result(message_id, peer_id, _gossip_behaviour); } Ok((ProofStatus::Syncing, _)) => { debug!( From 61116c382c02055617ce246719bd95186446f2ad Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 17 Mar 2026 21:20:44 +0100 Subject: [PATCH 6/6] gossip behaviour --- .../network/src/network_beacon_processor/gossip_methods.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index f2fe214816b..02e5da8f9ea 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1886,7 +1886,7 @@ impl NetworkBeaconProcessor { // If we have a execution proof subscriber we assume a validator will resign the proof and therefore we do not propagate this proof to peers. // We will wait for the validator to sign and submit the proof for gossip. - let _gossip_behaviour = if let Ok((proof_status, block)) = &verification_result + let gossip_behaviour = if let Ok((proof_status, block)) = &verification_result && (proof_status.is_valid() || proof_status.is_accepted()) && let Some(event_handler) = self.chain.event_handler.as_ref() && event_handler.has_execution_proof_validated_subscribers() @@ -1932,7 +1932,7 @@ impl NetworkBeaconProcessor { block_root, }); } - self.propagate_validation_result(message_id, peer_id, _gossip_behaviour); + self.propagate_validation_result(message_id, peer_id, gossip_behaviour); } Ok((ProofStatus::Invalid, _)) => { debug!( @@ -1950,7 +1950,7 @@ impl NetworkBeaconProcessor { proof_type, "Execution proof is accepted but not fully verified" ); - self.propagate_validation_result(message_id, peer_id, _gossip_behaviour); + self.propagate_validation_result(message_id, peer_id, gossip_behaviour); } Ok((ProofStatus::Syncing, _)) => { debug!(