Skip to content

Commit 0ec0644

Browse files
avaloncheSozinMakundaz
authored
Add caching to generator (#221)
* WIP right now it has consensus error 2025-08-01T16:10:44.956046Z ERROR engine::persistence: Persistence service failed err=ProviderError(Database(Write(DatabaseWriteError { info: DatabaseErrorInfo { message: "the given key value is mismatched to the current cursor position", code: -30418 }, operation: CursorAppendDup, table_name: "AccountChangeSets", key: [0, 0, 0, 0, 0, 0, 0, 9] }))) * fmt * cache reth db reads in flashblocks payload generation * save tip to cache on new committed state --------- Co-authored-by: Solar Mithril <[email protected]> Co-authored-by: Ash Kunda <[email protected]>
1 parent 3e3ead7 commit 0ec0644

File tree

2 files changed

+53
-6
lines changed

2 files changed

+53
-6
lines changed

crates/op-rbuilder/src/builders/flashblocks/payload.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,9 @@ where
193193
) -> Result<(), PayloadBuilderError> {
194194
let block_build_start_time = Instant::now();
195195
let BuildArguments {
196+
mut cached_reads,
196197
config,
197198
cancel: block_cancel,
198-
..
199199
} = args;
200200

201201
// We log only every 100th block to reduce usage
@@ -264,7 +264,7 @@ where
264264
// 1. execute the pre steps and seal an early block with that
265265
let sequencer_tx_start_time = Instant::now();
266266
let mut state = State::builder()
267-
.with_database(db)
267+
.with_database(cached_reads.as_db_mut(db))
268268
.with_bundle_update()
269269
.build();
270270

crates/op-rbuilder/src/builders/generator.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
1+
use alloy_primitives::B256;
12
use futures_util::{Future, FutureExt};
23
use reth::{
34
providers::{BlockReaderIdExt, StateProviderFactory},
45
tasks::TaskSpawner,
56
};
6-
use reth_basic_payload_builder::{BasicPayloadJobGeneratorConfig, HeaderForPayload, PayloadConfig};
7-
use reth_node_api::{PayloadBuilderAttributes, PayloadKind};
7+
use reth_basic_payload_builder::{
8+
BasicPayloadJobGeneratorConfig, HeaderForPayload, PayloadConfig, PrecachedState,
9+
};
10+
use reth_node_api::{NodePrimitives, PayloadBuilderAttributes, PayloadKind};
811
use reth_payload_builder::{
912
KeepPayloadJobAlive, PayloadBuilderError, PayloadJob, PayloadJobGenerator,
1013
};
1114
use reth_payload_primitives::BuiltPayload;
1215
use reth_primitives_traits::HeaderTy;
16+
use reth_provider::CanonStateNotification;
1317
use reth_revm::cached::CachedReads;
1418
use std::{
1519
sync::{Arc, Mutex},
@@ -74,6 +78,8 @@ pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {
7478
last_payload: Arc<Mutex<CancellationToken>>,
7579
/// The extra block deadline in seconds
7680
extra_block_deadline: std::time::Duration,
81+
/// Stored `cached_reads` for new payload jobs.
82+
pre_cached: Option<PrecachedState>,
7783
}
7884

7985
// === impl EmptyBlockPayloadJobGenerator ===
@@ -97,8 +103,18 @@ impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
97103
ensure_only_one_payload,
98104
last_payload: Arc::new(Mutex::new(CancellationToken::new())),
99105
extra_block_deadline,
106+
pre_cached: None,
100107
}
101108
}
109+
110+
/// Returns the pre-cached reads for the given parent header if it matches the cached state's
111+
/// block.
112+
fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
113+
self.pre_cached
114+
.as_ref()
115+
.filter(|pc| pc.block == parent)
116+
.map(|pc| pc.cached.clone())
117+
}
102118
}
103119

104120
impl<Client, Tasks, Builder> PayloadJobGenerator
@@ -182,12 +198,38 @@ where
182198
cancel: cancel_token,
183199
deadline,
184200
build_complete: None,
201+
cached_reads: self.maybe_pre_cached(parent_header.hash()),
185202
};
186203

187204
job.spawn_build_job();
188205

189206
Ok(job)
190207
}
208+
209+
fn on_new_state<N: NodePrimitives>(&mut self, new_state: CanonStateNotification<N>) {
210+
let mut cached = CachedReads::default();
211+
212+
// extract the state from the notification and put it into the cache
213+
let committed = new_state.committed();
214+
let new_execution_outcome = committed.execution_outcome();
215+
for (addr, acc) in new_execution_outcome.bundle_accounts_iter() {
216+
if let Some(info) = acc.info.clone() {
217+
// we want pre cache existing accounts and their storage
218+
// this only includes changed accounts and storage but is better than nothing
219+
let storage = acc
220+
.storage
221+
.iter()
222+
.map(|(key, slot)| (*key, slot.present_value))
223+
.collect();
224+
cached.insert_account(addr, info, storage);
225+
}
226+
}
227+
228+
self.pre_cached = Some(PrecachedState {
229+
block: committed.tip().hash(),
230+
cached,
231+
});
232+
}
191233
}
192234

193235
use std::{
@@ -214,6 +256,11 @@ where
214256
pub(crate) cancel: CancellationToken,
215257
pub(crate) deadline: Pin<Box<Sleep>>, // Add deadline
216258
pub(crate) build_complete: Option<oneshot::Receiver<Result<(), PayloadBuilderError>>>,
259+
/// Caches all disk reads for the state the new payloads builds on
260+
///
261+
/// This is used to avoid reading the same state over and over again when new attempts are
262+
/// triggered, because during the building process we'll repeatedly execute the transactions.
263+
pub(crate) cached_reads: Option<CachedReads>,
217264
}
218265

219266
impl<Tasks, Builder> PayloadJob for BlockPayloadJob<Tasks, Builder>
@@ -274,10 +321,10 @@ where
274321

275322
let (tx, rx) = oneshot::channel();
276323
self.build_complete = Some(rx);
277-
324+
let cached_reads = self.cached_reads.take().unwrap_or_default();
278325
self.executor.spawn_blocking(Box::pin(async move {
279326
let args = BuildArguments {
280-
cached_reads: Default::default(),
327+
cached_reads,
281328
config: payload_config,
282329
cancel,
283330
};

0 commit comments

Comments
 (0)