Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/optimism/flashblocks/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ pub struct FlashBlock {
pub metadata: Metadata,
}

impl FlashBlock {
/// Returns the block number of this flashblock.
pub const fn block_number(&self) -> u64 {
self.metadata.block_number
}

/// Returns the first parent hash of this flashblock.
pub fn parent_hash(&self) -> Option<B256> {
Some(self.base.as_ref()?.parent_hash)
}
}

/// Provides metadata about the block that may be useful for indexing or analysis.
#[derive(Default, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Metadata {
Expand Down
276 changes: 142 additions & 134 deletions crates/optimism/flashblocks/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{ExecutionPayloadBaseV1, FlashBlock};
use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag, Decodable2718};
use alloy_primitives::B256;
use eyre::{eyre, OptionExt};
use futures_util::{FutureExt, Stream, StreamExt};
use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock};
use reth_errors::RethError;
Expand All @@ -11,20 +10,20 @@ use reth_evm::{
};
use reth_execution_types::ExecutionOutcome;
use reth_primitives_traits::{
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, RecoveredBlock,
SignedTransaction,
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, SignedTransaction,
};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
use reth_rpc_eth_types::{EthApiError, PendingBlock};
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
use std::{
collections::BTreeMap,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::pin;
use tracing::{debug, error, info};
use tracing::{debug, trace};

/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
/// [`FlashBlock`]s.
Expand All @@ -37,7 +36,7 @@ pub struct FlashBlockService<
> {
rx: S,
current: Option<PendingBlock<N>>,
blocks: Vec<FlashBlock>,
blocks: FlashBlockSequence,
evm_config: EvmConfig,
provider: Provider,
canon_receiver: CanonStateNotifications<N>,
Expand Down Expand Up @@ -67,7 +66,7 @@ impl<
Self {
rx,
current: None,
blocks: Vec::new(),
blocks: FlashBlockSequence::new(),
evm_config,
canon_receiver: provider.subscribe_to_canonical_state(),
provider,
Expand All @@ -94,79 +93,27 @@ impl<
self.cached_state = Some((head, cached_reads));
}

/// Clear the state of the service, including:
/// - All flashblocks sequence of the current pending block
/// - Invalidate latest pending block built
/// - Cache
fn clear(&mut self) {
self.blocks.clear();
self.current.take();
self.cached_state.take();
}

/// Adds the `block` into the collection.
/// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received earlier.
///
/// Depending on its index and associated block number, it may:
/// * Be added to all the flashblocks received prior using this function.
/// * Cause a reset of the flashblocks and become the sole member of the collection.
/// * Be ignored.
pub fn add_flash_block(&mut self, flashblock: FlashBlock) {
// Flash block at index zero resets the whole state
if flashblock.index == 0 {
self.clear();
self.blocks.push(flashblock);
}
// Flash block at the following index adds to the collection and invalidates built block
else if flashblock.index == self.blocks.last().map(|last| last.index + 1).unwrap_or(0) {
self.blocks.push(flashblock);
self.current.take();
}
// Flash block at a different index is ignored
else if let Some(pending_block) = self.current.as_ref() {
// Delete built block if it corresponds to a different height
if pending_block.block().header().number() == flashblock.metadata.block_number {
info!(
message = "None sequential Flashblocks, keeping cache",
curr_block = %pending_block.block().header().number(),
new_block = %flashblock.metadata.block_number,
);
} else {
error!(
message = "Received Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock",
curr_block = %pending_block.block().header().number(),
new_block = %flashblock.metadata.block_number,
);

self.clear();
}
} else {
debug!("ignoring {flashblock:?}");
}
}
/// Returns None if the flashblock doesn't attach to the latest header.
fn execute(&mut self) -> eyre::Result<Option<PendingBlock<N>>> {
trace!("Attempting new flashblock");

/// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received using
/// [`Self::add_flash_block`].
/// Builds a pending block using the configured provider and pool.
///
/// If the origin is the actual pending block, the block is built with withdrawals.
///
/// After Cancun, if the origin is the actual pending block, the block includes the EIP-4788 pre
/// block contract call using the parent beacon block root received from the CL.
pub fn execute(&mut self) -> eyre::Result<PendingBlock<N>> {
let latest = self
.provider
.latest_header()?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let latest_hash = latest.hash();

let attrs = self
.blocks
.first()
.and_then(|v| v.base.clone())
.ok_or_eyre("Missing base flashblock")?;
let Some(attrs) = self.blocks.payload_base() else {
trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base");
return Ok(None)
};

if attrs.parent_hash != latest_hash {
return Err(eyre!("The base flashblock is old"));
trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock");
// doesn't attach to the latest block
return Ok(None)
}

let state_provider = self.provider.history_by_block_hash(latest.hash())?;
Expand All @@ -182,7 +129,7 @@ impl<

builder.apply_pre_execution_changes()?;

let transactions = self.blocks.iter().flat_map(|v| v.diff.transactions.clone());
let transactions = self.blocks.iter_ready().flat_map(|v| v.diff.transactions.clone());

for encoded in transactions {
let tx = N::SignedTx::decode_2718_exact(encoded.as_ref())?;
Expand All @@ -204,95 +151,156 @@ impl<
// update cached reads
self.update_cached_reads(latest_hash, request_cache);

Ok(PendingBlock::with_executed_block(
Ok(Some(PendingBlock::with_executed_block(
Instant::now() + Duration::from_secs(1),
ExecutedBlock {
recovered_block: block.into(),
execution_output: Arc::new(execution_outcome),
hashed_state: Arc::new(hashed_state),
},
))
}

/// Compares tip from the last notification of [`CanonStateSubscriptions`] with last computed
/// pending block and verifies that the tip is the parent of the pending block.
///
/// Returns:
/// * `Ok(Some(true))` if tip == parent
/// * `Ok(Some(false))` if tip != parent
/// * `Ok(None)` if there weren't any new notifications or the pending block is not built
/// * `Err` if the cannon state receiver returned an error
fn verify_pending_block_integrity(
&mut self,
cx: &mut Context<'_>,
) -> eyre::Result<Option<bool>> {
let mut tip = None;
let fut = self.canon_receiver.recv();
pin!(fut);

while let Poll::Ready(result) = fut.poll_unpin(cx) {
tip = result?.tip_checked().map(RecoveredBlock::hash);
}

Ok(tip
.zip(self.current.as_ref().map(PendingBlock::parent_hash))
.map(|(latest, parent)| latest == parent))
)))
}
}

impl<
N: NodePrimitives,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
Provider: StateProviderFactory
+ CanonStateSubscriptions<Primitives = N>
+ BlockReaderIdExt<
Header = HeaderTy<N>,
Block = BlockTy<N>,
Transaction = N::SignedTx,
Receipt = ReceiptTy<N>,
> + Unpin,
> Stream for FlashBlockService<N, S, EvmConfig, Provider>
impl<N, S, EvmConfig, Provider> Stream for FlashBlockService<N, S, EvmConfig, Provider>
where
N: NodePrimitives,
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
Provider: StateProviderFactory
+ CanonStateSubscriptions<Primitives = N>
+ BlockReaderIdExt<
Header = HeaderTy<N>,
Block = BlockTy<N>,
Transaction = N::SignedTx,
Receipt = ReceiptTy<N>,
> + Unpin,
{
type Item = eyre::Result<Option<PendingBlock<N>>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

// Consume new flashblocks while they're ready
let mut new_flashblock = false;
// consume new flashblocks while they're ready
while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
match result {
Ok(flashblock) => this.add_flash_block(flashblock),
Ok(flashblock) => {
new_flashblock = true;
this.blocks.insert(flashblock)
}
Err(err) => return Poll::Ready(Some(Err(err))),
}
}

// Execute block if there are flashblocks but no last pending block
let changed = if this.current.is_none() && !this.blocks.is_empty() {
match this.execute() {
Ok(block) => this.current = Some(block),
Err(err) => return Poll::Ready(Some(Err(err))),
// advance new canonical message, if any to reset flashblock
{
let fut = this.canon_receiver.recv();
pin!(fut);
if fut.poll_unpin(cx).is_ready() {
// if we have a new canonical message, we know the currently tracked flashblock is
// invalidated
if this.current.take().is_some() {
trace!("Clearing current flashblock on new canonical block");
return Poll::Ready(Some(Ok(None)))
}
}
}

true
} else {
false
};
if !new_flashblock && this.current.is_none() {
// no new flashbblocks received since, block is still unchanged
return Poll::Pending
}

// Verify that pending block is following up to the canonical state
match this.verify_pending_block_integrity(cx) {
// Integrity check failed: erase last block
Ok(Some(false)) => Poll::Ready(Some(Ok(None))),
// Integrity check is OK or skipped: output last block
Ok(Some(true) | None) => {
if changed {
Poll::Ready(Some(Ok(this.current.clone())))
} else {
Poll::Pending
}
// try to build a block on top of latest
match this.execute() {
Ok(Some(new_pending)) => {
// built a new pending block
this.current = Some(new_pending.clone());
return Poll::Ready(Some(Ok(Some(new_pending))));
}
Ok(None) => {
// nothing to do because tracked flashblock doesn't attach to latest
}
Err(err) => {
// we can ignore this error
debug!(%err, "failed to execute flashblock");
}
// Cannot check integrity: error occurred
Err(err) => Poll::Ready(Some(Err(err))),
}

Poll::Pending
}
}

/// Simple wrapper around an ordered B-tree to keep track of a sequence of flashblocks by index.
#[derive(Debug)]
struct FlashBlockSequence {
/// tracks the individual flashblocks in order
///
/// With a blocktime of 2s and flashblock tickrate of ~200ms, we expect 10 or 11 flashblocks
/// per slot.
inner: BTreeMap<u64, FlashBlock>,
}

impl FlashBlockSequence {
const fn new() -> Self {
Self { inner: BTreeMap::new() }
}

/// Inserts a new block into the sequence.
///
/// A [`FlashBlock`] with index 0 resets the set.
fn insert(&mut self, flashblock: FlashBlock) {
if flashblock.index == 0 {
trace!(number=%flashblock.block_number(), "Tracking new flashblock sequence");
// Flash block at index zero resets the whole state
self.clear();
self.inner.insert(flashblock.index, flashblock);
return
}

// only insert if we we previously received the same block, assume we received index 0
if self.block_number() == Some(flashblock.metadata.block_number) {
trace!(number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock");
self.inner.insert(flashblock.index, flashblock);
} else {
trace!(number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following");
}
}

/// Returns the number of tracked flashblocks.
fn count(&self) -> usize {
self.inner.len()
}

/// Returns the first block number
fn block_number(&self) -> Option<u64> {
Some(self.inner.values().next()?.metadata.block_number)
}

/// Returns the payload base of the first tracked flashblock.
fn payload_base(&self) -> Option<ExecutionPayloadBaseV1> {
self.inner.values().next()?.base.clone()
}

fn clear(&mut self) {
self.inner.clear();
}

/// Iterator over sequence of ready flashblocks
///
/// A flashblocks is not ready if there's missing previous flashblocks, i.e. there's a gap in
/// the sequence
///
/// Note: flashblocks start at `index 0`.
fn iter_ready(&self) -> impl Iterator<Item = &FlashBlock> {
self.inner
.values()
.enumerate()
.take_while(|(idx, block)| {
// flashblock index 0 is the first flashblock
block.index == *idx as u64
})
.map(|(_, block)| block)
}
}
1 change: 1 addition & 0 deletions crates/optimism/flashblocks/src/ws/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use alloy_rpc_types_engine::PayloadId;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io};

/// Internal helper for decoding
#[derive(Clone, Debug, PartialEq, Default, Deserialize, Serialize)]
struct FlashblocksPayloadV1 {
/// The payload id of the flashblock
Expand Down
Loading