diff --git a/crates/optimism/flashblocks/src/payload.rs b/crates/optimism/flashblocks/src/payload.rs index ce5ddf4c95a..dee2458178f 100644 --- a/crates/optimism/flashblocks/src/payload.rs +++ b/crates/optimism/flashblocks/src/payload.rs @@ -27,6 +27,18 @@ pub struct FlashBlock { pub metadata: Metadata, } +impl FlashBlock { + /// Returns the block number of this flashblock. + pub const fn block_number(&self) -> u64 { + self.metadata.block_number + } + + /// Returns the first parent hash of this flashblock. + pub fn parent_hash(&self) -> Option { + Some(self.base.as_ref()?.parent_hash) + } +} + /// Provides metadata about the block that may be useful for indexing or analysis. #[derive(Default, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct Metadata { diff --git a/crates/optimism/flashblocks/src/service.rs b/crates/optimism/flashblocks/src/service.rs index 69109978f51..e06ad58746c 100644 --- a/crates/optimism/flashblocks/src/service.rs +++ b/crates/optimism/flashblocks/src/service.rs @@ -1,7 +1,6 @@ use crate::{ExecutionPayloadBaseV1, FlashBlock}; use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag, Decodable2718}; use alloy_primitives::B256; -use eyre::{eyre, OptionExt}; use futures_util::{FutureExt, Stream, StreamExt}; use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock}; use reth_errors::RethError; @@ -11,20 +10,20 @@ use reth_evm::{ }; use reth_execution_types::ExecutionOutcome; use reth_primitives_traits::{ - AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, RecoveredBlock, - SignedTransaction, + AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, SignedTransaction, }; use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State}; use reth_rpc_eth_types::{EthApiError, PendingBlock}; use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory}; use std::{ + collections::BTreeMap, pin::Pin, sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; use tokio::pin; -use tracing::{debug, error, info}; +use tracing::{debug, trace}; /// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of /// [`FlashBlock`]s. @@ -37,7 +36,7 @@ pub struct FlashBlockService< > { rx: S, current: Option>, - blocks: Vec, + blocks: FlashBlockSequence, evm_config: EvmConfig, provider: Provider, canon_receiver: CanonStateNotifications, @@ -67,7 +66,7 @@ impl< Self { rx, current: None, - blocks: Vec::new(), + blocks: FlashBlockSequence::new(), evm_config, canon_receiver: provider.subscribe_to_canonical_state(), provider, @@ -94,79 +93,27 @@ impl< self.cached_state = Some((head, cached_reads)); } - /// Clear the state of the service, including: - /// - All flashblocks sequence of the current pending block - /// - Invalidate latest pending block built - /// - Cache - fn clear(&mut self) { - self.blocks.clear(); - self.current.take(); - self.cached_state.take(); - } - - /// Adds the `block` into the collection. + /// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received earlier. /// - /// Depending on its index and associated block number, it may: - /// * Be added to all the flashblocks received prior using this function. - /// * Cause a reset of the flashblocks and become the sole member of the collection. - /// * Be ignored. - pub fn add_flash_block(&mut self, flashblock: FlashBlock) { - // Flash block at index zero resets the whole state - if flashblock.index == 0 { - self.clear(); - self.blocks.push(flashblock); - } - // Flash block at the following index adds to the collection and invalidates built block - else if flashblock.index == self.blocks.last().map(|last| last.index + 1).unwrap_or(0) { - self.blocks.push(flashblock); - self.current.take(); - } - // Flash block at a different index is ignored - else if let Some(pending_block) = self.current.as_ref() { - // Delete built block if it corresponds to a different height - if pending_block.block().header().number() == flashblock.metadata.block_number { - info!( - message = "None sequential Flashblocks, keeping cache", - curr_block = %pending_block.block().header().number(), - new_block = %flashblock.metadata.block_number, - ); - } else { - error!( - message = "Received Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock", - curr_block = %pending_block.block().header().number(), - new_block = %flashblock.metadata.block_number, - ); - - self.clear(); - } - } else { - debug!("ignoring {flashblock:?}"); - } - } + /// Returns None if the flashblock doesn't attach to the latest header. + fn execute(&mut self) -> eyre::Result>> { + trace!("Attempting new flashblock"); - /// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received using - /// [`Self::add_flash_block`]. - /// Builds a pending block using the configured provider and pool. - /// - /// If the origin is the actual pending block, the block is built with withdrawals. - /// - /// After Cancun, if the origin is the actual pending block, the block includes the EIP-4788 pre - /// block contract call using the parent beacon block root received from the CL. - pub fn execute(&mut self) -> eyre::Result> { let latest = self .provider .latest_header()? .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?; let latest_hash = latest.hash(); - let attrs = self - .blocks - .first() - .and_then(|v| v.base.clone()) - .ok_or_eyre("Missing base flashblock")?; + let Some(attrs) = self.blocks.payload_base() else { + trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base"); + return Ok(None) + }; if attrs.parent_hash != latest_hash { - return Err(eyre!("The base flashblock is old")); + trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock"); + // doesn't attach to the latest block + return Ok(None) } let state_provider = self.provider.history_by_block_hash(latest.hash())?; @@ -182,7 +129,7 @@ impl< builder.apply_pre_execution_changes()?; - let transactions = self.blocks.iter().flat_map(|v| v.diff.transactions.clone()); + let transactions = self.blocks.iter_ready().flat_map(|v| v.diff.transactions.clone()); for encoded in transactions { let tx = N::SignedTx::decode_2718_exact(encoded.as_ref())?; @@ -204,95 +151,156 @@ impl< // update cached reads self.update_cached_reads(latest_hash, request_cache); - Ok(PendingBlock::with_executed_block( + Ok(Some(PendingBlock::with_executed_block( Instant::now() + Duration::from_secs(1), ExecutedBlock { recovered_block: block.into(), execution_output: Arc::new(execution_outcome), hashed_state: Arc::new(hashed_state), }, - )) - } - - /// Compares tip from the last notification of [`CanonStateSubscriptions`] with last computed - /// pending block and verifies that the tip is the parent of the pending block. - /// - /// Returns: - /// * `Ok(Some(true))` if tip == parent - /// * `Ok(Some(false))` if tip != parent - /// * `Ok(None)` if there weren't any new notifications or the pending block is not built - /// * `Err` if the cannon state receiver returned an error - fn verify_pending_block_integrity( - &mut self, - cx: &mut Context<'_>, - ) -> eyre::Result> { - let mut tip = None; - let fut = self.canon_receiver.recv(); - pin!(fut); - - while let Poll::Ready(result) = fut.poll_unpin(cx) { - tip = result?.tip_checked().map(RecoveredBlock::hash); - } - - Ok(tip - .zip(self.current.as_ref().map(PendingBlock::parent_hash)) - .map(|(latest, parent)| latest == parent)) + ))) } } -impl< - N: NodePrimitives, - S: Stream> + Unpin, - EvmConfig: ConfigureEvm + Unpin>, - Provider: StateProviderFactory - + CanonStateSubscriptions - + BlockReaderIdExt< - Header = HeaderTy, - Block = BlockTy, - Transaction = N::SignedTx, - Receipt = ReceiptTy, - > + Unpin, - > Stream for FlashBlockService +impl Stream for FlashBlockService +where + N: NodePrimitives, + S: Stream> + Unpin, + EvmConfig: ConfigureEvm + Unpin>, + Provider: StateProviderFactory + + CanonStateSubscriptions + + BlockReaderIdExt< + Header = HeaderTy, + Block = BlockTy, + Transaction = N::SignedTx, + Receipt = ReceiptTy, + > + Unpin, { type Item = eyre::Result>>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - // Consume new flashblocks while they're ready + let mut new_flashblock = false; + // consume new flashblocks while they're ready while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) { match result { - Ok(flashblock) => this.add_flash_block(flashblock), + Ok(flashblock) => { + new_flashblock = true; + this.blocks.insert(flashblock) + } Err(err) => return Poll::Ready(Some(Err(err))), } } - // Execute block if there are flashblocks but no last pending block - let changed = if this.current.is_none() && !this.blocks.is_empty() { - match this.execute() { - Ok(block) => this.current = Some(block), - Err(err) => return Poll::Ready(Some(Err(err))), + // advance new canonical message, if any to reset flashblock + { + let fut = this.canon_receiver.recv(); + pin!(fut); + if fut.poll_unpin(cx).is_ready() { + // if we have a new canonical message, we know the currently tracked flashblock is + // invalidated + if this.current.take().is_some() { + trace!("Clearing current flashblock on new canonical block"); + return Poll::Ready(Some(Ok(None))) + } } + } - true - } else { - false - }; + if !new_flashblock && this.current.is_none() { + // no new flashbblocks received since, block is still unchanged + return Poll::Pending + } - // Verify that pending block is following up to the canonical state - match this.verify_pending_block_integrity(cx) { - // Integrity check failed: erase last block - Ok(Some(false)) => Poll::Ready(Some(Ok(None))), - // Integrity check is OK or skipped: output last block - Ok(Some(true) | None) => { - if changed { - Poll::Ready(Some(Ok(this.current.clone()))) - } else { - Poll::Pending - } + // try to build a block on top of latest + match this.execute() { + Ok(Some(new_pending)) => { + // built a new pending block + this.current = Some(new_pending.clone()); + return Poll::Ready(Some(Ok(Some(new_pending)))); + } + Ok(None) => { + // nothing to do because tracked flashblock doesn't attach to latest + } + Err(err) => { + // we can ignore this error + debug!(%err, "failed to execute flashblock"); } - // Cannot check integrity: error occurred - Err(err) => Poll::Ready(Some(Err(err))), } + + Poll::Pending + } +} + +/// Simple wrapper around an ordered B-tree to keep track of a sequence of flashblocks by index. +#[derive(Debug)] +struct FlashBlockSequence { + /// tracks the individual flashblocks in order + /// + /// With a blocktime of 2s and flashblock tickrate of ~200ms, we expect 10 or 11 flashblocks + /// per slot. + inner: BTreeMap, +} + +impl FlashBlockSequence { + const fn new() -> Self { + Self { inner: BTreeMap::new() } + } + + /// Inserts a new block into the sequence. + /// + /// A [`FlashBlock`] with index 0 resets the set. + fn insert(&mut self, flashblock: FlashBlock) { + if flashblock.index == 0 { + trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence"); + // Flash block at index zero resets the whole state + self.clear(); + self.inner.insert(flashblock.index, flashblock); + return + } + + // only insert if we we previously received the same block, assume we received index 0 + if self.block_number() == Some(flashblock.metadata.block_number) { + trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock"); + self.inner.insert(flashblock.index, flashblock); + } else { + trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following"); + } + } + + /// Returns the number of tracked flashblocks. + fn count(&self) -> usize { + self.inner.len() + } + + /// Returns the first block number + fn block_number(&self) -> Option { + Some(self.inner.values().next()?.metadata.block_number) + } + + /// Returns the payload base of the first tracked flashblock. + fn payload_base(&self) -> Option { + self.inner.values().next()?.base.clone() + } + + fn clear(&mut self) { + self.inner.clear(); + } + + /// Iterator over sequence of ready flashblocks + /// + /// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in + /// the sequence + /// + /// Note: flashblocks start at `index 0`. + fn iter_ready(&self) -> impl Iterator { + self.inner + .values() + .enumerate() + .take_while(|(idx, block)| { + // flashblock index 0 is the first flashblock + block.index == *idx as u64 + }) + .map(|(_, block)| block) } } diff --git a/crates/optimism/flashblocks/src/ws/decoding.rs b/crates/optimism/flashblocks/src/ws/decoding.rs index d96601a4f86..95de2f1a232 100644 --- a/crates/optimism/flashblocks/src/ws/decoding.rs +++ b/crates/optimism/flashblocks/src/ws/decoding.rs @@ -4,6 +4,7 @@ use alloy_rpc_types_engine::PayloadId; use serde::{Deserialize, Serialize}; use std::{fmt::Debug, io}; +/// Internal helper for decoding #[derive(Clone, Debug, PartialEq, Default, Deserialize, Serialize)] struct FlashblocksPayloadV1 { /// The payload id of the flashblock diff --git a/crates/optimism/flashblocks/src/ws/stream.rs b/crates/optimism/flashblocks/src/ws/stream.rs index faeda7c3564..8a8438b0878 100644 --- a/crates/optimism/flashblocks/src/ws/stream.rs +++ b/crates/optimism/flashblocks/src/ws/stream.rs @@ -1,5 +1,4 @@ use crate::FlashBlock; -use eyre::eyre; use futures_util::{stream::SplitStream, FutureExt, Stream, StreamExt}; use std::{ fmt::{Debug, Formatter}, @@ -13,6 +12,7 @@ use tokio_tungstenite::{ tungstenite::{Error, Message}, MaybeTlsStream, WebSocketStream, }; +use tracing::debug; use url::Url; /// An asynchronous stream of [`FlashBlock`] from a websocket connection. @@ -78,17 +78,27 @@ where } } - let msg = ready!(self - .stream - .as_mut() - .expect("Stream state should be unreachable without stream") - .poll_next_unpin(cx)); - - Poll::Ready(msg.map(|msg| match msg { - Ok(Message::Binary(bytes)) => FlashBlock::decode(bytes), - Ok(msg) => Err(eyre!("Unexpected websocket message: {msg:?}")), - Err(err) => Err(err.into()), - })) + loop { + let Some(msg) = ready!(self + .stream + .as_mut() + .expect("Stream state should be unreachable without stream") + .poll_next_unpin(cx)) + else { + return Poll::Ready(None); + }; + + match msg { + Ok(Message::Binary(bytes)) => return Poll::Ready(Some(FlashBlock::decode(bytes))), + Ok(Message::Ping(_) | Message::Pong(_)) => { + // can ginore for now + } + Ok(msg) => { + debug!("Received unexpected message: {:?}", msg); + } + Err(err) => return Poll::Ready(Some(Err(err.into()))), + } + } } } @@ -249,7 +259,7 @@ mod tests { &mut self, _ws_url: Url, ) -> impl Future> + Send + Sync { - future::ready(Err(eyre!("{}", &self.0))) + future::ready(Err(eyre::eyre!("{}", &self.0))) } } @@ -304,17 +314,11 @@ mod tests { } #[tokio::test] - async fn test_stream_fails_to_decode_non_binary_message() { + async fn test_stream_ignores_non_binary_message() { let messages = FakeConnector::from([Ok(Message::Text(Utf8Bytes::from("test")))]); let ws_url = "http://localhost".parse().unwrap(); - let stream = WsFlashBlockStream::with_connector(ws_url, messages); - - let actual_messages: Vec<_> = - stream.map(Result::unwrap_err).map(|e| format!("{e}")).collect().await; - let expected_messages = - vec!["Unexpected websocket message: Text(Utf8Bytes(b\"test\"))".to_owned()]; - - assert_eq!(actual_messages, expected_messages); + let mut stream = WsFlashBlockStream::with_connector(ws_url, messages); + assert!(stream.next().await.is_none()); } #[tokio::test]