Merged
4 changes: 2 additions & 2 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
@@ -193,9 +193,9 @@ where
) -> Result<(), PayloadBuilderError> {
let block_build_start_time = Instant::now();
let BuildArguments {
mut cached_reads,
config,
cancel: block_cancel,
..
} = args;

// We log only every 100th block to reduce log volume
@@ -264,7 +264,7 @@ where
// 1. execute the pre steps and seal an early block with that
let sequencer_tx_start_time = Instant::now();
let mut state = State::builder()
.with_database(db)
.with_database(cached_reads.as_db_mut(db))
.with_bundle_update()
.build();

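
Context for the two payload.rs changes (a sketch, not part of the PR): `cached_reads` is now destructured out of `BuildArguments` and layered in front of the database, so repeated account and storage reads during re-builds of the same payload are served from memory after the first attempt. A minimal sketch of the wiring, using only the calls visible in this hunk; the `State`/`DatabaseRef` import paths and the `Db` bound are assumptions that depend on the revm version in use:

use reth_revm::cached::CachedReads;
use revm::{database::State, DatabaseRef}; // assumed re-export paths

/// Sketch: every read goes through `cached_reads` first; misses fall through to
/// `db` and are recorded, so the next build attempt on the same parent block is
/// served from memory.
fn execution_state_with_cache<Db: DatabaseRef>(cached_reads: &mut CachedReads, db: Db) {
    let _state = State::builder()
        .with_database(cached_reads.as_db_mut(db))
        .with_bundle_update()
        .build();
    // ...execute the sequencer and pool transactions against `_state`, exactly
    // as the builder above does...
}

In the PR, the `CachedReads` instance arrives through `BuildArguments`, pre-warmed by the generator changes below.
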
55 changes: 51 additions & 4 deletions crates/op-rbuilder/src/builders/generator.rs
@@ -1,15 +1,19 @@
use alloy_primitives::B256;
use futures_util::{Future, FutureExt};
use reth::{
providers::{BlockReaderIdExt, StateProviderFactory},
tasks::TaskSpawner,
};
use reth_basic_payload_builder::{BasicPayloadJobGeneratorConfig, HeaderForPayload, PayloadConfig};
use reth_node_api::{PayloadBuilderAttributes, PayloadKind};
use reth_basic_payload_builder::{
BasicPayloadJobGeneratorConfig, HeaderForPayload, PayloadConfig, PrecachedState,
};
use reth_node_api::{NodePrimitives, PayloadBuilderAttributes, PayloadKind};
use reth_payload_builder::{
KeepPayloadJobAlive, PayloadBuilderError, PayloadJob, PayloadJobGenerator,
};
use reth_payload_primitives::BuiltPayload;
use reth_primitives_traits::HeaderTy;
use reth_provider::CanonStateNotification;
use reth_revm::cached::CachedReads;
use std::{
sync::{Arc, Mutex},
@@ -74,6 +78,8 @@ pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {
last_payload: Arc<Mutex<CancellationToken>>,
/// The extra block deadline in seconds
extra_block_deadline: std::time::Duration,
/// Stored `cached_reads` for new payload jobs.
pre_cached: Option<PrecachedState>,
}

// === impl EmptyBlockPayloadJobGenerator ===
@@ -97,8 +103,18 @@ impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
ensure_only_one_payload,
last_payload: Arc::new(Mutex::new(CancellationToken::new())),
extra_block_deadline,
pre_cached: None,
}
}

/// Returns the pre-cached reads for the given parent header if it matches the cached state's
/// block.
fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
self.pre_cached
.as_ref()
.filter(|pc| pc.block == parent)
.map(|pc| pc.cached.clone())
}
}
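
The lookup above is keyed by the parent hash and returns a clone rather than taking the cache out, so on this reading (not stated explicitly in the PR) several jobs building on the same parent can each start warm without mutating the generator's copy. A self-contained sketch of the same rule, using only the `PrecachedState` fields and `CachedReads` methods shown in the diff:

use alloy_primitives::B256;
use reth_basic_payload_builder::PrecachedState;
use reth_revm::cached::CachedReads;

/// Sketch of the reuse rule: hand out a copy of the warm cache only when the
/// new job builds directly on the block that produced it.
fn warm_cache_for(pre_cached: Option<&PrecachedState>, parent: B256) -> Option<CachedReads> {
    pre_cached
        .filter(|pc| pc.block == parent)
        .map(|pc| pc.cached.clone())
}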

impl<Client, Tasks, Builder> PayloadJobGenerator
@@ -182,12 +198,38 @@ where
cancel: cancel_token,
deadline,
build_complete: None,
cached_reads: self.maybe_pre_cached(parent_header.hash()),
};

job.spawn_build_job();

Ok(job)
}

fn on_new_state<N: NodePrimitives>(&mut self, new_state: CanonStateNotification<N>) {
let mut cached = CachedReads::default();

// extract the state from the notification and put it into the cache
let committed = new_state.committed();
let new_execution_outcome = committed.execution_outcome();
for (addr, acc) in new_execution_outcome.bundle_accounts_iter() {
if let Some(info) = acc.info.clone() {
// we want to pre-cache existing accounts and their storage;
// this only includes changed accounts and storage, but it is better than nothing
let storage = acc
.storage
.iter()
.map(|(key, slot)| (*key, slot.present_value))
.collect();
cached.insert_account(addr, info, storage);
}
}

self.pre_cached = Some(PrecachedState {
block: committed.tip().hash(),
cached,
});
}
}

use std::{
@@ -214,6 +256,11 @@ where
pub(crate) cancel: CancellationToken,
pub(crate) deadline: Pin<Box<Sleep>>, // Add deadline
pub(crate) build_complete: Option<oneshot::Receiver<Result<(), PayloadBuilderError>>>,
/// Caches all disk reads for the state the new payloads build on
///
/// This is used to avoid reading the same state over and over again when new attempts are
/// triggered, because during the building process we'll repeatedly execute the transactions.
pub(crate) cached_reads: Option<CachedReads>,
}

impl<Tasks, Builder> PayloadJob for BlockPayloadJob<Tasks, Builder>
@@ -274,10 +321,10 @@ where

let (tx, rx) = oneshot::channel();
self.build_complete = Some(rx);

let cached_reads = self.cached_reads.take().unwrap_or_default();
self.executor.spawn_blocking(Box::pin(async move {
let args = BuildArguments {
cached_reads: Default::default(),
cached_reads,
config: payload_config,
cancel,
};
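
Taken together, the generator.rs changes form a small lifecycle: on_new_state warms a CachedReads from the newly committed block, new_payload_job hands a copy of it to jobs whose parent matches via maybe_pre_cached, and spawn_build_job moves it into BuildArguments, where the flashblocks builder wraps its database with it. A sketch of the warm-up step as a standalone helper, mirroring on_new_state above (the function name is hypothetical; only the calls shown in the diff are assumed to exist):

use reth_basic_payload_builder::PrecachedState;
use reth_node_api::NodePrimitives;
use reth_provider::CanonStateNotification;
use reth_revm::cached::CachedReads;

/// Sketch: fold the accounts and storage slots touched by the committed chain
/// segment into a fresh CachedReads, remembering which block it belongs to.
fn precache_from_notification<N: NodePrimitives>(
    notification: CanonStateNotification<N>,
) -> PrecachedState {
    let mut cached = CachedReads::default();
    let committed = notification.committed();
    for (addr, acc) in committed.execution_outcome().bundle_accounts_iter() {
        if let Some(info) = acc.info.clone() {
            // only accounts and slots that changed in the committed blocks are
            // present in the bundle state, so the warm-up is partial by design
            let storage = acc
                .storage
                .iter()
                .map(|(key, slot)| (*key, slot.present_value))
                .collect();
            cached.insert_account(addr, info, storage);
        }
    }
    PrecachedState {
        block: committed.tip().hash(),
        cached,
    }
}

Because only state touched by the committed block lands in the cache, the warm-up mostly helps with hot accounts that are touched block after block; everything else still falls through to disk on its first read.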