diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 000000000..e69de29bb diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 66fefacb7..aed722003 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -128,27 +128,6 @@ pub struct FlashblocksArgs { )] pub flashblocks_block_time: u64, - /// Builder would always thry to produce fixed number of flashblocks without regard to time of - /// FCU arrival. - /// In cases of late FCU it could lead to partially filled blocks. - #[arg( - long = "flashblocks.fixed", - default_value = "false", - env = "FLASHBLOCK_FIXED" - )] - pub flashblocks_fixed: bool, - - /// Time by which blocks would be completed earlier in milliseconds. - /// - /// This time used to account for latencies, this time would be deducted from total block - /// building time before calculating number of fbs. - #[arg( - long = "flashblocks.leeway-time", - default_value = "0", - env = "FLASHBLOCK_LEEWAY_TIME" - )] - pub flashblocks_leeway_time: u64, - /// Whether to disable state root calculation for each flashblock #[arg( long = "flashblocks.disable-state-root", @@ -176,14 +155,6 @@ pub struct FlashblocksArgs { )] pub flashblocks_number_contract_use_permit: bool, - /// Build flashblock at the end of the flashblock interval - #[arg( - long = "flashblocks.build-at-interval-end", - env = "FLASHBLOCK_BUILD_AT_INTERVAL_END", - default_value = "false" - )] - pub flashblocks_build_at_interval_end: bool, - /// Offset in milliseconds for when to send flashblocks. /// Positive values send late, negative values send early. /// Example: -20 sends 20ms early, 20 sends 20ms late. diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index c7c30f6dc..62fa2ca64 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -18,19 +18,6 @@ pub struct FlashblocksConfig { /// per block is equal to the block time divided by the flashblock interval. pub interval: Duration, - /// How much time would be deducted from block build time to account for latencies in - /// milliseconds. - /// - /// If dynamic_adjustment is false this value would be deducted from first flashblock and - /// it shouldn't be more than interval - /// - /// If dynamic_adjustment is true this value would be deducted from first flashblock and - /// it shouldn't be more than interval - pub leeway_time: Duration, - - /// Disables dynamic flashblocks number adjustment based on FCU arrival time - pub fixed: bool, - /// Should we disable state root calculation for each flashblock pub disable_state_root: bool, @@ -42,9 +29,6 @@ pub struct FlashblocksConfig { /// whether to use permit signatures for the contract calls pub number_contract_use_permit: bool, - /// Build flashblock at the end of the flashblock interval - pub build_at_interval_end: bool, - /// Offset in milliseconds for when to send flashblocks. /// Positive values send late, negative values send early. pub send_offset_ms: i64, @@ -77,12 +61,9 @@ impl Default for FlashblocksConfig { Self { ws_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 1111), interval: Duration::from_millis(250), - leeway_time: Duration::from_millis(0), - fixed: false, disable_state_root: false, number_contract_address: None, number_contract_use_permit: false, - build_at_interval_end: false, send_offset_ms: 0, end_buffer_ms: 0, p2p_enabled: false, @@ -106,10 +87,6 @@ impl TryFrom for FlashblocksConfig { args.flashblocks.flashblocks_port, ); - let leeway_time = Duration::from_millis(args.flashblocks.flashblocks_leeway_time); - - let fixed = args.flashblocks.flashblocks_fixed; - let disable_state_root = args.flashblocks.flashblocks_disable_state_root; let number_contract_address = args.flashblocks.flashblocks_number_contract_address; @@ -119,12 +96,9 @@ impl TryFrom for FlashblocksConfig { Ok(Self { ws_addr, interval, - leeway_time, - fixed, disable_state_root, number_contract_address, number_contract_use_permit, - build_at_interval_end: args.flashblocks.flashblocks_build_at_interval_end, send_offset_ms: args.flashblocks.flashblocks_send_offset_ms, end_buffer_ms: args.flashblocks.flashblocks_end_buffer_ms, p2p_enabled: args.flashblocks.p2p.p2p_enabled, diff --git a/crates/op-rbuilder/src/builders/flashblocks/mod.rs b/crates/op-rbuilder/src/builders/flashblocks/mod.rs index 87d4494cb..5151d5072 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/mod.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/mod.rs @@ -11,6 +11,7 @@ mod p2p; mod payload; mod payload_handler; mod service; +mod timing; mod wspub; /// Block building strategy that progressively builds chunks of a block and makes them available diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index e86add0a6..613ab5cfa 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -4,7 +4,9 @@ use crate::{ BuilderConfig, builder_tx::BuilderTransactions, context::OpPayloadBuilderCtx, - flashblocks::{best_txs::BestFlashblocksTxs, config::FlashBlocksConfigExt}, + flashblocks::{ + best_txs::BestFlashblocksTxs, config::FlashBlocksConfigExt, timing::FlashblockScheduler, + }, generator::{BlockCell, BuildArguments, PayloadBuilder}, }, gas_limiter::AddressGasLimiter, @@ -19,8 +21,6 @@ use alloy_consensus::{ use alloy_eips::{Encodable2718, eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE}; use alloy_evm::block::BlockExecutionResult; use alloy_primitives::{Address, B256, U256}; -use alloy_rpc_types_engine::PayloadId; -use core::time::Duration; use eyre::WrapErr as _; use op_alloy_rpc_types_engine::{ OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, @@ -49,12 +49,7 @@ use reth_revm::{ use reth_transaction_pool::TransactionPool; use reth_trie::{HashedPostState, updates::TrieUpdates}; use revm::Database; -use std::{ - collections::BTreeMap, - ops::{Div, Rem}, - sync::Arc, - time::Instant, -}; +use std::{collections::BTreeMap, sync::Arc, time::Instant}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, metadata::Level, span, warn}; @@ -90,17 +85,6 @@ type NextBestFlashblocksTxs = BestFlashblocksTxs< >, >; -/// Timing information for flashblock building -#[derive(Debug, Clone, Copy)] -pub(super) struct FlashblocksTiming { - /// Number of flashblocks to build in this block - pub flashblocks_per_block: u64, - /// Time until the first flashblock should be built - pub first_flashblock_offset: Duration, - /// Total time available for flashblock building (deadline) - pub flashblocks_deadline: Duration, -} - #[derive(Debug, Default, Clone)] pub(super) struct FlashblocksExecutionInfo { /// Index of the last consumed flashblock @@ -320,7 +304,7 @@ where da_config: self.config.da_config.clone(), gas_limit_config: self.config.gas_limit_config.clone(), builder_signer: self.config.builder_signer, - metrics: Default::default(), + metrics: self.metrics.clone(), extra_ctx, max_gas_per_txn: self.config.max_gas_per_txn, address_gas_limiter: self.address_gas_limiter.clone(), @@ -364,7 +348,6 @@ where config.attributes.payload_attributes.id.to_string(), ); - let timestamp = config.attributes.timestamp(); let disable_state_root = self.config.specific.disable_state_root; let ctx = self .get_op_payload_builder_ctx( @@ -466,46 +449,37 @@ where // return early since we don't need to build a block with transactions from the pool return Ok(()); } - // We adjust our flashblocks timings based on time the fcu block building signal arrived - let (flashblocks_per_block, first_flashblock_offset, flashblocks_deadline) = - if self.config.specific.build_at_interval_end { - let timing = self.calculate_flashblocks_timing(timestamp, fb_payload.payload_id); - ( - timing.flashblocks_per_block, - timing.first_flashblock_offset, - timing.flashblocks_deadline, - ) - } else { - let (flashblocks_per_block, first_flashblock_offset) = - self.calculate_flashblocks(timestamp); - ( - flashblocks_per_block, - first_flashblock_offset, - self.config.block_time, - ) - }; + // We adjust our flashblocks timings based on time the fcu block building signal arrived + let timestamp = config.attributes.timestamp(); + let flashblock_scheduler = + FlashblockScheduler::new(&self.config.specific, self.config.block_time, timestamp); info!( target: "payload_builder", - id = fb_payload.payload_id.to_string(), - flashblocks_per_block, - first_flashblock_offset = first_flashblock_offset.as_millis(), - flashblocks_interval = self.config.specific.interval.as_millis(), - "Performed flashblocks timing derivation", + payload_id = ?fb_payload.payload_id, + schedule = ?flashblock_scheduler, + "Computed flashblock timing schedule" ); - ctx.metrics.reduced_flashblocks_number.record( - self.config - .flashblocks_per_block() - .saturating_sub(ctx.target_flashblock_count()) as f64, - ); - ctx.metrics - .first_flashblock_time_offset - .record(first_flashblock_offset.as_millis() as f64); - let gas_per_batch = ctx.block_gas_limit() / flashblocks_per_block; + let target_flashblocks = flashblock_scheduler.target_flashblocks(); + + let expected_flashblocks = self.config.flashblocks_per_block(); + if target_flashblocks < expected_flashblocks { + warn!( + target: "payload_builder", + expected_flashblocks, + target_flashblocks, + "FCU arrived late, building fewer flashblocks" + ); + ctx.metrics + .reduced_flashblocks_number + .record((expected_flashblocks - target_flashblocks) as f64); + } + + let gas_per_batch = ctx.block_gas_limit() / target_flashblocks; let da_per_batch = ctx .da_config .max_da_block_size() - .map(|da_limit| da_limit / flashblocks_per_block); + .map(|da_limit| da_limit / target_flashblocks); // Check that builder tx won't affect fb limit too much if let Some(da_limit) = da_per_batch { // We error if we can't insert any tx aside from builder tx in flashblock @@ -517,11 +491,11 @@ where } let da_footprint_per_batch = info .da_footprint_scalar - .map(|_| ctx.block_gas_limit() / flashblocks_per_block); + .map(|_| ctx.block_gas_limit() / target_flashblocks); let extra_ctx = FlashblocksExtraCtx { flashblock_index: 1, - target_flashblock_count: flashblocks_per_block, + target_flashblock_count: target_flashblocks, target_gas_for_batch: gas_per_batch, target_da_for_batch: da_per_batch, gas_per_batch, @@ -531,7 +505,7 @@ where target_da_footprint_for_batch: da_footprint_per_batch, }; - let mut fb_cancel = block_cancel.child_token(); + let fb_cancel = block_cancel.child_token(); let mut ctx = self .get_op_payload_builder_ctx(config, fb_cancel.clone(), extra_ctx) .map_err(|e| PayloadBuilderError::Other(e.into()))?; @@ -541,92 +515,22 @@ where self.pool .best_transactions_with_attributes(ctx.best_transaction_attributes()), )); - let interval = self.config.specific.interval; - let (tx, rx) = - std::sync::mpsc::sync_channel((self.config.flashblocks_per_block() + 1) as usize); - let build_at_interval_end = self.config.specific.build_at_interval_end; - - tokio::spawn(self.task_metrics.flashblock_timer.instrument({ - let block_cancel = block_cancel.clone(); - let flashblock_index = ctx.flashblock_index(); - let block_number = ctx.block_number(); - - async move { - // If NOT building at interval end, send immediate signal to build first - // flashblock right away (preserves current default behavior). - // Otherwise, wait for first_flashblock_offset before first build. - if !build_at_interval_end && tx.send(fb_cancel.clone()).is_err() { - error!( - target: "payload_builder", - "Did not trigger first flashblock build due to payload building error or block building being cancelled"); - return; - } - - let mut timer = tokio::time::interval_at( - tokio::time::Instant::now() - .checked_add(first_flashblock_offset) - .expect("can add flashblock offset to current time"), - interval, - ); - // Set deadline to ensure the last flashblock will be built before the leeway time - let deadline_sleep = async { - tokio::time::sleep(flashblocks_deadline).await; - }; - tokio::pin!(deadline_sleep); - - loop { - tokio::select! { - _ = timer.tick() => { - debug!( - target: "payload_builder", - id = ?fb_payload.payload_id, - flashblock_index = flashblock_index, - block_number = block_number, - "Triggering next flashblock with timer", - ); - // cancel current payload building job - fb_cancel.cancel(); - fb_cancel = block_cancel.child_token(); - // this will tick at first_flashblock_offset, - // starting the next flashblock - if tx.send(fb_cancel.clone()).is_err() { - // receiver channel was dropped, return. - // this will only happen if the `build_payload` function returns, - // due to payload building error or the main cancellation token being - // cancelled. - error!( - target: "payload_builder", - "Did not trigger next flashblock build due to payload building error or block building being cancelled", - ); - return; - } - } - _ = &mut deadline_sleep => { - // Deadline reached (with leeway applied to end). Cancel current payload building job - fb_cancel.cancel(); - if tx.send(block_cancel.child_token()).is_err() { - error!( - target: "payload_builder", - "Did not trigger next flashblock build due to payload building error or block building being cancelled", - ); - } - return; - } - _ = block_cancel.cancelled() => { - drop(tx); - return; - } - } - } - } - })); + let (tx, rx) = std::sync::mpsc::sync_channel((expected_flashblocks + 1) as usize); + tokio::spawn( + self.task_metrics + .flashblock_timer + .instrument(flashblock_scheduler.run( + tx, + block_cancel.clone(), + fb_cancel, + fb_payload.payload_id, + )), + ); // Process flashblocks - block on async channel receive loop { // Wait for signal before building flashblock. - // If build_at_interval_end is false, an immediate signal is sent so we don't wait. - // If build_at_interval_end is true, we wait for the timer tick (first_flashblock_offset). if let Ok(new_fb_cancel) = rx.recv() { debug!( target: "payload_builder", @@ -638,7 +542,7 @@ where ctx = ctx.with_cancel(new_fb_cancel); } else { // Channel closed - block building cancelled - self.record_flashblocks_metrics(&ctx, &info, flashblocks_per_block, &span); + self.record_flashblocks_metrics(&ctx, &info, target_flashblocks, &span); return Ok(()); } @@ -653,11 +557,6 @@ where }; let _entered = fb_span.enter(); - if ctx.flashblock_index() > ctx.target_flashblock_count() { - self.record_flashblocks_metrics(&ctx, &info, flashblocks_per_block, &span); - return Ok(()); - } - // Build flashblock after receiving signal let next_flashblocks_ctx = match self.build_next_flashblock( &ctx, @@ -670,7 +569,7 @@ where ) { Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, Ok(None) => { - self.record_flashblocks_metrics(&ctx, &info, flashblocks_per_block, &span); + self.record_flashblocks_metrics(&ctx, &info, target_flashblocks, &span); return Ok(()); } Err(err) => { @@ -944,232 +843,6 @@ where span.record("flashblock_count", ctx.flashblock_index()); } - - /// Calculate number of flashblocks. - /// If dynamic is enabled this function will take time drift into the account. - /// TODO: deprecate this flashblocks timing calculation - pub(super) fn calculate_flashblocks(&self, timestamp: u64) -> (u64, Duration) { - if self.config.specific.fixed { - return ( - self.config.flashblocks_per_block(), - // We adjust first FB to ensure that we have at least some time to make all FB in time - self.config.specific.interval - self.config.specific.leeway_time, - ); - } - - // We use this system time to determine remining time to build a block - // Things to consider: - // FCU(a) - FCU with attributes - // FCU(a) could arrive with `block_time - fb_time < delay`. In this case we could only produce 1 flashblock - // FCU(a) could arrive with `delay < fb_time` - in this case we will shrink first flashblock - // FCU(a) could arrive with `fb_time < delay < block_time - fb_time` - in this case we will issue less flashblocks - let target_time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp) - - self.config.specific.leeway_time; - let now = std::time::SystemTime::now(); - let Some(time_drift) = target_time - .duration_since(now) - .ok() - .filter(|duration| duration.as_millis() > 0) - else { - error!( - target: "payload_builder", - ?target_time, - ?now, - "FCU arrived too late or system clock are unsynced" - ); - return ( - self.config.flashblocks_per_block(), - self.config.specific.interval, - ); - }; - self.metrics.flashblocks_time_drift.record( - self.config - .block_time - .as_millis() - .saturating_sub(time_drift.as_millis()) as f64, - ); - debug!( - target: "payload_builder", - ?target_time, - time_drift = self.config.block_time.as_millis().saturating_sub(time_drift.as_millis()), - ?timestamp, - "Time drift for building round" - ); - // This is extra check to ensure that we would account at least for block time in case we have any timer discrepancies. - let time_drift = time_drift.min(self.config.block_time); - let interval = self.config.specific.interval.as_millis() as u64; - let time_drift = time_drift.as_millis() as u64; - let first_flashblock_offset = time_drift.rem(interval); - if first_flashblock_offset == 0 { - // We have perfect division, so we use interval as first fb offset - (time_drift.div(interval), Duration::from_millis(interval)) - } else { - // Non-perfect division, so we account for it. - ( - time_drift.div(interval) + 1, - Duration::from_millis(first_flashblock_offset), - ) - } - } - - /// Calculate number of flashblocks and time until first flashblock and deadline for building flashblocks - /// If dynamic is enabled this function will take time drift of FCU arrival into the account. - pub(super) fn calculate_flashblocks_timing( - &self, - timestamp: u64, - payload_id: PayloadId, - ) -> FlashblocksTiming { - let offset_delta = self.config.specific.send_offset_ms.unsigned_abs(); - if self.config.specific.fixed { - let offset = if self.config.specific.send_offset_ms > 0 { - self.config - .specific - .interval - .saturating_add(Duration::from_millis(offset_delta)) - } else { - self.config - .specific - .interval - .saturating_sub(Duration::from_millis(offset_delta)) - }; - return FlashblocksTiming { - flashblocks_per_block: self.config.flashblocks_per_block(), - first_flashblock_offset: offset, - flashblocks_deadline: self - .config - .block_time - .saturating_sub(Duration::from_millis(self.config.specific.end_buffer_ms)), - }; - } - - // FLASHBLOCK TIMING SCENARIOS - // =========================== - - // Block time = 1000ms, Flashblock interval (fb_time) = 250ms - // Target: 4 flashblocks per block - - // Timeline: Block starts at timestamp T, ends at T+1000ms - // |<------------------- block_time (1000ms) ------------------->| - - // SCENARIO 1: IDEAL - FCU arrives on time (delay = 0) - // ───────────────────────────────────────────────────── - // T T+1000ms - // │ │ - // FCU(a) ▼ │ - // arrives ├────────────┬────────────┬────────────┬────────────┤ - // │ FB 1 │ FB 2 │ FB 3 │ FB 4 │ - // │ 250ms │ 250ms │ 250ms │ 250ms │ - // └────────────┴────────────┴────────────┴────────────┘ - - // Result: 4 flashblocks, each 250ms - - // SCENARIO 2: LATE FCU - delay < fb_time (e.g., delay = 100ms) - // ───────────────────────────────────────────────────────────── - // T T+1000ms - // │ │ - // │ delay │ │ - // │◄──100ms──►│ │ - // │ ▼ FCU(a) arrives │ - // ├───────────┼────────┬────────────┬────────────┬──────────┤ - // │ (missed) │ FB 1 │ FB 2 │ FB 3 │ FB 4 │ - // │ │ 150ms │ 250ms │ 250ms │ 250ms │ - // │ │(shrunk)│ │ │ │ - // └───────────┴────────┴────────────┴────────────┴──────────┘ - // │◄─────── remaining time: 900ms ─────────────►│ - - // Result: 4 flashblocks, but FB 1 is shrunk (only 150ms) - // first_flashblock_offset = delay % fb_time = 100 % 250 = 100ms remaining - - // SCENARIO 3: VERY LATE FCU - block_time - fb_time < delay (e.g., delay = 800ms) - // ────────────────────────────────────────────────────────────────────────────── - // T T+1000ms - // │ │ - // │◄─────────────── delay (800ms) ────────────────►│ │ - // │ ▼ FCU(a) │ - // ├─────────────────────────────────────────────────┼────────┤ - // │ (missed - too late) │ FB 1 │ - // │ │ 200ms │ - // │ │ │ - // └─────────────────────────────────────────────────┴────────┘ - // │◄─200ms─►│ - - // Result: Only 1 flashblock possible (200ms remaining < 250ms interval) - let target_time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp); - let now = std::time::SystemTime::now(); - let Some(remaining_time) = target_time - .duration_since(now) - .ok() - .filter(|duration| duration.as_millis() > 0) - else { - error!( - target: "payload_builder", - id = ?payload_id, - ?target_time, - ?now, - "FCU arrived too late or system clock are unsynced", - ); - return FlashblocksTiming { - flashblocks_per_block: self.config.flashblocks_per_block(), - first_flashblock_offset: self.config.specific.interval, - flashblocks_deadline: self - .config - .block_time - .saturating_sub(Duration::from_millis(self.config.specific.end_buffer_ms)), - }; - }; - self.metrics.flashblocks_time_drift.record( - self.config - .block_time - .as_millis() - .saturating_sub(remaining_time.as_millis()) as f64, - ); - debug!( - target: "payload_builder", - id = ?payload_id, - ?target_time, - delay = self.config.block_time.as_millis().saturating_sub(remaining_time.as_millis()), - ?timestamp, - "Time delay for building round", - ); - // This is extra check to ensure that we would account at least for block time in case we have any timer discrepancies. - let remaining_time = remaining_time.min(self.config.block_time).as_millis() as u64; - let interval = self.config.specific.interval.as_millis() as u64; - let first_flashblock_offset = remaining_time.rem(interval); - let (flashblocks_per_block, offset) = if first_flashblock_offset == 0 { - // We have perfect division, so we use interval as first fb offset - ( - remaining_time.div(interval), - Duration::from_millis(interval), - ) - } else { - // Non-perfect division, set the first flashblock offset to the remainder of the division - ( - remaining_time.div(interval) + 1, - Duration::from_millis(first_flashblock_offset), - ) - }; - // Apply send_offset_ms to the timer start time. - // Positive values = send later, negative values = send earlier. - let deadline = Duration::from_millis( - remaining_time.saturating_sub(self.config.specific.end_buffer_ms), - ); - let (adjusted_offset, adjusted_deadline) = if self.config.specific.send_offset_ms >= 0 { - ( - offset.saturating_add(Duration::from_millis(offset_delta)), - deadline.saturating_add(Duration::from_millis(offset_delta)), - ) - } else { - ( - offset.saturating_sub(Duration::from_millis(offset_delta)), - deadline.saturating_sub(Duration::from_millis(offset_delta)), - ) - }; - FlashblocksTiming { - flashblocks_per_block, - first_flashblock_offset: adjusted_offset, - flashblocks_deadline: adjusted_deadline, - } - } } #[async_trait::async_trait] diff --git a/crates/op-rbuilder/src/builders/flashblocks/timing.rs b/crates/op-rbuilder/src/builders/flashblocks/timing.rs new file mode 100644 index 000000000..1aecad88b --- /dev/null +++ b/crates/op-rbuilder/src/builders/flashblocks/timing.rs @@ -0,0 +1,505 @@ +use core::time::Duration; +use std::{ops::Rem, sync::mpsc::SyncSender}; + +use reth_payload_builder::PayloadId; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, warn}; + +use crate::builders::flashblocks::config::FlashblocksConfig; + +/// Schedules and triggers flashblock builds at predetermined times during a +/// block slot. This should be created at the start of each payload building +/// job. +pub(super) struct FlashblockScheduler { + /// Wall clock time when this scheduler was created. + reference_system: std::time::SystemTime, + /// Monotonic instant when this scheduler was created. + reference_instant: tokio::time::Instant, + /// Absolute times at which to trigger flashblock builds. + send_times: Vec, +} + +impl FlashblockScheduler { + pub(super) fn new( + config: &FlashblocksConfig, + block_time: Duration, + payload_timestamp: u64, + ) -> Self { + // Capture current time for calculating relative offsets + let reference_system = std::time::SystemTime::now(); + let reference_instant = tokio::time::Instant::now(); + + let target_flashblocks = (block_time.as_millis() / config.interval.as_millis()) as u64; + + // Calculate how much time remains until the payload deadline + let remaining_time = + compute_remaining_time(block_time, payload_timestamp, reference_system); + + // Compute the schedule as relative durations from now + let intervals = compute_scheduler_intervals( + config.interval, + config.send_offset_ms, + config.end_buffer_ms, + remaining_time, + target_flashblocks, + ); + + // Convert relative durations to absolute instants for + // tokio::time::sleep_until + let send_times = intervals + .into_iter() + .map(|d| reference_instant + d) + .collect(); + + Self { + reference_system, + reference_instant, + send_times, + } + } + + /// Runs the scheduler, sending flashblock triggers at the scheduled times. + pub(super) async fn run( + self, + tx: SyncSender, + block_cancel: CancellationToken, + mut fb_cancel: CancellationToken, + payload_id: PayloadId, + ) { + let start = tokio::time::Instant::now(); + + let target_flashblocks = self.send_times.len(); + for (i, send_time) in self.send_times.into_iter().enumerate() { + tokio::select! { + _ = tokio::time::sleep_until(send_time) => { + // Cancel current flashblock building job + fb_cancel.cancel(); + + // Trigger next flashblock building job + fb_cancel = block_cancel.child_token(); + + let elapsed = start.elapsed(); + debug!( + target: "payload_builder", + id = %payload_id, + flashblock_index = i + 1, + scheduled_time = ?(send_time - start), + actual_time = ?elapsed, + drift = ?(elapsed - (send_time - start)), + "Sending flashblock trigger" + ); + + if tx.send(fb_cancel.clone()).is_err() { + // receiver channel was dropped, return. this will only + // happen if the `build_payload` function returns, due + // to payload building error or the main cancellation + // token being cancelled. + error!( + target: "payload_builder", + id = %payload_id, + "Failed to send flashblock trigger, receiver channel was dropped" + ); + return; + } + } + _ = block_cancel.cancelled() => { + warn!( + target: "payload_builder", + id = %payload_id, + missed_count = target_flashblocks - i, + target_flashblocks = target_flashblocks, + "Missing flashblocks because the payload building job was cancelled too early" + ); + return + }, + } + } + } + + /// Returns the total number of flashblocks that will be triggered. + pub(super) fn target_flashblocks(&self) -> u64 { + self.send_times.len() as u64 + } +} + +/// Computes the remaining time until the payload deadline. Calculates remaining +/// time as `payload_timestamp - now`. The result is capped at `block_time` and +/// falls back to `block_time` if the timestamp is in the past. +fn compute_remaining_time( + block_time: Duration, + payload_timestamp: u64, + reference_system: std::time::SystemTime, +) -> Duration { + let target_time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(payload_timestamp); + + // Calculate remaining time, with fallback to block_time if: + // - target_time is in the past (duration_since returns Err) + // - remaining time is 0 or negative + target_time + .duration_since(reference_system) + .ok() + .filter(|duration| duration.as_millis() > 0) + .unwrap_or(block_time) + .min(block_time) +} + +/// Computes the scheduler send time intervals as durations relative to the +/// start instant. +fn compute_scheduler_intervals( + flashblock_interval: Duration, + send_offset_ms: i64, + end_buffer_ms: u64, + remaining_time: Duration, + target_flashblocks: u64, +) -> Vec { + // Align flashblocks to remaining_time + let first_flashblock_offset = + calculate_first_flashblock_offset(remaining_time, flashblock_interval); + + let first_flashblock_offset = apply_offset(first_flashblock_offset, send_offset_ms); + let flashblocks_deadline = apply_offset( + remaining_time.saturating_sub(Duration::from_millis(end_buffer_ms)), + send_offset_ms, + ); + + compute_send_time_intervals( + first_flashblock_offset, + flashblock_interval, + flashblocks_deadline, + target_flashblocks, + ) +} + +/// Generates the actual send time intervals given timing parameters. +fn compute_send_time_intervals( + first_flashblock_offset: Duration, + interval: Duration, + deadline: Duration, + target_flashblocks: u64, +) -> Vec { + let mut send_times = vec![]; + + // Add triggers at first_flashblock_offset, then every interval until + // deadline + let mut next_time = first_flashblock_offset; + while next_time < deadline { + send_times.push(next_time); + next_time += interval; + } + send_times.push(deadline); + + // Clamp the number of triggers. Some of the calculation strategies end up + // with more triggers concentrated towards the start of the block and so + // this is needed to preserve backwards compatibility. + send_times.truncate(target_flashblocks as usize); + + send_times +} + +/// Durations cannot be negative values so we need to store the offset value as +/// an int. This is a helper function to apply the signed millisecond offset to +/// a duration. +fn apply_offset(duration: Duration, offset_ms: i64) -> Duration { + let offset_delta = offset_ms.unsigned_abs(); + if offset_ms >= 0 { + duration.saturating_add(Duration::from_millis(offset_delta)) + } else { + duration.saturating_sub(Duration::from_millis(offset_delta)) + } +} + +/// Calculates when the first flashblock should be triggered. +fn calculate_first_flashblock_offset(remaining_time: Duration, interval: Duration) -> Duration { + let remaining_time_ms = remaining_time.as_millis() as u64; + let interval_ms = interval.as_millis() as u64; + + // The math is equivalent to the modulo operation except we produce a result + // in the range of [1, interval] instead of [0, interval - 1]. + Duration::from_millis((remaining_time_ms.saturating_sub(1)).rem(interval_ms) + 1) +} + +impl std::fmt::Debug for FlashblockScheduler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_list() + .entries(self.send_times.iter().map(|t| { + let offset = *t - self.reference_instant; + let wall_time = self.reference_system + offset; + let duration = wall_time.duration_since(std::time::UNIX_EPOCH).unwrap(); + let total_secs = duration.as_secs(); + let micros = duration.subsec_micros(); + let secs = total_secs % 60; + let mins = (total_secs / 60) % 60; + let hours = (total_secs / 3600) % 24; + format!("{:02}:{:02}:{:02}.{:06}", hours, mins, secs, micros) + })) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct ComputeSendTimesTestCase { + first_flashblock_offset_ms: u64, + deadline_ms: u64, + expected_send_times_ms: Vec, + } + + fn check_compute_send_times( + test_case: ComputeSendTimesTestCase, + interval: Duration, + target_flashblocks: u64, + ) { + let send_times = compute_send_time_intervals( + Duration::from_millis(test_case.first_flashblock_offset_ms), + interval, + Duration::from_millis(test_case.deadline_ms), + target_flashblocks, + ); + let expected_send_times: Vec = test_case + .expected_send_times_ms + .iter() + .map(|ms| Duration::from_millis(*ms)) + .collect(); + assert_eq!( + send_times, expected_send_times, + "Failed for test case: first_flashblock_offset_ms: {}, interval: {:?}, deadline_ms: {}", + test_case.first_flashblock_offset_ms, interval, test_case.deadline_ms, + ); + } + + #[test] + fn test_compute_send_times() { + let test_cases = vec![ComputeSendTimesTestCase { + first_flashblock_offset_ms: 150, + deadline_ms: 880, + expected_send_times_ms: vec![150, 350, 550, 750, 880], + }]; + + for test_case in test_cases { + check_compute_send_times(test_case, Duration::from_millis(200), 5); + } + } + + #[test] + fn test_apply_offset() { + assert_eq!( + apply_offset(Duration::from_millis(100), 50), + Duration::from_millis(150) + ); + assert_eq!( + apply_offset(Duration::from_millis(100), -30), + Duration::from_millis(70) + ); + assert_eq!( + apply_offset(Duration::from_millis(100), 0), + Duration::from_millis(100) + ); + // Should not underflow - saturates at zero + assert_eq!( + apply_offset(Duration::from_millis(50), -100), + Duration::ZERO + ); + } + + #[test] + fn test_calculate_first_flashblock_offset() { + // remaining_time exactly divisible by interval so we get the full + // interval + assert_eq!( + calculate_first_flashblock_offset( + Duration::from_millis(400), + Duration::from_millis(200) + ), + Duration::from_millis(200) + ); + + // remaining_time with partial interval + assert_eq!( + calculate_first_flashblock_offset( + Duration::from_millis(350), + Duration::from_millis(200) + ), + Duration::from_millis(150) + ); + + // remaining_time less than interval + assert_eq!( + calculate_first_flashblock_offset( + Duration::from_millis(140), + Duration::from_millis(200) + ), + Duration::from_millis(140) + ); + + // remaining_time equals interval + assert_eq!( + calculate_first_flashblock_offset( + Duration::from_millis(200), + Duration::from_millis(200) + ), + Duration::from_millis(200) + ); + } + + fn durations_ms(ms_values: &[u64]) -> Vec { + ms_values + .iter() + .map(|&ms| Duration::from_millis(ms)) + .collect() + } + + struct SchedulerIntervalsTestCase { + name: &'static str, + interval_ms: u64, + send_offset_ms: i64, + end_buffer_ms: u64, + remaining_time_ms: u64, + target_flashblocks: u64, + expected_intervals_ms: Vec, + } + + fn check_scheduler_intervals(test_case: SchedulerIntervalsTestCase) { + let intervals = compute_scheduler_intervals( + Duration::from_millis(test_case.interval_ms), + test_case.send_offset_ms, + test_case.end_buffer_ms, + Duration::from_millis(test_case.remaining_time_ms), + test_case.target_flashblocks, + ); + assert_eq!( + intervals, + durations_ms(&test_case.expected_intervals_ms), + "Failed test case '{}': interval={}ms, offset={}ms, buffer={}ms, remaining={}ms", + test_case.name, + test_case.interval_ms, + test_case.send_offset_ms, + test_case.end_buffer_ms, + test_case.remaining_time_ms, + ); + } + + #[test] + fn test_compute_scheduler_intervals() { + let test_cases = vec![ + // Basic cases + SchedulerIntervalsTestCase { + name: "normal timing", + interval_ms: 200, + send_offset_ms: 0, + end_buffer_ms: 0, + remaining_time_ms: 880, + target_flashblocks: 5, + expected_intervals_ms: vec![80, 280, 480, 680, 880], + }, + SchedulerIntervalsTestCase { + name: "with offset and buffer", + interval_ms: 200, + send_offset_ms: -20, + end_buffer_ms: 50, + remaining_time_ms: 800, + target_flashblocks: 5, + expected_intervals_ms: vec![180, 380, 580, 730], + }, + SchedulerIntervalsTestCase { + name: "late FCU (300ms remaining)", + interval_ms: 200, + send_offset_ms: 0, + end_buffer_ms: 0, + remaining_time_ms: 300, + target_flashblocks: 5, + expected_intervals_ms: vec![100, 300], + }, + SchedulerIntervalsTestCase { + name: "end buffer equals remaining time", + interval_ms: 200, + send_offset_ms: 0, + end_buffer_ms: 200, + remaining_time_ms: 200, + target_flashblocks: 5, + expected_intervals_ms: vec![0], + }, + SchedulerIntervalsTestCase { + name: "late FCU with offset and buffer combined", + interval_ms: 200, + send_offset_ms: -30, + end_buffer_ms: 50, + remaining_time_ms: 400, + target_flashblocks: 5, + expected_intervals_ms: vec![170, 320], + }, + SchedulerIntervalsTestCase { + name: "no end buffer", + interval_ms: 200, + send_offset_ms: 0, + end_buffer_ms: 0, + remaining_time_ms: 1000, + target_flashblocks: 5, + expected_intervals_ms: vec![200, 400, 600, 800, 1000], + }, + ]; + + for test_case in test_cases { + check_scheduler_intervals(test_case); + } + } + + struct RemainingTimeTestCase { + name: &'static str, + block_time_ms: u64, + reference_secs: u64, + payload_timestamp: u64, + expected_remaining_ms: u64, + } + + fn check_remaining_time(test_case: RemainingTimeTestCase) { + let block_time = Duration::from_millis(test_case.block_time_ms); + let reference_system = + std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(test_case.reference_secs); + + let remaining = + compute_remaining_time(block_time, test_case.payload_timestamp, reference_system); + + assert_eq!( + remaining, + Duration::from_millis(test_case.expected_remaining_ms), + "Failed test case '{}': block_time={}ms, reference={}s, timestamp={}", + test_case.name, + test_case.block_time_ms, + test_case.reference_secs, + test_case.payload_timestamp, + ); + } + + #[test] + fn test_compute_remaining_time() { + let test_cases = vec![ + RemainingTimeTestCase { + name: "future timestamp within block time", + block_time_ms: 2000, + reference_secs: 1000, + payload_timestamp: 1002, + expected_remaining_ms: 2000, + }, + RemainingTimeTestCase { + name: "remaining exceeds block time (capped)", + block_time_ms: 1000, + reference_secs: 1000, + payload_timestamp: 1005, + expected_remaining_ms: 1000, + }, + RemainingTimeTestCase { + name: "past timestamp (fallback to block time)", + block_time_ms: 1000, + reference_secs: 1000, + payload_timestamp: 999, + expected_remaining_ms: 1000, + }, + ]; + + for test_case in test_cases { + check_remaining_time(test_case); + } + } +} diff --git a/crates/op-rbuilder/src/tests/data_availability.rs b/crates/op-rbuilder/src/tests/data_availability.rs index 9621f3986..28f47b5af 100644 --- a/crates/op-rbuilder/src/tests/data_availability.rs +++ b/crates/op-rbuilder/src/tests/data_availability.rs @@ -1,4 +1,7 @@ -use crate::tests::{BlockTransactionsExt, ChainDriverExt, LocalInstance, TransactionBuilderExt}; +use crate::{ + args::{FlashblocksArgs, OpRbuilderArgs}, + tests::{BlockTransactionsExt, ChainDriverExt, LocalInstance, TransactionBuilderExt}, +}; use alloy_provider::Provider; use macros::{if_flashblocks, if_standard, rb_test}; @@ -123,7 +126,15 @@ async fn block_fill(rbuilder: LocalInstance) -> eyre::Result<()> { /// to the DA footprint limit. The DA footprint is calculated as: /// total_da_bytes_used * da_footprint_gas_scalar (stored in blob_gas_used). /// This must not exceed the block gas limit, accounting for the builder transaction. -#[rb_test] +#[rb_test(args = OpRbuilderArgs { + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 0, + flashblocks_end_buffer_ms: 50, + ..Default::default() + }, + ..Default::default() +})] async fn da_footprint_fills_to_limit(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; diff --git a/crates/op-rbuilder/src/tests/flashblocks.rs b/crates/op-rbuilder/src/tests/flashblocks.rs index 2947c788f..684059e92 100644 --- a/crates/op-rbuilder/src/tests/flashblocks.rs +++ b/crates/op-rbuilder/src/tests/flashblocks.rs @@ -9,262 +9,11 @@ use std::time::Duration; use crate::{ args::{FlashblocksArgs, OpRbuilderArgs}, tests::{ - BlockTransactionsExt, BuilderTxValidation, BundleOpts, ChainDriver, - FLASHBLOCKS_NUMBER_ADDRESS, LocalInstance, TransactionBuilderExt, - flashblocks_number_contract::FlashblocksNumber, + BlockTransactionsExt, BundleOpts, ChainDriver, FLASHBLOCKS_NUMBER_ADDRESS, LocalInstance, + TransactionBuilderExt, flashblocks_number_contract::FlashblocksNumber, }, }; -#[rb_test(flashblocks, args = OpRbuilderArgs { - chain_block_time: 2000, - flashblocks: FlashblocksArgs { - enabled: true, - flashblocks_port: 1239, - flashblocks_addr: "127.0.0.1".into(), - flashblocks_block_time: 200, - flashblocks_leeway_time: 100, - flashblocks_fixed: false, - ..Default::default() - }, - ..Default::default() -})] -async fn smoke_dynamic_base(rbuilder: LocalInstance) -> eyre::Result<()> { - let driver = rbuilder.driver().await?; - let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); - - // We align out block timestamps with current unix timestamp - for _ in 0..10 { - for _ in 0..5 { - // send a valid transaction - let _ = driver - .create_transaction() - .random_valid_transfer() - .send() - .await?; - } - let block = driver.build_new_block_with_current_timestamp(None).await?; - assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn - - // Validate builder transactions using BuilderTxValidation - block.assert_builder_tx_count(2); - - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - - let flashblocks = flashblocks_listener.get_flashblocks(); - assert_eq!(110, flashblocks.len()); - - flashblocks_listener.stop().await -} - -#[rb_test(flashblocks, args = OpRbuilderArgs { - chain_block_time: 1000, - flashblocks: FlashblocksArgs { - enabled: true, - flashblocks_port: 1239, - flashblocks_addr: "127.0.0.1".into(), - flashblocks_block_time: 200, - flashblocks_leeway_time: 100, - flashblocks_fixed: false, - ..Default::default() - }, - ..Default::default() -})] -async fn smoke_dynamic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { - let driver = rbuilder.driver().await?; - let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); - - // We align out block timestamps with current unix timestamp - for _ in 0..10 { - for _ in 0..5 { - // send a valid transaction - let _ = driver - .create_transaction() - .random_valid_transfer() - .send() - .await?; - } - let block = driver.build_new_block_with_current_timestamp(None).await?; - assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn - - // Validate builder transactions using BuilderTxValidation - block.assert_builder_tx_count(2); - - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - - let flashblocks = flashblocks_listener.get_flashblocks(); - assert_eq!(60, flashblocks.len()); - - flashblocks_listener.stop().await -} - -#[rb_test(flashblocks, args = OpRbuilderArgs { - chain_block_time: 1000, - flashblocks: FlashblocksArgs { - enabled: true, - flashblocks_port: 1239, - flashblocks_addr: "127.0.0.1".into(), - flashblocks_block_time: 200, - flashblocks_leeway_time: 50, - flashblocks_fixed: true, - ..Default::default() - }, - ..Default::default() -})] -async fn smoke_classic_unichain(rbuilder: LocalInstance) -> eyre::Result<()> { - let driver = rbuilder.driver().await?; - let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); - - // We align out block timestamps with current unix timestamp - for _ in 0..10 { - for _ in 0..5 { - // send a valid transaction - let _ = driver - .create_transaction() - .random_valid_transfer() - .send() - .await?; - } - let block = driver.build_new_block().await?; - assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn - - // Validate builder transactions using BuilderTxValidation - block.assert_builder_tx_count(2); - - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - - let flashblocks = flashblocks_listener.get_flashblocks(); - assert_eq!(60, flashblocks.len()); - - flashblocks_listener.stop().await -} - -#[rb_test(flashblocks, args = OpRbuilderArgs { - chain_block_time: 2000, - flashblocks: FlashblocksArgs { - enabled: true, - flashblocks_port: 1239, - flashblocks_addr: "127.0.0.1".into(), - flashblocks_block_time: 200, - flashblocks_leeway_time: 50, - flashblocks_fixed: true, - ..Default::default() - }, - ..Default::default() -})] -async fn smoke_classic_base(rbuilder: LocalInstance) -> eyre::Result<()> { - let driver = rbuilder.driver().await?; - let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); - - // We align out block timestamps with current unix timestamp - for _ in 0..10 { - for _ in 0..5 { - // send a valid transaction - let _ = driver - .create_transaction() - .random_valid_transfer() - .send() - .await?; - } - let block = driver.build_new_block().await?; - assert_eq!(block.transactions.len(), 8, "Got: {:?}", block.transactions); // 5 normal txn + deposit + 2 builder txn - - // Validate builder transactions using BuilderTxValidation - block.assert_builder_tx_count(2); - - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - - let flashblocks = flashblocks_listener.get_flashblocks(); - assert_eq!(110, flashblocks.len()); - - flashblocks_listener.stop().await -} - -#[rb_test(flashblocks, args = OpRbuilderArgs { - chain_block_time: 1000, - flashblocks: FlashblocksArgs { - enabled: true, - flashblocks_port: 1239, - flashblocks_addr: "127.0.0.1".into(), - flashblocks_block_time: 200, - flashblocks_leeway_time: 100, - flashblocks_fixed: false, - ..Default::default() - }, - ..Default::default() -})] -async fn unichain_dynamic_with_lag(rbuilder: LocalInstance) -> eyre::Result<()> { - let driver = rbuilder.driver().await?; - let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); - - // We align out block timestamps with current unix timestamp - for i in 0..9 { - for _ in 0..5 { - // send a valid transaction - let _ = driver - .create_transaction() - .random_valid_transfer() - .send() - .await?; - } - let block = driver - .build_new_block_with_current_timestamp(Some(Duration::from_millis(i * 100))) - .await?; - assert_eq!( - block.transactions.len(), - 8, - "Got: {:#?}", - block.transactions - ); // 5 normal txn + deposit + 2 builder txn - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - - let flashblocks = flashblocks_listener.get_flashblocks(); - assert_eq!(34, flashblocks.len()); - - flashblocks_listener.stop().await -} - -#[rb_test(flashblocks, args = OpRbuilderArgs { - chain_block_time: 1000, - flashblocks: FlashblocksArgs { - enabled: true, - flashblocks_port: 1239, - flashblocks_addr: "127.0.0.1".into(), - flashblocks_block_time: 200, - flashblocks_leeway_time: 0, - flashblocks_fixed: false, - ..Default::default() - }, - ..Default::default() -})] -async fn dynamic_with_full_block_lag(rbuilder: LocalInstance) -> eyre::Result<()> { - let driver = rbuilder.driver().await?; - let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); - - for _ in 0..5 { - // send a valid transaction - let _ = driver - .create_transaction() - .random_valid_transfer() - .send() - .await?; - } - let block = driver - .build_new_block_with_current_timestamp(Some(Duration::from_millis(999))) - .await?; - // We could only produce block with deposits + builder tx because of short time frame - assert_eq!(block.transactions.len(), 2); - - let flashblocks = flashblocks_listener.get_flashblocks(); - assert_eq!(1, flashblocks.len()); - - flashblocks_listener.stop().await -} - #[rb_test(flashblocks, args = OpRbuilderArgs { chain_block_time: 1000, enable_revert_protection: true, @@ -273,8 +22,6 @@ async fn dynamic_with_full_block_lag(rbuilder: LocalInstance) -> eyre::Result<() flashblocks_port: 1239, flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, - flashblocks_leeway_time: 100, - flashblocks_fixed: false, ..Default::default() }, ..Default::default() @@ -333,8 +80,6 @@ async fn test_flashblock_min_filtering(rbuilder: LocalInstance) -> eyre::Result< flashblocks_port: 1239, flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, - flashblocks_leeway_time: 100, - flashblocks_fixed: false, ..Default::default() }, ..Default::default() @@ -389,8 +134,6 @@ async fn test_flashblock_max_filtering(rbuilder: LocalInstance) -> eyre::Result< flashblocks_port: 1239, flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, - flashblocks_leeway_time: 100, - flashblocks_fixed: false, ..Default::default() }, ..Default::default() @@ -434,8 +177,6 @@ async fn test_flashblock_min_max_filtering(rbuilder: LocalInstance) -> eyre::Res flashblocks_port: 1239, flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, - flashblocks_leeway_time: 100, - flashblocks_fixed: false, flashblocks_disable_state_root: true, ..Default::default() }, @@ -678,9 +419,7 @@ fn verify_user_tx_hashes( } } -/// Test that build_at_interval_end causes flashblocks to be built after the interval -/// instead of immediately. With this flag enabled, the first flashblock should be -/// delayed by first_flashblock_offset. +/// Smoke test for flashblocks with end buffer. #[rb_test(flashblocks, args = OpRbuilderArgs { chain_block_time: 1000, flashblocks: FlashblocksArgs { @@ -689,13 +428,11 @@ fn verify_user_tx_hashes( flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 250, flashblocks_end_buffer_ms: 50, - flashblocks_fixed: false, - flashblocks_build_at_interval_end: true, ..Default::default() }, ..Default::default() })] -async fn build_at_interval_end_basic(rbuilder: LocalInstance) -> eyre::Result<()> { +async fn smoke_basic(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); @@ -713,19 +450,18 @@ async fn build_at_interval_end_basic(rbuilder: LocalInstance) -> eyre::Result<() } let flashblocks = flashblocks_listener.get_flashblocks(); - // With build_at_interval_end, the deadline_sleep controls when to stop - // Expected: ~4 flashblocks per block with end_buffer_ms applied at the end + // Expected: ~5 flashblocks per block (1000ms / 250ms interval, with end_buffer_ms applied) assert_eq!( 25, flashblocks.len(), - "Expected 15 flashblocks, got {:#?}", + "Expected 25 flashblocks, got {:#?}", flashblocks.len() ); flashblocks_listener.stop().await } -/// Test build_at_interval_end with send_offset_ms combined +/// Smoke test with send_offset_ms #[rb_test(flashblocks, args = OpRbuilderArgs { chain_block_time: 1000, flashblocks: FlashblocksArgs { @@ -734,14 +470,12 @@ async fn build_at_interval_end_basic(rbuilder: LocalInstance) -> eyre::Result<() flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 250, flashblocks_end_buffer_ms: 50, - flashblocks_fixed: false, - flashblocks_build_at_interval_end: true, flashblocks_send_offset_ms: -25, // Send 25ms earlier ..Default::default() }, ..Default::default() })] -async fn build_at_interval_end_with_offset(rbuilder: LocalInstance) -> eyre::Result<()> { +async fn smoke_with_offset(rbuilder: LocalInstance) -> eyre::Result<()> { let driver: ChainDriver = rbuilder.driver().await?; let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); @@ -759,64 +493,18 @@ async fn build_at_interval_end_with_offset(rbuilder: LocalInstance) -> eyre::Res } let flashblocks = flashblocks_listener.get_flashblocks(); - // Combined flags should still produce reasonable flashblock count + // Offset should still produce expected flashblock count assert_eq!( 25, flashblocks.len(), - "Expected 15 flashblocks, got {:#?}", - flashblocks.len() - ); - - flashblocks_listener.stop().await -} - -/// Test fixed mode with build_at_interval_end and end_buffer_ms -/// This verifies that the deadline is correctly calculated with the buffer -#[rb_test(flashblocks, args = OpRbuilderArgs { - chain_block_time: 1000, - flashblocks: FlashblocksArgs { - enabled: true, - flashblocks_port: 1239, - flashblocks_addr: "127.0.0.1".into(), - flashblocks_block_time: 250, - flashblocks_leeway_time: 0, - flashblocks_fixed: true, - flashblocks_build_at_interval_end: true, - flashblocks_end_buffer_ms: 100, // Stop building 100ms early - ..Default::default() - }, - ..Default::default() -})] -async fn fixed_mode_with_end_buffer(rbuilder: LocalInstance) -> eyre::Result<()> { - let driver = rbuilder.driver().await?; - let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); - - for _ in 0..3 { - for _ in 0..2 { - let _ = driver - .create_transaction() - .random_valid_transfer() - .send() - .await?; - } - let block = driver.build_new_block_with_current_timestamp(None).await?; - assert!(block.transactions.len() >= 4); // deposit + builder tx + user txs - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - - let flashblocks = flashblocks_listener.get_flashblocks(); - // With end_buffer_ms=100 and interval=250, we should get 4 + 1 (fallback) flashblocks per block - assert_eq!( - 15, - flashblocks.len(), - "Expected 15 flashblocks with end_buffer, got {:#?}", + "Expected 25 flashblocks, got {:#?}", flashblocks.len() ); flashblocks_listener.stop().await } -/// Test dynamic mode with significant FCU delay (700ms into 1000ms block) +/// Test significant FCU delay (700ms into 1000ms block) /// Should produce fewer flashblocks due to less remaining time #[rb_test(flashblocks, args = OpRbuilderArgs { chain_block_time: 1000, @@ -826,12 +514,11 @@ async fn fixed_mode_with_end_buffer(rbuilder: LocalInstance) -> eyre::Result<()> flashblocks_addr: "127.0.0.1".into(), flashblocks_block_time: 200, flashblocks_end_buffer_ms: 50, - flashblocks_build_at_interval_end: true, ..Default::default() }, ..Default::default() })] -async fn dynamic_mode_late_fcu_reduces_flashblocks(rbuilder: LocalInstance) -> eyre::Result<()> { +async fn late_fcu_reduces_flashblocks(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); @@ -863,50 +550,81 @@ async fn dynamic_mode_late_fcu_reduces_flashblocks(rbuilder: LocalInstance) -> e flashblocks_listener.stop().await } -/// Test with 0 end_buffer, build_at_interval_end, and negative offset -/// Ensures all flashblocks are built when negative offset gives margin before deadline +/// Test progressive FCU delays across multiple blocks. +/// With 1000ms block time, 200ms flashblock interval, and 50ms end buffer: +/// - Available time = 1000 - lag - 50 = 950 - lag +/// - Flashblocks per block = ceil((available_time) / 200) + 1 (base flashblock) #[rb_test(flashblocks, args = OpRbuilderArgs { - chain_block_time: 1000, - flashblocks: FlashblocksArgs { - enabled: true, - flashblocks_port: 1239, - flashblocks_addr: "127.0.0.1".into(), - flashblocks_block_time: 250, - flashblocks_fixed: true, - flashblocks_build_at_interval_end: true, - flashblocks_send_offset_ms: -50, // Send 50ms earlier to ensure all FBs complete before deadline - ..Default::default() - }, - ..Default::default() -})] -async fn build_at_interval_end_negative_offset(rbuilder: LocalInstance) -> eyre::Result<()> { + chain_block_time: 1000, + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 1239, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_end_buffer_ms: 50, + ..Default::default() + }, + ..Default::default() + })] +async fn progressive_lag_reduces_flashblocks(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); - for _ in 0..3 { - for _ in 0..2 { + // Test 9 blocks with increasing FCU delays (0ms, 100ms, ..., 800ms) + for i in 0..9 { + for _ in 0..5 { let _ = driver .create_transaction() .random_valid_transfer() .send() .await?; } - let block = driver.build_new_block_with_current_timestamp(None).await?; - assert!(block.transactions.len() >= 4); // deposit + builder tx + user txs + let block = driver + .build_new_block_with_current_timestamp(Some(Duration::from_millis(i * 100))) + .await?; + assert_eq!( + block.transactions.len(), + 8, + "Got: {:#?}", + block.transactions + ); // 5 normal txn + deposit + 2 builder txn + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } let flashblocks = flashblocks_listener.get_flashblocks(); - // With 1000ms block time, 250ms interval, -50ms offset: - // First FB at 200ms, then 450, 700, 950ms. Deadline at 950ms. - // All 4 flashblocks + 1 fallback = 5 per block, 3 blocks = 15 total - assert_eq!( - 15, - flashblocks.len(), - "Expected 15 flashblocks (5 per block * 3 blocks), got {}", - flashblocks.len() - ); + // Count flashblocks for each block + // Expected flashblocks per block based on lag: + // lag=0ms: ceil((1000-50)/200) + 1 = 6 + // lag=100ms: ceil((900-50)/200) + 1 = 6 + // lag=200ms: ceil((800-50)/200) + 1 = 5 + // lag=300ms: ceil((700-50)/200) + 1 = 5 + // lag=400ms: ceil((600-50)/200) + 1 = 4 + // lag=500ms: ceil((500-50)/200) + 1 = 4 + // lag=600ms: ceil((400-50)/200) + 1 = 3 + // lag=700ms: ceil((300-50)/200) + 1 = 3 + // lag=800ms: ceil((200-50)/200) + 1 = 2 + let expected_flashblocks_per_block = [6, 6, 5, 5, 4, 4, 3, 3, 2]; + for i in 0..9 { + let block_number = i + 1; // Block numbers start from 1 + let flashblocks_for_block = flashblocks + .iter() + .filter(|fb| fb.block_number() == block_number) + .count(); + assert_eq!( + flashblocks_for_block, + expected_flashblocks_per_block[i as usize], + "Block {} (lag {}ms): expected {} flashblocks, got {}", + i, + i * 100, + expected_flashblocks_per_block[i as usize], + flashblocks_for_block + ); + } + + // Total: 6+6+5+5+4+4+3+3+2 = 38 + assert_eq!(38, flashblocks.len()); flashblocks_listener.stop().await } diff --git a/crates/op-rbuilder/src/tests/forks.rs b/crates/op-rbuilder/src/tests/forks.rs index d136c375f..99e67cceb 100644 --- a/crates/op-rbuilder/src/tests/forks.rs +++ b/crates/op-rbuilder/src/tests/forks.rs @@ -1,10 +1,21 @@ -use crate::tests::{BlockTransactionsExt, LocalInstance}; +use crate::{ + args::{FlashblocksArgs, OpRbuilderArgs}, + tests::{BlockTransactionsExt, LocalInstance}, +}; use alloy_eips::{BlockNumberOrTag::Latest, Encodable2718, eip1559::MIN_PROTOCOL_BASE_FEE}; use alloy_primitives::bytes; use macros::{if_flashblocks, if_standard, rb_test}; use std::time::Duration; -#[rb_test] +#[rb_test(args = OpRbuilderArgs { + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 0, + flashblocks_end_buffer_ms: 50, + ..Default::default() + }, + ..Default::default() +})] async fn jovian_block_parameters_set(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; let tx_one = driver.create_transaction().send().await?; diff --git a/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs b/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs index daa88c7e7..2ded7bf3b 100644 --- a/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs +++ b/crates/op-rbuilder/src/tests/framework/macros/src/lib.rs @@ -29,6 +29,7 @@ const BUILDER_VARIANTS: &[VariantInfo] = &[ let mut args = #args; args.flashblocks.enabled = true; args.flashblocks.flashblocks_port = crate::tests::get_available_port(); + args.flashblocks.flashblocks_end_buffer_ms = 75; args } } @@ -39,6 +40,7 @@ const BUILDER_VARIANTS: &[VariantInfo] = &[ let mut args = crate::args::OpRbuilderArgs::default(); args.flashblocks.enabled = true; args.flashblocks.flashblocks_port = crate::tests::get_available_port(); + args.flashblocks.flashblocks_end_buffer_ms = 75; args } } diff --git a/crates/op-rbuilder/src/tests/miner_gas_limit.rs b/crates/op-rbuilder/src/tests/miner_gas_limit.rs index 619509db4..98ba2c992 100644 --- a/crates/op-rbuilder/src/tests/miner_gas_limit.rs +++ b/crates/op-rbuilder/src/tests/miner_gas_limit.rs @@ -1,4 +1,7 @@ -use crate::tests::{BlockTransactionsExt, LocalInstance}; +use crate::{ + args::{FlashblocksArgs, OpRbuilderArgs}, + tests::{BlockTransactionsExt, LocalInstance}, +}; use alloy_provider::Provider; use macros::{if_flashblocks, if_standard, rb_test}; @@ -32,7 +35,15 @@ async fn miner_gas_limit(rbuilder: LocalInstance) -> eyre::Result<()> { /// /// Standard = (785,000 - 182,706 - 21,600) / 53,000 = 10.95 = 10 transactions can fit /// Flashblocks = (785,000 - 182,706 - 21,600 - 21,600) / 53,000 = 10.54 = 10 transactions can fit -#[rb_test] +#[rb_test(args = OpRbuilderArgs { + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 0, + flashblocks_end_buffer_ms: 50, + ..Default::default() + }, + ..Default::default() +})] async fn block_fill(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; diff --git a/crates/op-rbuilder/src/tests/smoke.rs b/crates/op-rbuilder/src/tests/smoke.rs index c379b51e8..6f70bc9f6 100644 --- a/crates/op-rbuilder/src/tests/smoke.rs +++ b/crates/op-rbuilder/src/tests/smoke.rs @@ -1,5 +1,5 @@ use crate::{ - args::OpRbuilderArgs, + args::{FlashblocksArgs, OpRbuilderArgs}, tests::{BuilderTxValidation, LocalInstance, TransactionBuilderExt}, }; use alloy_primitives::TxHash; @@ -18,7 +18,15 @@ use tracing::info; /// /// Generated blocks are also validated against an external op-reth node to /// ensure their correctness. -#[rb_test] +#[rb_test(args = OpRbuilderArgs { + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 0, + flashblocks_end_buffer_ms: 50, + ..Default::default() + }, + ..Default::default() +})] async fn chain_produces_blocks(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; @@ -334,7 +342,15 @@ async fn chain_produces_big_tx_without_gas_limit(rbuilder: LocalInstance) -> eyr /// Validates that each block contains builder transactions using the /// BuilderTxValidation utility. -#[rb_test] +#[rb_test(args = OpRbuilderArgs { + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 0, + flashblocks_end_buffer_ms: 50, + ..Default::default() + }, + ..Default::default() +})] async fn block_includes_builder_transaction(rbuilder: LocalInstance) -> eyre::Result<()> { let driver = rbuilder.driver().await?; diff --git a/docs/flashblocks.md b/docs/flashblocks.md index af0980d86..7a8717316 100644 --- a/docs/flashblocks.md +++ b/docs/flashblocks.md @@ -38,14 +38,24 @@ function build_payload(build_arguments, best_payload_cell): record_metrics_and_return() // === FLASHBLOCKS TIMING CALCULATION === - // 7. Calculate dynamic flashblock timing and number of flashblocks we are able to produce based on the formula - // first_offset = (block timestamp - current timestamp) % flashblock_time - // flashblocks_count = (block timestamp - current timestamp) // flashblock_time - // Ex: If we started building flashblocks at 400ms (because of network latency) that means we have only - // 600ms to build the block (assuming block time is 1s) - // If we have 250ms flashblock time, we could build only 3 flashblocks, the first one must be sent after 100ms and others - // after 250ms each. - (flashblocks_count, first_offset) = calculate_flashblocks_timing(timestamp) + // 7. Calculate flashblock timing based on remaining time until payload deadline. + // The scheduler computes: + // - remaining_time = min(payload_timestamp - now, block_time) + // - first_offset = ((remaining_time - 1) % flashblock_interval) + 1 + // - Subsequent flashblocks are sent at first_offset + N * flashblock_interval + // - The deadline (last flashblock) = remaining_time - end_buffer_ms + // + // The send_offset_ms parameter shifts all send times (positive = late, negative = early). + // The end_buffer_ms parameter shifts the last sent time. + // The number of triggers is clamped to target_flashblocks (block_time / flashblock_interval). + // + // Ex: If FCU arrives 400ms into a 1000ms slot: + // - remaining_time = 600ms, flashblock_interval = 200ms + // - first_offset = (600-1) % 200 + 1 = 200ms + // - With end_buffer_ms=30, deadline = 570ms + // - Triggers at: 200ms, 400ms, 570ms (3 flashblocks) + scheduler = FlashblockScheduler::new(config, block_time, payload_timestamp) + tokio::spawn(scheduler.run(..)); // 8. Calculate resource limits per flashblock gas_per_flashblock = total_gas_limit / flashblocks_count @@ -122,12 +132,25 @@ function build_payload(build_arguments, best_payload_cell): ``` ### Timing Coordination -- **Timer Task**: Spawned async task that sends timing signals at regular intervals -- **First Offset**: Timeout after which we must send the flashblock. We calculate it so we send flashblocks at `current time % flashblock_time == 0` -- **Cancellation Tokens**: When we cancel the flashblock token we stop this flashblock building process and publish it. + +The `FlashblockScheduler` handles timing coordination for flashblock production: + +- **Scheduler**: Pre-computes all send times at initialization based on remaining time until the payload deadline +- **First Offset**: Aligned to `(remaining_time - 1) % interval + 1` to produce evenly spaced flashblocks +- **Trigger Clamping**: The number of triggers is clamped to `block_time / flashblock_interval` to maintain backwards compatibility +- **Cancellation Tokens**: When a flashblock trigger fires, the current flashblock building is cancelled and published + +### Configuration Parameters + +| Parameter | Description | Default | +|-----------|-------------|---------| +| `flashblocks.block-time` | Flashblock interval in milliseconds | 250 | +| `flashblocks.send-offset-ms` | Shifts all send times. Positive = late, negative = early | 0 | +| `flashblocks.end-buffer-ms` | Time reserved at end of slot for final processing | 0 | ### Caveats -If the system clock drifts too much we will print an error message and fallback to producing flashblocks with regular flashblock_time intervals, without adjusting anything. +- If the payload timestamp is in the past or remaining time is 0, the scheduler falls back to using the full block_time +- Late FCU arrivals result in fewer flashblocks being produced (proportional to remaining time) ## Block building flow These are sequence diagrams for flashblock building flow, rollup-boost, op-node and fallback sequencer interaction.