Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ use crate::{
use bls::{PublicKey, PublicKeyBytes, Signature};
use eth2::beacon_response::ForkVersionedResponse;
use eth2::types::{
EventKind, SseBlobSidecar, SseBlock, SseBlockFull, SseDataColumnSidecar,
SseExtendedPayloadAttributes,
EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes,
};
use execution_layer::{
BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer,
Expand Down Expand Up @@ -4316,9 +4315,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
payload_verification_status: PayloadVerificationStatus,
current_slot: Slot,
) {
// TODO: Optimise this so we don't have to clone.
let beacon_block = Arc::unwrap_or_clone(signed_block.clone());
let (beacon_block, _) = beacon_block.deconstruct();
let block = signed_block.message();

// Only present some metrics for blocks from the previous epoch or later.
Expand Down Expand Up @@ -4361,21 +4357,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
execution_optimistic: payload_verification_status.is_optimistic(),
}));
}

// Emit BlockFull event if there are block_full subscribers
if event_handler.has_block_full_subscribers() {
let slot = block.slot();
// Convert BeaconBlockRef to owned BeaconBlock for the event
event_handler.register(EventKind::BlockFull(Box::new(ForkVersionedResponse {
data: SseBlockFull {
slot,
block: beacon_block,
execution_optimistic: payload_verification_status.is_optimistic(),
},
metadata: Default::default(),
version: self.spec.fork_name_at_slot::<T::EthSpec>(slot),
})));
}
}

// Do not trigger light_client server update producer for old blocks, to extra work
Expand Down Expand Up @@ -7567,7 +7548,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Step 3: Update the fork choice if the proof engine returns valid.
// The proof engine returns valid if the proof is valid and the criteria for the associated block root to be considered valid are met.
// The proof engine returns ACCEPTED if the proof is valid but block validity criteria are not met.
if verification_result.is_valid() {
if verification_result.is_valid() || verification_result.is_accepted() {
let request_root = signed_proof.request_root();

// Look up the beacon block root from request root
Expand Down
34 changes: 18 additions & 16 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
pub use eth2::types::{
EventKind, SseBlock, SseExecutionProofValidated, SseFinalizedCheckpoint, SseHead,
};
use tokio::sync::broadcast;
use tokio::sync::broadcast::{Receiver, Sender, error::SendError};
use tracing::trace;
Expand All @@ -10,7 +12,6 @@ pub struct ServerSentEventHandler<E: EthSpec> {
attestation_tx: Sender<EventKind<E>>,
single_attestation_tx: Sender<EventKind<E>>,
block_tx: Sender<EventKind<E>>,
block_full_tx: Sender<EventKind<E>>,
blob_sidecar_tx: Sender<EventKind<E>>,
data_column_sidecar_tx: Sender<EventKind<E>>,
finalized_tx: Sender<EventKind<E>>,
Expand All @@ -27,6 +28,7 @@ pub struct ServerSentEventHandler<E: EthSpec> {
attester_slashing_tx: Sender<EventKind<E>>,
bls_to_execution_change_tx: Sender<EventKind<E>>,
block_gossip_tx: Sender<EventKind<E>>,
execution_proof_validated_tx: Sender<EventKind<E>>,
}

impl<E: EthSpec> ServerSentEventHandler<E> {
Expand All @@ -38,7 +40,6 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (attestation_tx, _) = broadcast::channel(capacity);
let (single_attestation_tx, _) = broadcast::channel(capacity);
let (block_tx, _) = broadcast::channel(capacity);
let (block_full_tx, _) = broadcast::channel(capacity);
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
let (data_column_sidecar_tx, _) = broadcast::channel(capacity);
let (finalized_tx, _) = broadcast::channel(capacity);
Expand All @@ -55,12 +56,12 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (attester_slashing_tx, _) = broadcast::channel(capacity);
let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
let (block_gossip_tx, _) = broadcast::channel(capacity);
let (execution_proof_validated_tx, _) = broadcast::channel(capacity);

Self {
attestation_tx,
single_attestation_tx,
block_tx,
block_full_tx,
blob_sidecar_tx,
data_column_sidecar_tx,
finalized_tx,
Expand All @@ -77,6 +78,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
attester_slashing_tx,
bls_to_execution_change_tx,
block_gossip_tx,
execution_proof_validated_tx,
}
}

Expand All @@ -101,10 +103,6 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.block_tx
.send(kind)
.map(|count| log_count("block", count)),
EventKind::BlockFull(_) => self
.block_full_tx
.send(kind)
.map(|count| log_count("block_full", count)),
EventKind::BlobSidecar(_) => self
.blob_sidecar_tx
.send(kind)
Expand Down Expand Up @@ -169,6 +167,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.block_gossip_tx
.send(kind)
.map(|count| log_count("block gossip", count)),
EventKind::ExecutionProofValidated(_) => self
.execution_proof_validated_tx
.send(kind)
.map(|count| log_count("execution proof validated", count)),
};
if let Err(SendError(event)) = result {
trace!(?event, "No receivers registered to listen for event");
Expand All @@ -187,10 +189,6 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.block_tx.subscribe()
}

pub fn subscribe_block_full(&self) -> Receiver<EventKind<E>> {
self.block_full_tx.subscribe()
}

pub fn subscribe_blob_sidecar(&self) -> Receiver<EventKind<E>> {
self.blob_sidecar_tx.subscribe()
}
Expand Down Expand Up @@ -267,10 +265,6 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.block_tx.receiver_count() > 0
}

pub fn has_block_full_subscribers(&self) -> bool {
self.block_full_tx.receiver_count() > 0
}

pub fn has_blob_sidecar_subscribers(&self) -> bool {
self.blob_sidecar_tx.receiver_count() > 0
}
Expand Down Expand Up @@ -326,4 +320,12 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
pub fn has_block_gossip_subscribers(&self) -> bool {
self.block_gossip_tx.receiver_count() > 0
}

pub fn subscribe_execution_proof_validated(&self) -> Receiver<EventKind<E>> {
self.execution_proof_validated_tx.subscribe()
}

pub fn has_execution_proof_validated_subscribers(&self) -> bool {
self.execution_proof_validated_tx.receiver_count() > 0
}
}
6 changes: 3 additions & 3 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3181,9 +3181,6 @@ pub fn serve<T: BeaconChainTypes>(
let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(),
api_types::EventTopic::BlockFull => {
event_handler.subscribe_block_full()
}
api_types::EventTopic::BlobSidecar => {
event_handler.subscribe_blob_sidecar()
}
Expand Down Expand Up @@ -3235,6 +3232,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::BlockGossip => {
event_handler.subscribe_block_gossip()
}
api_types::EventTopic::ExecutionProofValidated => {
event_handler.subscribe_execution_proof_validated()
}
};

