Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 54 additions & 29 deletions crates/optimism/flashblocks/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State}
use reth_rpc_eth_types::{EthApiError, PendingBlock};
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
use std::{
cmp::Ordering,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::pin;
use tracing::{debug, error, info};
use tracing::{debug, error};

/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
/// [`FlashBlock`]s.
Expand All @@ -37,6 +38,7 @@ pub struct FlashBlockService<
> {
rx: S,
current: Option<PendingBlock<N>>,
height: Option<u64>,
blocks: Vec<FlashBlock>,
evm_config: EvmConfig,
provider: Provider,
Expand Down Expand Up @@ -67,6 +69,7 @@ impl<
Self {
rx,
current: None,
height: None,
blocks: Vec::new(),
evm_config,
canon_receiver: provider.subscribe_to_canonical_state(),
Expand Down Expand Up @@ -111,36 +114,58 @@ impl<
/// * Cause a reset of the flashblocks and become the sole member of the collection.
/// * Be ignored.
pub fn add_flash_block(&mut self, flashblock: FlashBlock) {
// Flash block at index zero resets the whole state
if flashblock.index == 0 {
self.clear();
self.blocks.push(flashblock);
}
// Flash block at the following index adds to the collection and invalidates built block
else if flashblock.index == self.blocks.last().map(|last| last.index + 1).unwrap_or(0) {
self.blocks.push(flashblock);
self.current.take();
}
// Flash block at a different index is ignored
else if let Some(pending_block) = self.current.as_ref() {
// Delete built block if it corresponds to a different height
if pending_block.block().header().number() == flashblock.metadata.block_number {
info!(
message = "None sequential Flashblocks, keeping cache",
curr_block = %pending_block.block().header().number(),
new_block = %flashblock.metadata.block_number,
);
} else {
error!(
message = "Received Flashblock for new block, zeroing Flashblocks until we receive a base Flashblock",
curr_block = %pending_block.block().header().number(),
new_block = %flashblock.metadata.block_number,
);
let height = match self.height {
Some(height) => height,
None => {
if flashblock.index != 0 {
error!(message = "Receiving first flashblock with non zero index");
return;
}
// extract height from base flashblock
self.height = Some(flashblock.metadata.block_number);
flashblock.metadata.block_number
}
};

// compare flashblock block number against current pending block number
// check against parent hash is done in `execute`
match flashblock.metadata.block_number.cmp(&height) {
Ordering::Equal => {
// Current pending block, directly process it

self.clear();
// Fb index zero resets the whole state (base flashblock)
if flashblock.index == 0 {
self.clear();
self.blocks.push(flashblock);
}
// Fb at the following index adds to the collection and invalidates built block
else if flashblock.index ==
self.blocks.last().map(|last| last.index + 1).unwrap_or(0)
{
self.blocks.push(flashblock);
// invalidate current built block
self.current.take();
} else {
// TODO: store non sequential flashblock index for later processing
debug!("ignoring early {flashblock:?} at current {height}");
}
}
Ordering::Greater => {
// Flashblocks for pending block not parent of current canonical head,
// Store for future processing when current chain sync
// TODO: store non sequential block target fb for later processing
// but for now, we still reset the sequence on each base fb
if flashblock.index == 0 {
self.clear();
self.height = Some(flashblock.metadata.block_number);
self.blocks.push(flashblock);
} else {
debug!("early {flashblock:?} at current {height}");
}
}
Ordering::Less => {
debug!("ignoring old {flashblock:?} at current {height}");
}
} else {
debug!("ignoring {flashblock:?}");
}
}

Expand Down