diff --git a/crates/optimism/flashblocks/src/lib.rs b/crates/optimism/flashblocks/src/lib.rs index 39577116e96..7220f443cc1 100644 --- a/crates/optimism/flashblocks/src/lib.rs +++ b/crates/optimism/flashblocks/src/lib.rs @@ -9,6 +9,7 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] use reth_primitives_traits::NodePrimitives; +use std::sync::Arc; pub use payload::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, FlashBlockDecoder, @@ -39,39 +40,37 @@ pub type PendingBlockRx = tokio::sync::watch::Receiver; +/// Receiver of received [`FlashBlock`]s from the (websocket) subscription. +/// +/// [`FlashBlock`]: crate::FlashBlock +pub type FlashBlockRx = tokio::sync::broadcast::Receiver>; + /// Receiver that signals whether a [`FlashBlock`] is currently being built. pub type InProgressFlashBlockRx = tokio::sync::watch::Receiver>; /// Container for all flashblocks-related listeners. /// -/// Groups together the three receivers that provide flashblock-related updates. +/// Groups together the channels for flashblock-related updates. #[derive(Debug)] pub struct FlashblocksListeners { - /// Receiver of the most recent [`PendingFlashBlock`] built out of [`FlashBlock`]s. + /// Receiver of the most recent executed [`PendingFlashBlock`] built out of [`FlashBlock`]s. pub pending_block_rx: PendingBlockRx, - /// Receiver of the sequences of [`FlashBlock`]s built. - pub flashblock_rx: FlashBlockCompleteSequenceRx, + /// Subscription channel of the complete sequences of [`FlashBlock`]s built. + pub flashblocks_sequence: tokio::sync::broadcast::Sender, /// Receiver that signals whether a [`FlashBlock`] is currently being built. pub in_progress_rx: InProgressFlashBlockRx, + /// Subscription channel for received flashblocks from the (websocket) connection. + pub received_flashblocks: tokio::sync::broadcast::Sender>, } impl FlashblocksListeners { - /// Creates a new [`FlashblocksListeners`] with the given receivers. + /// Creates a new [`FlashblocksListeners`] with the given channels. pub const fn new( pending_block_rx: PendingBlockRx, - flashblock_rx: FlashBlockCompleteSequenceRx, + flashblocks_sequence: tokio::sync::broadcast::Sender, in_progress_rx: InProgressFlashBlockRx, + received_flashblocks: tokio::sync::broadcast::Sender>, ) -> Self { - Self { pending_block_rx, flashblock_rx, in_progress_rx } - } -} - -impl Clone for FlashblocksListeners { - fn clone(&self) -> Self { - Self { - pending_block_rx: self.pending_block_rx.clone(), - flashblock_rx: self.flashblock_rx.resubscribe(), - in_progress_rx: self.in_progress_rx.clone(), - } + Self { pending_block_rx, flashblocks_sequence, in_progress_rx, received_flashblocks } } } diff --git a/crates/optimism/flashblocks/src/sequence.rs b/crates/optimism/flashblocks/src/sequence.rs index fff4bd84a45..f2363207e38 100644 --- a/crates/optimism/flashblocks/src/sequence.rs +++ b/crates/optimism/flashblocks/src/sequence.rs @@ -38,6 +38,13 @@ where Self { inner: BTreeMap::new(), block_broadcaster: tx, state_root: None } } + /// Returns the sender half of the [`FlashBlockCompleteSequence`] channel. + pub const fn block_sequence_broadcaster( + &self, + ) -> &broadcast::Sender { + &self.block_broadcaster + } + /// Gets a subscriber to the flashblock sequences produced. pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx { self.block_broadcaster.subscribe() @@ -160,7 +167,10 @@ where } /// A complete sequence of flashblocks, often corresponding to a full block. -/// Ensure invariants of a complete flashblocks sequence. +/// +/// Ensures invariants of a complete flashblocks sequence. +/// If this entire sequence of flashblocks was executed on top of latest block, this also includes +/// the computed state root. #[derive(Debug, Clone)] pub struct FlashBlockCompleteSequence { inner: Vec, diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 7e442470d98..f5d4a4a810d 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,8 +1,8 @@ use crate::{ sequence::FlashBlockPendingSequence, worker::{BuildArgs, FlashBlockBuilder}, - ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, InProgressFlashBlockRx, - PendingFlashBlock, + ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx, + InProgressFlashBlockRx, PendingFlashBlock, }; use alloy_eips::eip2718::WithEncoded; use alloy_primitives::B256; @@ -19,6 +19,7 @@ use reth_storage_api::{BlockReaderIdExt, StateProviderFactory}; use reth_tasks::TaskExecutor; use std::{ pin::Pin, + sync::Arc, task::{ready, Context, Poll}, time::Instant, }; @@ -42,6 +43,8 @@ pub struct FlashBlockService< rx: S, current: Option>, blocks: FlashBlockPendingSequence, + /// Broadcast channel to forward received flashblocks from the subscription. + received_flashblocks_tx: tokio::sync::broadcast::Sender>, rebuild: bool, builder: FlashBlockBuilder, canon_receiver: CanonStateNotifications, @@ -60,17 +63,6 @@ pub struct FlashBlockService< compute_state_root: bool, } -/// Information for a flashblock currently built -#[derive(Debug, Clone, Copy)] -pub struct FlashBlockBuildInfo { - /// Parent block hash - pub parent_hash: B256, - /// Flashblock index within the current block's sequence - pub index: u64, - /// Block number of the flashblock being built. - pub block_number: u64, -} - impl FlashBlockService where N: NodePrimitives, @@ -92,10 +84,12 @@ where /// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream. pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self { let (in_progress_tx, _) = watch::channel(None); + let (received_flashblocks_tx, _) = tokio::sync::broadcast::channel(128); Self { rx, current: None, blocks: FlashBlockPendingSequence::new(), + received_flashblocks_tx, canon_receiver: provider.subscribe_to_canonical_state(), builder: FlashBlockBuilder::new(evm_config, provider), rebuild: false, @@ -114,6 +108,20 @@ where self } + /// Returns the sender half to the received flashblocks. + pub const fn flashblocks_broadcaster( + &self, + ) -> &tokio::sync::broadcast::Sender> { + &self.received_flashblocks_tx + } + + /// Returns the sender half to the flashblock sequence. + pub const fn block_sequence_broadcaster( + &self, + ) -> &tokio::sync::broadcast::Sender { + self.blocks.block_sequence_broadcaster() + } + /// Returns a subscriber to the flashblock sequence. pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx { self.blocks.subscribe_block_sequence() @@ -137,6 +145,13 @@ where warn!("Flashblock service has stopped"); } + /// Notifies all subscribers about the received flashblock + fn notify_received_flashblock(&self, flashblock: &FlashBlock) { + if self.received_flashblocks_tx.receiver_count() > 0 { + let _ = self.received_flashblocks_tx.send(Arc::new(flashblock.clone())); + } + } + /// Returns the [`BuildArgs`] made purely out of [`FlashBlock`]s that were received earlier. /// /// Returns `None` if the flashblock have no `base` or the base is not a child block of latest. @@ -284,6 +299,7 @@ where while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) { match result { Ok(flashblock) => { + this.notify_received_flashblock(&flashblock); if flashblock.index == 0 { this.metrics.last_flashblock_length.record(this.blocks.count() as f64); } @@ -344,6 +360,17 @@ where } } +/// Information for a flashblock currently built +#[derive(Debug, Clone, Copy)] +pub struct FlashBlockBuildInfo { + /// Parent block hash + pub parent_hash: B256, + /// Flashblock index within the current block's sequence + pub index: u64, + /// Block number of the flashblock being built. + pub block_number: u64, +} + type BuildJob = (Instant, oneshot::Receiver, CachedReads)>>>); diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index f69da896424..84929e98852 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -23,8 +23,8 @@ use reth_evm::ConfigureEvm; use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes}; use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx}; use reth_optimism_flashblocks::{ - ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockService, - FlashblocksListeners, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream, + ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockRx, + FlashBlockService, FlashblocksListeners, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream, }; use reth_rpc::eth::core::EthApiInner; use reth_rpc_eth_api::{ @@ -110,9 +110,14 @@ impl OpEthApi { self.inner.flashblocks.as_ref().map(|f| f.pending_block_rx.clone()) } - /// Returns a flashblock receiver, if any, by resubscribing to it. - pub fn flashblock_rx(&self) -> Option { - self.inner.flashblocks.as_ref().map(|f| f.flashblock_rx.resubscribe()) + /// Returns a new subscription to received flashblocks. + pub fn subscribe_received_flashblocks(&self) -> Option { + self.inner.flashblocks.as_ref().map(|f| f.received_flashblocks.subscribe()) + } + + /// Returns a new subscription to flashblock sequences. + pub fn subscribe_flashblock_sequence(&self) -> Option { + self.inner.flashblocks.as_ref().map(|f| f.flashblocks_sequence.subscribe()) } /// Returns information about the flashblock currently being built, if any. @@ -501,12 +506,18 @@ where ctx.components.task_executor().clone(), ); - let flashblock_rx = service.subscribe_block_sequence(); + let flashblocks_sequence = service.block_sequence_broadcaster().clone(); + let received_flashblocks = service.flashblocks_broadcaster().clone(); let in_progress_rx = service.subscribe_in_progress(); ctx.components.task_executor().spawn(Box::pin(service.run(tx))); - Some(FlashblocksListeners::new(pending_rx, flashblock_rx, in_progress_rx)) + Some(FlashblocksListeners::new( + pending_rx, + flashblocks_sequence, + in_progress_rx, + received_flashblocks, + )) } else { None };