receivers.push(
Expand Down
27 changes: 25 additions & 2 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::events::{EventKind, SseExecutionProofValidated};
use beacon_chain::store::Error;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError,
Expand Down Expand Up @@ -1877,9 +1878,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let proof_type = execution_proof.proof_type();
let validator_index = execution_proof.validator_index();

// Extract the inner proof before moving execution_proof into verification.
let execution_proof_message = execution_proof.message.clone();

// Verify the execution proof.
let verification_result = self.chain.verify_execution_proof(execution_proof).await;

// If we have a execution proof subscriber we assume a validator will resign the proof and therefore we do not propagate this proof to peers.
// We will wait for the validator to sign and submit the proof for gossip.
let gossip_behaviour = if let Ok((proof_status, block)) = &verification_result
&& (proof_status.is_valid() || proof_status.is_accepted())
&& let Some(event_handler) = self.chain.event_handler.as_ref()
&& event_handler.has_execution_proof_validated_subscribers()
&& let Some((_block_root, slot)) = block
{
event_handler.register(EventKind::ExecutionProofValidated(
SseExecutionProofValidated {
execution_proof: execution_proof_message,
epoch: slot.epoch(T::EthSpec::slots_per_epoch()).as_u64(),
},
));
MessageAcceptance::Ignore
} else {
MessageAcceptance::Accept
};

match verification_result {
// TODO: split our error types and penalize accordingly
Err(e) => {
Expand Down Expand Up @@ -1909,7 +1932,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
block_root,
});
}
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
self.propagate_validation_result(message_id, peer_id, gossip_behaviour);
}
Ok((ProofStatus::Invalid, _)) => {
debug!(
Expand All @@ -1927,7 +1950,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
proof_type,
"Execution proof is accepted but not fully verified"
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
self.propagate_validation_result(message_id, peer_id, gossip_behaviour);
}
Ok((ProofStatus::Syncing, _)) => {
debug!(
Expand Down
Loading
Loading