diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b4610605aac..32ad072c9e8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -77,8 +77,7 @@ use crate::{ use bls::{PublicKey, PublicKeyBytes, Signature}; use eth2::beacon_response::ForkVersionedResponse; use eth2::types::{ - EventKind, SseBlobSidecar, SseBlock, SseBlockFull, SseDataColumnSidecar, - SseExtendedPayloadAttributes, + EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes, }; use execution_layer::{ BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer, @@ -4316,9 +4315,6 @@ impl BeaconChain { payload_verification_status: PayloadVerificationStatus, current_slot: Slot, ) { - // TODO: Optimise this so we don't have to clone. - let beacon_block = Arc::unwrap_or_clone(signed_block.clone()); - let (beacon_block, _) = beacon_block.deconstruct(); let block = signed_block.message(); // Only present some metrics for blocks from the previous epoch or later. @@ -4361,21 +4357,6 @@ impl BeaconChain { execution_optimistic: payload_verification_status.is_optimistic(), })); } - - // Emit BlockFull event if there are block_full subscribers - if event_handler.has_block_full_subscribers() { - let slot = block.slot(); - // Convert BeaconBlockRef to owned BeaconBlock for the event - event_handler.register(EventKind::BlockFull(Box::new(ForkVersionedResponse { - data: SseBlockFull { - slot, - block: beacon_block, - execution_optimistic: payload_verification_status.is_optimistic(), - }, - metadata: Default::default(), - version: self.spec.fork_name_at_slot::(slot), - }))); - } } // Do not trigger light_client server update producer for old blocks, to extra work @@ -7567,7 +7548,7 @@ impl BeaconChain { // Step 3: Update the fork choice if the proof engine returns valid. // The proof engine returns valid if the proof is valid and the criteria for the associated block root to be considered valid are met. // The proof engine returns ACCEPTED if the proof is valid but block validity criteria are not met. - if verification_result.is_valid() { + if verification_result.is_valid() || verification_result.is_accepted() { let request_root = signed_proof.request_root(); // Look up the beacon block root from request root diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 5adce3f8dd2..4684db96ba9 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -1,4 +1,6 @@ -pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead}; +pub use eth2::types::{ + EventKind, SseBlock, SseExecutionProofValidated, SseFinalizedCheckpoint, SseHead, +}; use tokio::sync::broadcast; use tokio::sync::broadcast::{Receiver, Sender, error::SendError}; use tracing::trace; @@ -10,7 +12,6 @@ pub struct ServerSentEventHandler { attestation_tx: Sender>, single_attestation_tx: Sender>, block_tx: Sender>, - block_full_tx: Sender>, blob_sidecar_tx: Sender>, data_column_sidecar_tx: Sender>, finalized_tx: Sender>, @@ -27,6 +28,7 @@ pub struct ServerSentEventHandler { attester_slashing_tx: Sender>, bls_to_execution_change_tx: Sender>, block_gossip_tx: Sender>, + execution_proof_validated_tx: Sender>, } impl ServerSentEventHandler { @@ -38,7 +40,6 @@ impl ServerSentEventHandler { let (attestation_tx, _) = broadcast::channel(capacity); let (single_attestation_tx, _) = broadcast::channel(capacity); let (block_tx, _) = broadcast::channel(capacity); - let (block_full_tx, _) = broadcast::channel(capacity); let (blob_sidecar_tx, _) = broadcast::channel(capacity); let (data_column_sidecar_tx, _) = broadcast::channel(capacity); let (finalized_tx, _) = broadcast::channel(capacity); @@ -55,12 +56,12 @@ impl ServerSentEventHandler { let (attester_slashing_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); let (block_gossip_tx, _) = broadcast::channel(capacity); + let (execution_proof_validated_tx, _) = broadcast::channel(capacity); Self { attestation_tx, single_attestation_tx, block_tx, - block_full_tx, blob_sidecar_tx, data_column_sidecar_tx, finalized_tx, @@ -77,6 +78,7 @@ impl ServerSentEventHandler { attester_slashing_tx, bls_to_execution_change_tx, block_gossip_tx, + execution_proof_validated_tx, } } @@ -101,10 +103,6 @@ impl ServerSentEventHandler { .block_tx .send(kind) .map(|count| log_count("block", count)), - EventKind::BlockFull(_) => self - .block_full_tx - .send(kind) - .map(|count| log_count("block_full", count)), EventKind::BlobSidecar(_) => self .blob_sidecar_tx .send(kind) @@ -169,6 +167,10 @@ impl ServerSentEventHandler { .block_gossip_tx .send(kind) .map(|count| log_count("block gossip", count)), + EventKind::ExecutionProofValidated(_) => self + .execution_proof_validated_tx + .send(kind) + .map(|count| log_count("execution proof validated", count)), }; if let Err(SendError(event)) = result { trace!(?event, "No receivers registered to listen for event"); @@ -187,10 +189,6 @@ impl ServerSentEventHandler { self.block_tx.subscribe() } - pub fn subscribe_block_full(&self) -> Receiver> { - self.block_full_tx.subscribe() - } - pub fn subscribe_blob_sidecar(&self) -> Receiver> { self.blob_sidecar_tx.subscribe() } @@ -267,10 +265,6 @@ impl ServerSentEventHandler { self.block_tx.receiver_count() > 0 } - pub fn has_block_full_subscribers(&self) -> bool { - self.block_full_tx.receiver_count() > 0 - } - pub fn has_blob_sidecar_subscribers(&self) -> bool { self.blob_sidecar_tx.receiver_count() > 0 } @@ -326,4 +320,12 @@ impl ServerSentEventHandler { pub fn has_block_gossip_subscribers(&self) -> bool { self.block_gossip_tx.receiver_count() > 0 } + + pub fn subscribe_execution_proof_validated(&self) -> Receiver> { + self.execution_proof_validated_tx.subscribe() + } + + pub fn has_execution_proof_validated_subscribers(&self) -> bool { + self.execution_proof_validated_tx.receiver_count() > 0 + } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fd081b9cfb8..b84d0068cf8 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3181,9 +3181,6 @@ pub fn serve( let receiver = match topic { api_types::EventTopic::Head => event_handler.subscribe_head(), api_types::EventTopic::Block => event_handler.subscribe_block(), - api_types::EventTopic::BlockFull => { - event_handler.subscribe_block_full() - } api_types::EventTopic::BlobSidecar => { event_handler.subscribe_blob_sidecar() } @@ -3235,6 +3232,9 @@ pub fn serve( api_types::EventTopic::BlockGossip => { event_handler.subscribe_block_gossip() } + api_types::EventTopic::ExecutionProofValidated => { + event_handler.subscribe_execution_proof_validated() + } }; receivers.push( diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 56e3f7b42be..02e5da8f9ea 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -7,6 +7,7 @@ use crate::{ use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use beacon_chain::events::{EventKind, SseExecutionProofValidated}; use beacon_chain::store::Error; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, @@ -1877,9 +1878,31 @@ impl NetworkBeaconProcessor { let proof_type = execution_proof.proof_type(); let validator_index = execution_proof.validator_index(); + // Extract the inner proof before moving execution_proof into verification. + let execution_proof_message = execution_proof.message.clone(); + // Verify the execution proof. let verification_result = self.chain.verify_execution_proof(execution_proof).await; + // If we have a execution proof subscriber we assume a validator will resign the proof and therefore we do not propagate this proof to peers. + // We will wait for the validator to sign and submit the proof for gossip. + let gossip_behaviour = if let Ok((proof_status, block)) = &verification_result + && (proof_status.is_valid() || proof_status.is_accepted()) + && let Some(event_handler) = self.chain.event_handler.as_ref() + && event_handler.has_execution_proof_validated_subscribers() + && let Some((_block_root, slot)) = block + { + event_handler.register(EventKind::ExecutionProofValidated( + SseExecutionProofValidated { + execution_proof: execution_proof_message, + epoch: slot.epoch(T::EthSpec::slots_per_epoch()).as_u64(), + }, + )); + MessageAcceptance::Ignore + } else { + MessageAcceptance::Accept + }; + match verification_result { // TODO: split our error types and penalize accordingly Err(e) => { @@ -1909,7 +1932,7 @@ impl NetworkBeaconProcessor { block_root, }); } - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + self.propagate_validation_result(message_id, peer_id, gossip_behaviour); } Ok((ProofStatus::Invalid, _)) => { debug!( @@ -1927,7 +1950,7 @@ impl NetworkBeaconProcessor { proof_type, "Execution proof is accepted but not fully verified" ); - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + self.propagate_validation_result(message_id, peer_id, gossip_behaviour); } Ok((ProofStatus::Syncing, _)) => { debug!( diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 52fcee1184d..2db5967d846 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -980,38 +980,6 @@ pub struct SseBlock { pub execution_optimistic: bool, } -#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] -#[serde(bound = "E: EthSpec")] -pub struct SseBlockFull { - pub slot: Slot, - pub block: BeaconBlock, - pub execution_optimistic: bool, -} - -#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] -struct SseBlockFullGeneric { - pub slot: Slot, - pub block: T, - pub execution_optimistic: bool, -} - -type VersionedSseBlockFull = ForkVersionedResponse>; - -impl<'de, E: EthSpec> ContextDeserialize<'de, ForkName> for SseBlockFull { - fn context_deserialize(deserializer: D, context: ForkName) -> Result - where - D: Deserializer<'de>, - { - let helper = SseBlockFullGeneric::::deserialize(deserializer)?; - Ok(SseBlockFull { - slot: helper.slot, - block: BeaconBlock::context_deserialize(helper.block, context) - .map_err(serde::de::Error::custom)?, - execution_optimistic: helper.execution_optimistic, - }) - } -} - #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseBlobSidecar { pub block_root: Hash256, @@ -1206,13 +1174,23 @@ impl<'de> ContextDeserialize<'de, ForkName> for SseExtendedPayloadAttributes { } } +/// SSE event payload for a validated execution proof (EIP-8025). +/// +/// Emitted by the beacon node when an `ExecutionProof` passes verification, +/// allowing validator clients to resign the proof with their own key. +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseExecutionProofValidated { + pub execution_proof: ExecutionProof, + #[serde(with = "serde_utils::quoted_u64")] + pub epoch: u64, +} + #[derive(PartialEq, Debug, Serialize, Clone)] #[serde(bound = "E: EthSpec", untagged)] pub enum EventKind { Attestation(Box>), SingleAttestation(Box), Block(SseBlock), - BlockFull(Box>), BlobSidecar(SseBlobSidecar), DataColumnSidecar(SseDataColumnSidecar), FinalizedCheckpoint(SseFinalizedCheckpoint), @@ -1230,6 +1208,7 @@ pub enum EventKind { AttesterSlashing(Box>), BlsToExecutionChange(Box), BlockGossip(Box), + ExecutionProofValidated(SseExecutionProofValidated), } impl EventKind { @@ -1237,7 +1216,6 @@ impl EventKind { match self { EventKind::Head(_) => "head", EventKind::Block(_) => "block", - EventKind::BlockFull(_) => "block_full", EventKind::BlobSidecar(_) => "blob_sidecar", EventKind::DataColumnSidecar(_) => "data_column_sidecar", EventKind::Attestation(_) => "attestation", @@ -1256,6 +1234,7 @@ impl EventKind { EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", EventKind::BlockGossip(_) => "block_gossip", + EventKind::ExecutionProofValidated(_) => "execution_proof_validated", } } @@ -1272,9 +1251,6 @@ impl EventKind { "block" => Ok(EventKind::Block(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)), )?)), - "block_full" => Ok(EventKind::BlockFull(serde_json::from_str(data).map_err( - |e| ServerError::InvalidServerSentEvent(format!("Block Full: {:?}", e)), - )?)), "blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)), )?)), @@ -1352,6 +1328,14 @@ impl EventKind { "block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)), )?)), + "execution_proof_validated" => Ok(EventKind::ExecutionProofValidated( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!( + "Execution Proof Validated: {:?}", + e + )) + })?, + )), _ => Err(ServerError::InvalidServerSentEvent( "Could not parse event tag".to_string(), )), @@ -1371,7 +1355,6 @@ pub struct EventQuery { pub enum EventTopic { Head, Block, - BlockFull, BlobSidecar, DataColumnSidecar, Attestation, @@ -1390,6 +1373,7 @@ pub enum EventTopic { ProposerSlashing, BlsToExecutionChange, BlockGossip, + ExecutionProofValidated, } impl FromStr for EventTopic { @@ -1399,7 +1383,6 @@ impl FromStr for EventTopic { match s { "head" => Ok(EventTopic::Head), "block" => Ok(EventTopic::Block), - "block_full" => Ok(EventTopic::BlockFull), "blob_sidecar" => Ok(EventTopic::BlobSidecar), "data_column_sidecar" => Ok(EventTopic::DataColumnSidecar), "attestation" => Ok(EventTopic::Attestation), @@ -1418,6 +1401,7 @@ impl FromStr for EventTopic { "proposer_slashing" => Ok(EventTopic::ProposerSlashing), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), "block_gossip" => Ok(EventTopic::BlockGossip), + "execution_proof_validated" => Ok(EventTopic::ExecutionProofValidated), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -1428,7 +1412,6 @@ impl fmt::Display for EventTopic { match self { EventTopic::Head => write!(f, "head"), EventTopic::Block => write!(f, "block"), - EventTopic::BlockFull => write!(f, "block_full"), EventTopic::BlobSidecar => write!(f, "blob_sidecar"), EventTopic::DataColumnSidecar => write!(f, "data_column_sidecar"), EventTopic::Attestation => write!(f, "attestation"), @@ -1447,6 +1430,7 @@ impl fmt::Display for EventTopic { EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), EventTopic::BlockGossip => write!(f, "block_gossip"), + EventTopic::ExecutionProofValidated => write!(f, "execution_proof_validated"), } } } @@ -2565,31 +2549,4 @@ mod test { let roundtrip = O::context_deserialize::(deserializer, fork_name).unwrap(); assert_eq!(original, roundtrip); } - - #[test] - fn test_versioned_sse_block_full_round_trip() { - let rng = &mut XorShiftRng::from_seed([42; 16]); - for fork_name in ForkName::list_all() { - let beacon_block = map_fork_name!(fork_name, BeaconBlock, <_>::random_for_test(rng)); - let slot = Slot::random_for_test(rng); - - let versioned_response = VersionedSseBlockFull:: { - version: fork_name, - metadata: EmptyMetadata {}, - data: SseBlockFull { - slot, - block: beacon_block, - execution_optimistic: true, - }, - }; - - let json_str = serde_json::to_string(&versioned_response).unwrap(); - let deserialized: VersionedSseBlockFull = - serde_json::from_str(&json_str).unwrap(); - - assert_eq!(versioned_response, deserialized); - assert!(deserialized.data.execution_optimistic); - println!("fork_name: {:?} PASSED", fork_name); - } - } } diff --git a/validator_client/validator_services/src/proof_service.rs b/validator_client/validator_services/src/proof_service.rs index 679df7cffe5..1febe1a1e79 100644 --- a/validator_client/validator_services/src/proof_service.rs +++ b/validator_client/validator_services/src/proof_service.rs @@ -1,28 +1,21 @@ //! EIP-8025 Execution Proof Service //! -//! This service handles both proactive and reactive execution proof workflows: -//! -//! 1. **Proactive Mode**: Monitors beacon chain for new blocks via SSE and requests -//! proofs from the configured proof engine -//! 2. **Reactive Mode**: Receives proof requests from HTTP API (proof engine callbacks) -//! and signs/submits them to the beacon chain -//! -//! The service bridges the gap between external proof engines, validator keys, and -//! beacon nodes, providing a complete end-to-end execution proof flow. +//! This service handles execution proof requests, signing and resigning workflows. use beacon_node_fallback::BeaconNodeFallback; use bls::PublicKey; -use eth2::types::EventTopic; +use eth2::types::{BlockId, EventKind, EventTopic, SseExecutionProofValidated}; use execution_layer::NewPayloadRequest; use execution_layer::eip8025::{HttpProofEngine, ProofEngine}; use futures::StreamExt; use slot_clock::SlotClock; use std::sync::Arc; +use std::time::Duration; use task_executor::TaskExecutor; use tracing::{debug, error, info, warn}; use types::execution::eip8025::ProofAttributes; -use types::{BeaconBlock, Epoch, EthSpec, ExecutionProof}; -use validator_store::ValidatorStore; +use types::{Epoch, EthSpec, ExecutionProof, Hash256}; +use validator_store::{DoppelgangerStatus, ValidatorStore}; /// Background service for execution proof handling pub struct ProofService { @@ -64,19 +57,14 @@ impl ProofService) -> Result<(), String> { - // Only start monitoring if proof engine is configured let inner = self.inner.clone(); - let service_fut = async move { - inner.monitor_blocks_task().await; - }; - self.inner - .executor - .spawn(service_fut, "proof_service_monitor"); - - info!("Proof service started - monitoring for new blocks"); - + self.inner.executor.spawn( + async move { inner.monitor_events_task().await }, + "proof_service_monitor", + ); + info!("Proof service started - monitoring for new blocks and validated proofs"); Ok(()) } @@ -97,100 +85,105 @@ impl ProofService Inner { - /// Proactive: Monitor beacon node for new blocks and request proofs - async fn monitor_blocks_task(self: Arc) { - info!("Starting proof service block monitoring via SSE"); + /// Subscribe to both `Block` and `ExecutionProofValidated` events via a single SSE stream. + async fn subscribe_to_events( + &self, + ) -> Result< + impl futures::Stream, eth2::Error>>, + String, + > { + self.beacon_nodes + .first_success(|node| async move { + node.get_events::(&[EventTopic::Block, EventTopic::ExecutionProofValidated]) + .await + }) + .await + .map_err(|e| format!("All beacon nodes failed to provide event stream: {}", e)) + } + + /// Monitor block and validated-proof events over a single SSE connection. + async fn monitor_events_task(self: Arc) { + info!("Starting proof service event monitoring via SSE"); loop { - // Attempt to subscribe to block events from beacon node - match self.subscribe_to_blocks().await { + match self.subscribe_to_events().await { Ok(mut stream) => { - info!("Successfully subscribed to block events"); + info!("Successfully subscribed to block and execution proof events"); - // Process events from the stream while let Some(event_result) = stream.next().await { match event_result { - Ok(eth2::types::EventKind::BlockFull(block_event)) => { - let block = block_event.data; - if block.execution_optimistic { + Ok(EventKind::Block(sse_block)) => { + if sse_block.execution_optimistic { debug!( - slot = block.slot.as_u64(), - "Received execution optimistic block event" + slot = sse_block.slot.as_u64(), + "Skipping execution optimistic block" ); + continue; } - self.handle_block_event(&block.block, block.slot).await; + self.handle_block_event(sse_block.block, sse_block.slot) + .await; } - Ok(_) => { - // Ignore other event types (shouldn't happen with our topic filter) - debug!("Received non-block event in block_full stream"); + Ok(EventKind::ExecutionProofValidated(proof_event)) => { + self.handle_validated_proof(proof_event).await; } + Ok(_) => {} Err(e) => { - warn!( - error = %e, - "Error receiving block event, will reconnect" - ); - break; // Break inner loop to reconnect + warn!(error = %e, "Error receiving event, will reconnect"); + break; } } } - // Stream ended or errored - reconnect - warn!("Block event stream ended, reconnecting..."); + warn!("Event stream ended, reconnecting..."); } Err(e) => { - error!( - error = %e, - "Failed to subscribe to block events, retrying..." - ); + error!(error = %e, "Failed to subscribe to events, retrying..."); } } - } - } - /// Helper method to establish SSE subscription with beacon node fallback - async fn subscribe_to_blocks( - &self, - ) -> Result< - impl futures::Stream, eth2::Error>>, - String, - > { - self.beacon_nodes - .first_success( - |node| async move { node.get_events::(&[EventTopic::BlockFull]).await }, - ) - .await - .map_err(|e| format!("All beacon nodes failed to provide event stream: {}", e)) + tokio::time::sleep(Duration::from_secs(2)).await; + } } - /// Handle a new block event by requesting proofs from proof engine - async fn handle_block_event(&self, block: &BeaconBlock, slot: types::Slot) { - let block_root = block.canonical_root(); - + /// Handle a new block event by fetching the full block via RPC then requesting proofs + async fn handle_block_event(&self, block_root: Hash256, slot: types::Slot) { info!( slot = slot.as_u64(), block = %block_root, - "New block detected, requesting proofs from proof engine" + "New block detected, fetching full block via RPC" ); - // Construct NewPayloadRequest from beacon block - let new_payload_request = match NewPayloadRequest::try_from(block.to_ref()) { + let signed_block = match self + .beacon_nodes + .first_success(|node| async move { + node.get_beacon_blocks::(BlockId::Root(block_root)) + .await + }) + .await + { + Ok(Some(response)) => response.data().clone(), + Ok(None) => { + warn!(block = %block_root, "Block not found on beacon node"); + return; + } + Err(e) => { + error!(block = %block_root, error = %e, "Failed to fetch block via RPC"); + return; + } + }; + + let new_payload_request = match NewPayloadRequest::try_from(signed_block.message()) { Ok(req) => req, Err(e) => { - error!( - error = ?e, - block = %block_root, - "Failed to construct NewPayloadRequest from block" - ); + error!(block = %block_root, error = ?e, "Failed to construct NewPayloadRequest"); return; } }; - // Use configured proof types let proof_attributes = ProofAttributes { proof_types: self.proof_types.clone(), }; - // Request proofs from proof engine - HttpProofEngine handles JSON serialization match self .proof_engine .request_proofs(new_payload_request, proof_attributes) @@ -204,11 +197,50 @@ impl Inner { ); } Err(e) => { - error!( - error = ?e, - block = %block_root, - "Failed to request proofs from proof engine" - ); + error!(block = %block_root, error = ?e, "Failed to request proofs from proof engine"); + } + } + } + + /// Handle a validated proof event by resigning with the first local validator key + async fn handle_validated_proof(&self, event: SseExecutionProofValidated) { + let execution_proof = event.execution_proof; + let epoch = Epoch::new(event.epoch); + + let Some(pubkey) = self + .validator_store + .voting_pubkeys::, _>(DoppelgangerStatus::ignored) + .first() + .cloned() + else { + warn!("No local validators available to resign proof"); + return; + }; + + match self + .validator_store + .sign_execution_proof(pubkey, execution_proof, epoch) + .await + { + Ok(signed_proof) => { + match self + .beacon_nodes + .first_success(move |node| { + let proof = signed_proof.clone(); + async move { node.post_beacon_execution_proofs(&[proof]).await } + }) + .await + { + Ok(_) => { + info!(?pubkey, "Resigned proof submitted"); + } + Err(e) => { + warn!(?pubkey, error = %e, "Failed to submit resigned proof"); + } + } + } + Err(e) => { + warn!(?pubkey, error = ?e, "Failed to sign proof for validator"); } } }