Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 51 additions & 10 deletions crates/optimism/flashblocks/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{ExecutionPayloadBaseV1, FlashBlock};
use alloy_eips::{eip2718::WithEncoded, BlockNumberOrTag, Decodable2718};
use alloy_primitives::B256;
use eyre::{eyre, OptionExt};
use futures_util::{FutureExt, Stream, StreamExt};
use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock};
Expand All @@ -13,7 +14,7 @@ use reth_primitives_traits::{
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, RecoveredBlock,
SignedTransaction,
};
use reth_revm::{database::StateProviderDatabase, db::State};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
use reth_rpc_eth_types::{EthApiError, PendingBlock};
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
use std::{
Expand All @@ -40,6 +41,11 @@ pub struct FlashBlockService<
evm_config: EvmConfig,
provider: Provider,
canon_receiver: CanonStateNotifications<N>,
/// Cached state reads for the current block.
/// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when
/// fb received on top of the same block. Avoid redundant I/O across multiple executions
/// within the same block.
cached_state: Option<(B256, CachedReads)>,
}

impl<
Expand All @@ -65,9 +71,39 @@ impl<
evm_config,
canon_receiver: provider.subscribe_to_canonical_state(),
provider,
cached_state: None,
}
}

/// Returns the cached reads at the given head hash.
///
/// Returns a new cache instance if this is new `head` hash.
fn cached_reads(&mut self, head: B256) -> CachedReads {
if let Some((tracked, cache)) = self.cached_state.take() {
if tracked == head {
return cache
}
}

// instantiate a new cache instance
CachedReads::default()
}

/// Updates the cached reads at the given head hash
fn update_cached_reads(&mut self, head: B256, cached_reads: CachedReads) {
self.cached_state = Some((head, cached_reads));
}

/// Clear the state of the service, including:
/// - All flashblocks sequence of the current pending block
/// - Invalidate latest pending block built
/// - Cache
fn clear(&mut self) {
self.blocks.clear();
self.current.take();
self.cached_state.take();
}

/// Adds the `block` into the collection.
///
/// Depending on its index and associated block number, it may:
Expand All @@ -77,8 +113,8 @@ impl<
pub fn add_flash_block(&mut self, flashblock: FlashBlock) {
// Flash block at index zero resets the whole state
if flashblock.index == 0 {
self.blocks = vec![flashblock];
self.current.take();
self.clear();
self.blocks.push(flashblock);
}
// Flash block at the following index adds to the collection and invalidates built block
else if flashblock.index == self.blocks.last().map(|last| last.index + 1).unwrap_or(0) {
Expand All @@ -101,8 +137,7 @@ impl<
new_block = %flashblock.metadata.block_number,
);

self.blocks.clear();
self.current.take();
self.clear();
}
} else {
debug!("ignoring {flashblock:?}");
Expand All @@ -122,24 +157,27 @@ impl<
.provider
.latest_header()?
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
let latest_hash = latest.hash();

let attrs = self
.blocks
.iter()
.find_map(|v| v.base.clone())
.ok_or_eyre("Missing base flashblock")?;

if attrs.parent_hash != latest.hash() {
if attrs.parent_hash != latest_hash {
return Err(eyre!("The base flashblock is old"));
}

let state_provider = self.provider.history_by_block_hash(latest.hash())?;
let state = StateProviderDatabase::new(&state_provider);
let mut db = State::builder().with_database(state).with_bundle_update().build();

let mut request_cache = self.cached_reads(latest_hash);
let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider));
let mut state = State::builder().with_database(cached_db).with_bundle_update().build();

let mut builder = self
.evm_config
.builder_for_next_block(&mut db, &latest, attrs.into())
.builder_for_next_block(&mut state, &latest, attrs.into())
.map_err(RethError::other)?;

builder.apply_pre_execution_changes()?;
Expand All @@ -157,12 +195,15 @@ impl<
builder.finish(NoopProvider::default())?;

let execution_outcome = ExecutionOutcome::new(
db.take_bundle(),
state.take_bundle(),
vec![execution_result.receipts],
block.number(),
vec![execution_result.requests],
);

// update cached reads
self.update_cached_reads(latest_hash, request_cache);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks self.current is never set to Some.

What about:

let pending = PendingBlock::with_executed_block(
            Instant::now() + Duration::from_secs(1),
            ExecutedBlock {
                recovered_block: block.into(),
                execution_output: Arc::new(execution_outcome),
                hashed_state: Arc::new(hashed_state),
            },
        );

 self.current = Some(pending.clone())
 Ok(pending)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is set within stream impl here

Ok(PendingBlock::with_executed_block(
Instant::now() + Duration::from_secs(1),
ExecutedBlock {
Expand Down