diff --git a/crates/optimism/flashblocks/src/lib.rs b/crates/optimism/flashblocks/src/lib.rs index 6570ac5b71f..f189afa8f6b 100644 --- a/crates/optimism/flashblocks/src/lib.rs +++ b/crates/optimism/flashblocks/src/lib.rs @@ -17,4 +17,10 @@ mod ws; /// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s. /// /// [`FlashBlock`]: crate::FlashBlock -pub type FlashBlockRx = tokio::sync::watch::Receiver>>; +pub type PendingBlockRx = tokio::sync::watch::Receiver>>; + +/// Receiver of the sequences of [`FlashBlock`]s built. +/// +/// [`FlashBlock`]: crate::FlashBlock +pub type FlashBlockCompleteSequenceRx = + tokio::sync::broadcast::Receiver; diff --git a/crates/optimism/flashblocks/src/sequence.rs b/crates/optimism/flashblocks/src/sequence.rs index 96aa7180aca..72abfdca16d 100644 --- a/crates/optimism/flashblocks/src/sequence.rs +++ b/crates/optimism/flashblocks/src/sequence.rs @@ -1,9 +1,14 @@ use crate::{ExecutionPayloadBaseV1, FlashBlock}; use alloy_eips::eip2718::WithEncoded; +use core::mem; use eyre::{bail, OptionExt}; use reth_primitives_traits::{Recovered, SignedTransaction}; use std::collections::BTreeMap; -use tracing::trace; +use tokio::sync::broadcast; +use tracing::{debug, trace, warn}; + +/// The size of the broadcast channel for completed flashblock sequences. +const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128; /// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices. #[derive(Debug)] @@ -13,14 +18,51 @@ pub(crate) struct FlashBlockPendingSequence { /// With a blocktime of 2s and flashblock tick-rate of 200ms plus one extra flashblock per new /// pending block, we expect 11 flashblocks per slot. inner: BTreeMap>, + /// Broadcasts flashblocks to subscribers. + block_broadcaster: broadcast::Sender, } impl FlashBlockPendingSequence where T: SignedTransaction, { - pub(crate) const fn new() -> Self { - Self { inner: BTreeMap::new() } + pub(crate) fn new() -> Self { + // Note: if the channel is full, send will not block but rather overwrite the oldest + // messages. Order is preserved. + let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE); + Self { inner: BTreeMap::new(), block_broadcaster: tx } + } + + /// Gets a subscriber to the flashblock sequences produced. + pub(crate) fn subscribe_block_sequence( + &self, + ) -> broadcast::Receiver { + self.block_broadcaster.subscribe() + } + + // Clears the state and broadcasts the blocks produced to subscribers. + fn clear_and_broadcast_blocks(&mut self) { + let flashblocks = mem::take(&mut self.inner); + + // If there are any subscribers, send the flashblocks to them. + if self.block_broadcaster.receiver_count() > 0 { + let flashblocks = match FlashBlockCompleteSequence::new( + flashblocks.into_iter().map(|block| block.1.into()).collect(), + ) { + Ok(flashblocks) => flashblocks, + Err(err) => { + debug!(target: "flashblocks", error = ?err, "Failed to create full flashblock complete sequence"); + return; + } + }; + + // Note: this should only ever fail if there are no receivers. This can happen if + // there is a race condition between the clause right above and this + // one. We can simply warn the user and continue. + if let Err(err) = self.block_broadcaster.send(flashblocks) { + warn!(target: "flashblocks", error = ?err, "Failed to send flashblocks to subscribers"); + } + } } /// Inserts a new block into the sequence. @@ -29,8 +71,10 @@ where pub(crate) fn insert(&mut self, flashblock: FlashBlock) -> eyre::Result<()> { if flashblock.index == 0 { trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence"); - // Flash block at index zero resets the whole state - self.clear(); + + // Flash block at index zero resets the whole state. + self.clear_and_broadcast_blocks(); + self.inner.insert(flashblock.index, PreparedFlashBlock::new(flashblock)?); return Ok(()) } @@ -65,10 +109,6 @@ where .flat_map(|(_, block)| block.txs.clone()) } - fn clear(&mut self) { - self.inner.clear(); - } - /// Returns the first block number pub(crate) fn block_number(&self) -> Option { Some(self.inner.values().next()?.block().metadata.block_number) @@ -87,7 +127,7 @@ where /// A complete sequence of flashblocks, often corresponding to a full block. /// Ensure invariants of a complete flashblocks sequence. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FlashBlockCompleteSequence(Vec); impl FlashBlockCompleteSequence { @@ -153,6 +193,12 @@ impl PreparedFlashBlock { } } +impl From> for FlashBlock { + fn from(val: PreparedFlashBlock) -> Self { + val.block + } +} + impl PreparedFlashBlock where T: SignedTransaction, @@ -238,4 +284,46 @@ mod tests { assert_eq!(actual_txs, expected_txs); } + + #[test] + fn test_sequence_sends_flashblocks_to_subscribers() { + let mut sequence = FlashBlockPendingSequence::>::new(); + let mut subscriber = sequence.subscribe_block_sequence(); + + for idx in 0..10 { + sequence + .insert(FlashBlock { + payload_id: Default::default(), + index: idx, + base: Some(ExecutionPayloadBaseV1::default()), + diff: Default::default(), + metadata: Default::default(), + }) + .unwrap(); + } + + assert_eq!(sequence.count(), 10); + + // Then we don't receive anything until we insert a new flashblock + let no_flashblock = subscriber.try_recv(); + assert!(no_flashblock.is_err()); + + // Let's insert a new flashblock with index 0 + sequence + .insert(FlashBlock { + payload_id: Default::default(), + index: 0, + base: Some(ExecutionPayloadBaseV1::default()), + diff: Default::default(), + metadata: Default::default(), + }) + .unwrap(); + + let flashblocks = subscriber.try_recv().unwrap(); + assert_eq!(flashblocks.count(), 10); + + for (idx, block) in flashblocks.0.iter().enumerate() { + assert_eq!(block.index, idx as u64); + } + } } diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index d7c4568bb35..9b93baad0dd 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,7 +1,7 @@ use crate::{ sequence::FlashBlockPendingSequence, worker::{BuildArgs, FlashBlockBuilder}, - ExecutionPayloadBaseV1, FlashBlock, + ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequence, }; use alloy_eips::eip2718::WithEncoded; use alloy_primitives::B256; @@ -20,7 +20,10 @@ use std::{ task::{ready, Context, Poll}, time::Instant, }; -use tokio::{pin, sync::oneshot}; +use tokio::{ + pin, + sync::{broadcast, oneshot}, +}; use tracing::{debug, trace, warn}; /// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of @@ -80,6 +83,11 @@ where } } + /// Returns a subscriber to the flashblock sequence. + pub fn subscribe_block_sequence(&self) -> broadcast::Receiver { + self.blocks.subscribe_block_sequence() + } + /// Drives the services and sends new blocks to the receiver /// /// Note: this should be spawned diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index 6c32e28a5b7..732341e3bcf 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -22,7 +22,8 @@ use reth_evm::ConfigureEvm; use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy}; use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx}; use reth_optimism_flashblocks::{ - ExecutionPayloadBaseV1, FlashBlockRx, FlashBlockService, WsFlashBlockStream, + ExecutionPayloadBaseV1, FlashBlockCompleteSequenceRx, FlashBlockService, PendingBlockRx, + WsFlashBlockStream, }; use reth_rpc::eth::{core::EthApiInner, DevSigner}; use reth_rpc_eth_api::{ @@ -76,13 +77,15 @@ impl OpEthApi { eth_api: EthApiNodeBackend, sequencer_client: Option, min_suggested_priority_fee: U256, - flashblocks_rx: Option>, + pending_block_rx: Option>, + flashblock_rx: Option, ) -> Self { let inner = Arc::new(OpEthApiInner { eth_api, sequencer_client, min_suggested_priority_fee, - flashblocks_rx, + pending_block_rx, + flashblock_rx, }); Self { inner } } @@ -96,9 +99,14 @@ impl OpEthApi { self.inner.sequencer_client() } - /// Returns a cloned Flashblocks receiver, if any. - pub fn flashblocks_rx(&self) -> Option> { - self.inner.flashblocks_rx.clone() + /// Returns a cloned pending block receiver, if any. + pub fn pending_block_rx(&self) -> Option> { + self.inner.pending_block_rx.clone() + } + + /// Returns a flashblock receiver, if any, by resubscribing to it. + pub fn flashblock_rx(&self) -> Option { + self.inner.flashblock_rx.as_ref().map(|rx| rx.resubscribe()) } /// Build a [`OpEthApi`] using [`OpEthApiBuilder`]. @@ -119,7 +127,7 @@ impl OpEthApi { PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent, }; - let Some(rx) = self.inner.flashblocks_rx.as_ref() else { return Ok(None) }; + let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) }; let pending_block = rx.borrow(); let Some(pending_block) = pending_block.as_ref() else { return Ok(None) }; @@ -321,10 +329,14 @@ pub struct OpEthApiInner { /// /// See also min_suggested_priority_fee: U256, - /// Flashblocks receiver. + /// Pending block receiver. /// /// If set, then it provides current pending block based on received Flashblocks. - flashblocks_rx: Option>, + pending_block_rx: Option>, + /// Flashblocks receiver. + /// + /// If set, then it provides sequences of flashblock built. + flashblock_rx: Option, } impl fmt::Debug for OpEthApiInner { @@ -459,9 +471,9 @@ where None }; - let flashblocks_rx = if let Some(ws_url) = flashblocks_url { + let rxs = if let Some(ws_url) = flashblocks_url { info!(target: "reth:cli", %ws_url, "Launching flashblocks service"); - let (tx, rx) = watch::channel(None); + let (tx, pending_block_rx) = watch::channel(None); let stream = WsFlashBlockStream::new(ws_url); let service = FlashBlockService::new( stream, @@ -469,19 +481,23 @@ where ctx.components.provider().clone(), ctx.components.task_executor().clone(), ); + let flashblock_rx = service.subscribe_block_sequence(); ctx.components.task_executor().spawn(Box::pin(service.run(tx))); - Some(rx) + Some((pending_block_rx, flashblock_rx)) } else { None }; + let (pending_block_rx, flashblock_rx) = rxs.unzip(); + let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner(); Ok(OpEthApi::new( eth_api, sequencer_client, U256::from(min_suggested_priority_fee), - flashblocks_rx, + pending_block_rx, + flashblock_rx, )) } } diff --git a/crates/optimism/rpc/src/historical.rs b/crates/optimism/rpc/src/historical.rs index e567bc79062..90357afa777 100644 --- a/crates/optimism/rpc/src/historical.rs +++ b/crates/optimism/rpc/src/historical.rs @@ -386,12 +386,12 @@ mod tests { #[test] fn parses_transaction_hash_from_params() { let hash = "0xdbdfa0f88b2cf815fdc1621bd20c2bd2b0eed4f0c56c9be2602957b5a60ec702"; - let params_str = format!(r#"["{}"]"#, hash); + let params_str = format!(r#"["{hash}"]"#); let params = Params::new(Some(¶ms_str)); let result = parse_transaction_hash_from_params(¶ms); assert!(result.is_ok()); let parsed_hash = result.unwrap(); - assert_eq!(format!("{:?}", parsed_hash), hash); + assert_eq!(format!("{parsed_hash:?}"), hash); } /// Tests that invalid transaction hash returns error.