From 30dbd8b49e46ff9f0c9dbb9096690e5197d56b0d Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Mon, 1 Sep 2025 10:24:56 +0200 Subject: [PATCH 1/7] feat(optimism): keep flashblocks in ordered sequence for pending block --- crates/optimism/flashblocks/src/service.rs | 69 +++++++++++++++++++--- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index db595749bb9..2fa4259fb79 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -17,6 +17,7 @@ use reth_revm::{database::StateProviderDatabase, db::State}; use reth_rpc_eth_types::{EthApiError, PendingBlock}; use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory}; use std::{ + collections::BTreeMap, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -36,12 +37,62 @@ pub struct FlashBlockService< > { rx: S, current: Option>, - blocks: Vec, + blocks: FlashBlockSequence, evm_config: EvmConfig, provider: Provider, canon_receiver: CanonStateNotifications, } +/// Simple wrapper around an ordered B-tree to keep track of a sequence of flashblocks by index +#[derive(Debug)] +struct FlashBlockSequence { + inner: BTreeMap, +} + +impl FlashBlockSequence { + const fn new() -> Self { + Self { inner: BTreeMap::new() } + } + + /// Insert flashblock in the current sequence + /// Only take in account given flashblock index + fn insert(&mut self, flashblock: FlashBlock) { + self.inner.insert(flashblock.index, flashblock); + } + + fn clear(&mut self) { + self.inner.clear(); + } + + fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Return base flashblock + /// Base flashblock is the first flashblock of index 0 in the sequence + fn base(&self) -> Option<&FlashBlock> { + self.inner.get(&0) + } + + /// Iterator over sequence of ready flashblocks + /// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in + /// the sequence + fn iter_ready(&self) -> impl Iterator { + let mut current_index = 0; + self.inner + .iter() + .take_while(move |(&idx, _)| { + if idx == current_index { + current_index += 1; + true + } else { + false + } + }) + .map(|(_, f)| f) + } +} + impl< N: NodePrimitives, S, @@ -61,7 +112,7 @@ impl< Self { rx, current: None, - blocks: Vec::new(), + blocks: FlashBlockSequence::new(), evm_config, canon_receiver: provider.subscribe_to_canonical_state(), provider, @@ -77,12 +128,15 @@ impl< pub fn add_flash_block(&mut self, flashblock: FlashBlock) { // Flash block at index zero resets the whole state if flashblock.index == 0 { - self.blocks = vec![flashblock]; + self.blocks.clear(); self.current.take(); + self.blocks.insert(flashblock); } // Flash block at the following index adds to the collection and invalidates built block - else if flashblock.index == self.blocks.last().map(|last| last.index + 1).unwrap_or(0) { - self.blocks.push(flashblock); + else if flashblock.index == + self.blocks.iter_ready().last().map(|last| last.index + 1).unwrap_or(0) + { + self.blocks.insert(flashblock); self.current.take(); } // Flash block at a different index is ignored @@ -94,6 +148,7 @@ impl< curr_block = %pending_block.block().header().number(), new_block = %flashblock.metadata.block_number, ); + self.blocks.insert(flashblock); } else { error!( message = "Received Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", @@ -125,7 +180,7 @@ impl< let attrs = self .blocks - .first() + .base() .and_then(|v| v.base.clone()) .ok_or_eyre("Missing base flashblock")?; @@ -144,7 +199,7 @@ impl< builder.apply_pre_execution_changes()?; - let transactions = self.blocks.iter().flat_map(|v| v.diff.transactions.clone()); + let transactions = self.blocks.iter_ready().flat_map(|v| v.diff.transactions.clone()); for encoded in transactions { let tx = N::SignedTx::decode_2718_exact(encoded.as_ref())?; From 57c0afa680e1c12e57777f3556a21d8449919834 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 Sep 2025 13:53:21 +0200 Subject: [PATCH 2/7] chore: simplify flashblocks poll logic --- crates/optimism/flashblocks/src/service.rs | 106 +++++++++------------ 1 file changed, 45 insertions(+), 61 deletions(-) diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 69109978f51..49187f7f5b3 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,7 +1,7 @@ use crate::{ExecutionPayloadBaseV1, FlashBlock}; use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag, Decodable2718}; use alloy_primitives::B256; -use eyre::{eyre, OptionExt}; +use eyre::OptionExt; use futures_util::{FutureExt, Stream, StreamExt}; use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock}; use reth_errors::RethError; @@ -11,7 +11,7 @@ use reth_evm::{ }; use reth_execution_types::ExecutionOutcome; use reth_primitives_traits::{ - AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, RecoveredBlock, + AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, SignedTransaction, }; use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State}; @@ -145,14 +145,10 @@ impl< } /// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received using - /// [`Self::add_flash_block`]. - /// Builds a pending block using the configured provider and pool. + /// [`Self::add_flash_block`] on top of the latest state. /// - /// If the origin is the actual pending block, the block is built with withdrawals. - /// - /// After Cancun, if the origin is the actual pending block, the block includes the EIP-4788 pre - /// block contract call using the parent beacon block root received from the CL. - pub fn execute(&mut self) -> eyre::Result> { + /// Returns None if the flashblock doesn't attach to the latest header. + fn execute(&mut self) -> eyre::Result>> { let latest = self .provider .latest_header()? @@ -166,7 +162,8 @@ impl< .ok_or_eyre("Missing base flashblock")?; if attrs.parent_hash != latest_hash { - return Err(eyre!("The base flashblock is old")); + // doesn't attach to the latest block + return Ok(None) } let state_provider = self.provider.history_by_block_hash(latest.hash())?; @@ -204,39 +201,14 @@ impl< // update cached reads self.update_cached_reads(latest_hash, request_cache); - Ok(PendingBlock::with_executed_block( + Ok(Some(PendingBlock::with_executed_block( Instant::now() + Duration::from_secs(1), ExecutedBlock { recovered_block: block.into(), execution_output: Arc::new(execution_outcome), hashed_state: Arc::new(hashed_state), }, - )) - } - - /// Compares tip from the last notification of [`CanonStateSubscriptions`] with last computed - /// pending block and verifies that the tip is the parent of the pending block. - /// - /// Returns: - /// * `Ok(Some(true))` if tip == parent - /// * `Ok(Some(false))` if tip != parent - /// * `Ok(None)` if there weren't any new notifications or the pending block is not built - /// * `Err` if the cannon state receiver returned an error - fn verify_pending_block_integrity( - &mut self, - cx: &mut Context<'_>, - ) -> eyre::Result> { - let mut tip = None; - let fut = self.canon_receiver.recv(); - pin!(fut); - - while let Poll::Ready(result) = fut.poll_unpin(cx) { - tip = result?.tip_checked().map(RecoveredBlock::hash); - } - - Ok(tip - .zip(self.current.as_ref().map(PendingBlock::parent_hash)) - .map(|(latest, parent)| latest == parent)) + ))) } } @@ -259,40 +231,52 @@ impl< fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - // Consume new flashblocks while they're ready + let mut new_flashblock = false; + // consume new flashblocks while they're ready while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) { match result { - Ok(flashblock) => this.add_flash_block(flashblock), + Ok(flashblock) => { + new_flashblock = true; + this.add_flash_block(flashblock) + } Err(err) => return Poll::Ready(Some(Err(err))), } } - // Execute block if there are flashblocks but no last pending block - let changed = if this.current.is_none() && !this.blocks.is_empty() { - match this.execute() { - Ok(block) => this.current = Some(block), - Err(err) => return Poll::Ready(Some(Err(err))), + // advance new canonical message, if any to reset flashblock + { + let fut = this.canon_receiver.recv(); + pin!(fut); + if fut.poll_unpin(cx).is_ready() { + // if we have a new canonical message, we know the currently tracked flashblock is + // invalidated + if this.current.take().is_some() { + return Poll::Ready(Some(Ok(None))) + } } + } - true - } else { - false - }; + if !new_flashblock && this.current.is_none() { + // no new flashbblocks received since, block is still unchanged + return Poll::Pending + } - // Verify that pending block is following up to the canonical state - match this.verify_pending_block_integrity(cx) { - // Integrity check failed: erase last block - Ok(Some(false)) => Poll::Ready(Some(Ok(None))), - // Integrity check is OK or skipped: output last block - Ok(Some(true) | None) => { - if changed { - Poll::Ready(Some(Ok(this.current.clone()))) - } else { - Poll::Pending - } + // try to build a block on top of latest + match this.execute() { + Ok(Some(new_pending)) => { + // built a new pending block + this.current = Some(new_pending.clone()); + return Poll::Ready(Some(Ok(Some(new_pending)))); + } + Ok(None) => { + // nothing to do because tracked flashblock doesn't attach to latest + } + Err(err) => { + // we can ignore this error + debug!(%err, "failed to execute flashblock"); } - // Cannot check integrity: error occurred - Err(err) => Poll::Ready(Some(Err(err))), } + + Poll::Pending } } From 811c00265ba3523d841df91d4b68051d2698adb3 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 Sep 2025 13:55:34 +0200 Subject: [PATCH 3/7] fmt --- crates/optimism/flashblocks/src/service.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 49187f7f5b3..a6fd6551644 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -11,8 +11,7 @@ use reth_evm::{ }; use reth_execution_types::ExecutionOutcome; use reth_primitives_traits::{ - AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, - SignedTransaction, + AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, SignedTransaction, }; use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State}; use reth_rpc_eth_types::{EthApiError, PendingBlock}; From df50ffd3643489bbde1ad4a252e8008c63e354f1 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 Sep 2025 17:51:33 +0200 Subject: [PATCH 4/7] simplify --- crates/optimism/flashblocks/src/payload.rs | 12 ++ crates/optimism/flashblocks/src/service.rs | 158 ++++++++------------- 2 files changed, 75 insertions(+), 95 deletions(-) diff --git a/crates/optimism/flashblocks/src/payload.rs b/crates/optimism/flashblocks/src/payload.rs index ce5ddf4c95a..dee2458178f 100644 --- a/crates/optimism/flashblocks/src/payload.rs +++ b/crates/optimism/flashblocks/src/payload.rs @@ -27,6 +27,18 @@ pub struct FlashBlock { pub metadata: Metadata, } +impl FlashBlock { + /// Returns the block number of this flashblock. + pub const fn block_number(&self) -> u64 { + self.metadata.block_number + } + + /// Returns the first parent hash of this flashblock. + pub fn parent_hash(&self) -> Option { + Some(self.base.as_ref()?.parent_hash) + } +} + /// Provides metadata about the block that may be useful for indexing or analysis. #[derive(Default, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct Metadata { diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index f994936ea35..8f22f6c4795 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -25,7 +25,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::pin; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; /// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of /// [`FlashBlock`]s. @@ -49,56 +49,6 @@ pub struct FlashBlockService< cached_state: Option<(B256, CachedReads)>, } -/// Simple wrapper around an ordered B-tree to keep track of a sequence of flashblocks by index -#[derive(Debug)] -struct FlashBlockSequence { - inner: BTreeMap, -} - -impl FlashBlockSequence { - const fn new() -> Self { - Self { inner: BTreeMap::new() } - } - - /// Insert flashblock in the current sequence - /// Only take in account given flashblock index - fn insert(&mut self, flashblock: FlashBlock) { - self.inner.insert(flashblock.index, flashblock); - } - - fn clear(&mut self) { - self.inner.clear(); - } - - fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - /// Return base flashblock - /// Base flashblock is the first flashblock of index 0 in the sequence - fn base(&self) -> Option<&FlashBlock> { - self.inner.get(&0) - } - - /// Iterator over sequence of ready flashblocks - /// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in - /// the sequence - fn iter_ready(&self) -> impl Iterator { - let mut current_index = 0; - self.inner - .iter() - .take_while(move |(&idx, _)| { - if idx == current_index { - current_index += 1; - true - } else { - false - } - }) - .map(|(_, f)| f) - } -} - impl< N: NodePrimitives, S, @@ -155,49 +105,6 @@ impl< self.cached_state.take(); } - /// Adds the `block` into the collection. - /// - /// Depending on its index and associated block number, it may: - /// * Be added to all the flashblocks received prior using this function. - /// * Cause a reset of the flashblocks and become the sole member of the collection. - /// * Be ignored. - pub fn add_flash_block(&mut self, flashblock: FlashBlock) { - // Flash block at index zero resets the whole state - if flashblock.index == 0 { - self.clear(); - self.blocks.insert(flashblock); - } - // Flash block at the following index adds to the collection and invalidates built block - else if flashblock.index == - self.blocks.iter_ready().last().map(|last| last.index + 1).unwrap_or(0) - { - self.blocks.insert(flashblock); - self.current.take(); - } - // Flash block at a different index is ignored - else if let Some(pending_block) = self.current.as_ref() { - // Delete built block if it corresponds to a different height - if pending_block.block().header().number() == flashblock.metadata.block_number { - info!( - message = "None sequential Flashblocks, keeping cache", - curr_block = %pending_block.block().header().number(), - new_block = %flashblock.metadata.block_number, - ); - self.blocks.insert(flashblock); - } else { - error!( - message = "Received Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", - curr_block = %pending_block.block().header().number(), - new_block = %flashblock.metadata.block_number, - ); - - self.clear(); - } - } else { - debug!("ignoring {flashblock:?}"); - } - } - /// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received using /// [`Self::add_flash_block`]. /// Builds a pending block using the configured provider and pool. @@ -316,7 +223,7 @@ impl< // Consume new flashblocks while they're ready while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) { match result { - Ok(flashblock) => this.add_flash_block(flashblock), + Ok(flashblock) => this.blocks.insert(flashblock), Err(err) => return Poll::Ready(Some(Err(err))), } } @@ -350,3 +257,64 @@ impl< } } } + +/// Simple wrapper around an ordered B-tree to keep track of a sequence of flashblocks by index +#[derive(Debug)] +struct FlashBlockSequence { + inner: BTreeMap, +} + +impl FlashBlockSequence { + const fn new() -> Self { + Self { inner: BTreeMap::new() } + } + + /// Inserts a new block into the sequence. + /// + /// A [`FlashBlock`] with index 0 resets the set. + fn insert(&mut self, flashblock: FlashBlock) { + if flashblock.index == 0 { + trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence"); + // Flash block at index zero resets the whole state + self.clear(); + self.inner.insert(flashblock.index, flashblock); + return + } + + // only insert if we we previously received the same block, assume we received index 0 + if self.block_number() == Some(flashblock.metadata.block_number) { + trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() + 1 ,"Received followup flashblock"); + self.inner.insert(flashblock.index, flashblock); + } + } + + /// Returns the first block number + fn block_number(&self) -> Option { + Some(self.inner.values().next()?.metadata.block_number) + } + + fn clear(&mut self) { + self.inner.clear(); + } + + fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Return base flashblock + /// Base flashblock is the first flashblock of index 0 in the sequence + fn base(&self) -> Option<&FlashBlock> { + self.inner.get(&0) + } + + /// Iterator over sequence of ready flashblocks + /// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in + /// the sequence + fn iter_ready(&self) -> impl Iterator { + self.inner + .values() + .enumerate() + .take_while(|(idx, block)| block.index == *idx as u64) + .map(|(_, block)| block) + } +} From d3df2f63a1d7be3aa893aabc6238802d5d59d725 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 Sep 2025 18:57:39 +0200 Subject: [PATCH 5/7] fixes and simplifications --- crates/optimism/flashblocks/src/service.rs | 74 ++++++++++++------- .../optimism/flashblocks/src/ws/decoding.rs | 1 + crates/optimism/flashblocks/src/ws/stream.rs | 46 ++++++------ 3 files changed, 72 insertions(+), 49 deletions(-) diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 8c07fdb167e..da851ab98d8 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,7 +1,6 @@ use crate::{ExecutionPayloadBaseV1, FlashBlock}; use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag, Decodable2718}; use alloy_primitives::B256; -use eyre::OptionExt; use futures_util::{FutureExt, Stream, StreamExt}; use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock}; use reth_errors::RethError; @@ -99,19 +98,21 @@ impl< /// /// Returns None if the flashblock doesn't attach to the latest header. fn execute(&mut self) -> eyre::Result>> { + trace!("Attempting new flashblock"); + let latest = self .provider .latest_header()? .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?; let latest_hash = latest.hash(); - let attrs = self - .blocks - .base() - .and_then(|v| v.base.clone()) - .ok_or_eyre("Missing base flashblock")?; + let Some(attrs) = self.blocks.payload_base() else { + trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base"); + return Ok(None) + }; if attrs.parent_hash != latest_hash { + trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock"); // doesn't attach to the latest block return Ok(None) } @@ -162,19 +163,19 @@ impl< } } -impl< - N: NodePrimitives, - S: Stream> + Unpin, - EvmConfig: ConfigureEvm + Unpin>, - Provider: StateProviderFactory - + CanonStateSubscriptions - + BlockReaderIdExt< - Header = HeaderTy, - Block = BlockTy, - Transaction = N::SignedTx, - Receipt = ReceiptTy, - > + Unpin, - > Stream for FlashBlockService +impl Stream for FlashBlockService +where + N: NodePrimitives, + S: Stream> + Unpin, + EvmConfig: ConfigureEvm + Unpin>, + Provider: StateProviderFactory + + CanonStateSubscriptions + + BlockReaderIdExt< + Header = HeaderTy, + Block = BlockTy, + Transaction = N::SignedTx, + Receipt = ReceiptTy, + > + Unpin, { type Item = eyre::Result>>; @@ -201,6 +202,7 @@ impl< // if we have a new canonical message, we know the currently tracked flashblock is // invalidated if this.current.take().is_some() { + trace!("Clearing current flashblock on new canonical block"); return Poll::Ready(Some(Ok(None))) } } @@ -231,9 +233,13 @@ impl< } } -/// Simple wrapper around an ordered B-tree to keep track of a sequence of flashblocks by index +/// Simple wrapper around an ordered B-tree to keep track of a sequence of flashblocks by index. #[derive(Debug)] struct FlashBlockSequence { + /// tracks the individual flashblocks in order + /// + /// With a blocktime of 2s and flashblock tickrate of ~200ms, we expect 10 or 11 flashblocks + /// per slot. inner: BTreeMap, } @@ -256,34 +262,46 @@ impl FlashBlockSequence { // only insert if we we previously received the same block, assume we received index 0 if self.block_number() == Some(flashblock.metadata.block_number) { - trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() + 1 ,"Received followup flashblock"); + trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock"); self.inner.insert(flashblock.index, flashblock); + } else { + trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following"); } } + /// Returns the number of tracked flashblocks. + fn count(&self) -> usize { + self.inner.len() + } + /// Returns the first block number fn block_number(&self) -> Option { Some(self.inner.values().next()?.metadata.block_number) } - fn clear(&mut self) { - self.inner.clear(); + /// Returns the payload base of the first tracked flashblock. + fn payload_base(&self) -> Option { + self.inner.values().next()?.base.clone() } - /// Return base flashblock - /// Base flashblock is the first flashblock of index 0 in the sequence - fn base(&self) -> Option<&FlashBlock> { - self.inner.get(&0) + fn clear(&mut self) { + self.inner.clear(); } /// Iterator over sequence of ready flashblocks + /// /// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in /// the sequence + /// + /// Note: flashblocks start at `index 0`. fn iter_ready(&self) -> impl Iterator { self.inner .values() .enumerate() - .take_while(|(idx, block)| block.index == *idx as u64) + .take_while(|(idx, block)| { + // flashblock index 0 is the first flashblock + block.index == *idx as u64 + }) .map(|(_, block)| block) } } diff --git a/crates/optimism/flashblocks/src/ws/decoding.rs b/crates/optimism/flashblocks/src/ws/decoding.rs index d96601a4f86..95de2f1a232 100644 --- a/crates/optimism/flashblocks/src/ws/decoding.rs +++ b/crates/optimism/flashblocks/src/ws/decoding.rs @@ -4,6 +4,7 @@ use alloy_rpc_types_engine::PayloadId; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, io}; +/// Internal helper for decoding #[derive(Clone, Debug, PartialEq, Default, Deserialize, Serialize)] struct FlashblocksPayloadV1 { /// The payload id of the flashblock diff --git a/crates/optimism/flashblocks/src/ws/stream.rs b/crates/optimism/flashblocks/src/ws/stream.rs index faeda7c3564..bf2c0f9c502 100644 --- a/crates/optimism/flashblocks/src/ws/stream.rs +++ b/crates/optimism/flashblocks/src/ws/stream.rs @@ -1,5 +1,4 @@ use crate::FlashBlock; -use eyre::eyre; use futures_util::{stream::SplitStream, FutureExt, Stream, StreamExt}; use std::{ fmt::{Debug, Formatter}, @@ -13,6 +12,7 @@ use tokio_tungstenite::{ tungstenite::{Error, Message}, MaybeTlsStream, WebSocketStream, }; +use tracing::debug; use url::Url; /// An asynchronous stream of [`FlashBlock`] from a websocket connection. @@ -78,17 +78,27 @@ where } } - let msg = ready!(self - .stream - .as_mut() - .expect("Stream state should be unreachable without stream") - .poll_next_unpin(cx)); - - Poll::Ready(msg.map(|msg| match msg { - Ok(Message::Binary(bytes)) => FlashBlock::decode(bytes), - Ok(msg) => Err(eyre!("Unexpected websocket message: {msg:?}")), - Err(err) => Err(err.into()), - })) + loop { + let Some(msg) = ready!(self + .stream + .as_mut() + .expect("Stream state should be unreachable without stream") + .poll_next_unpin(cx)) + else { + return Poll::Ready(None); + }; + + match msg { + Ok(Message::Binary(bytes)) => return Poll::Ready(Some(FlashBlock::decode(bytes))), + Ok(Message::Ping(_) | Message::Pong(_)) => { + // can ginore for now + } + Ok(msg) => { + debug!("Received unexpected message: {:?}", msg); + } + Err(err) => return Poll::Ready(Some(Err(err.into()))), + } + } } } @@ -249,7 +259,7 @@ mod tests { &mut self, _ws_url: Url, ) -> impl Future> + Send + Sync { - future::ready(Err(eyre!("{}", &self.0))) + future::ready(Err(eyre::eyre!("{}", &self.0))) } } @@ -307,14 +317,8 @@ mod tests { async fn test_stream_fails_to_decode_non_binary_message() { let messages = FakeConnector::from([Ok(Message::Text(Utf8Bytes::from("test")))]); let ws_url = "http://localhost".parse().unwrap(); - let stream = WsFlashBlockStream::with_connector(ws_url, messages); - - let actual_messages: Vec<_> = - stream.map(Result::unwrap_err).map(|e| format!("{e}")).collect().await; - let expected_messages = - vec!["Unexpected websocket message: Text(Utf8Bytes(b\"test\"))".to_owned()]; - - assert_eq!(actual_messages, expected_messages); + let mut stream = WsFlashBlockStream::with_connector(ws_url, messages); + assert!(stream.next().await.is_none()); } #[tokio::test] From a2e69224c22d0fc89275f049a39b880cd4b9d198 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 Sep 2025 19:01:17 +0200 Subject: [PATCH 6/7] docs --- crates/optimism/flashblocks/src/service.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index da851ab98d8..e06ad58746c 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -93,8 +93,7 @@ impl< self.cached_state = Some((head, cached_reads)); } - /// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received using - /// [`Self::add_flash_block`] on top of the latest state. + /// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received earlier. /// /// Returns None if the flashblock doesn't attach to the latest header. fn execute(&mut self) -> eyre::Result>> { From 5efe558a01f84b6ca762cf2691988a80086361a9 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 1 Sep 2025 19:25:50 +0200 Subject: [PATCH 7/7] Update crates/optimism/flashblocks/src/ws/stream.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Roman Hodulák --- crates/optimism/flashblocks/src/ws/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/optimism/flashblocks/src/ws/stream.rs b/crates/optimism/flashblocks/src/ws/stream.rs index bf2c0f9c502..8a8438b0878 100644 --- a/crates/optimism/flashblocks/src/ws/stream.rs +++ b/crates/optimism/flashblocks/src/ws/stream.rs @@ -314,7 +314,7 @@ mod tests { } #[tokio::test] - async fn test_stream_fails_to_decode_non_binary_message() { + async fn test_stream_ignores_non_binary_message() { let messages = FakeConnector::from([Ok(Message::Text(Utf8Bytes::from("test")))]); let ws_url = "http://localhost".parse().unwrap(); let mut stream = WsFlashBlockStream::with_connector(ws_url, messages);