diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index b92e35c7..7f603e93 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -460,7 +460,12 @@ where schedule = ?flashblock_scheduler, "Computed flashblock timing schedule" ); - let target_flashblocks = flashblock_scheduler.target_flashblocks(); + + // Get target number of flashblocks to build. If no flashblocks are scheduled, return early. + let Some(target_flashblocks) = flashblock_scheduler.target_flashblocks() else { + self.record_flashblocks_metrics(&ctx, &info, 0, &span); + return Ok(()); + }; let expected_flashblocks = self.config.flashblocks_per_block(); if target_flashblocks < expected_flashblocks { @@ -557,6 +562,11 @@ where }; let _entered = fb_span.enter(); + if ctx.flashblock_index() > ctx.target_flashblock_count() { + self.record_flashblocks_metrics(&ctx, &info, target_flashblocks, &span); + return Ok(()); + } + // Build flashblock after receiving signal let next_flashblocks_ctx = match self.build_next_flashblock( &ctx, diff --git a/crates/op-rbuilder/src/builders/flashblocks/timing.rs b/crates/op-rbuilder/src/builders/flashblocks/timing.rs index bf819a84..33f7a4da 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/timing.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/timing.rs @@ -16,7 +16,7 @@ pub(super) struct FlashblockScheduler { /// Monotonic instant when this scheduler was created. reference_instant: tokio::time::Instant, /// Absolute times at which to trigger flashblock builds. - send_times: Vec, + send_times: Option>, } impl FlashblockScheduler { @@ -29,27 +29,32 @@ impl FlashblockScheduler { let reference_system = std::time::SystemTime::now(); let reference_instant = tokio::time::Instant::now(); - let target_flashblocks = (block_time.as_millis() / config.interval.as_millis()) as u64; - // Calculate how much time remains until the payload deadline let remaining_time = compute_remaining_time(block_time, payload_timestamp, reference_system); - // Compute the schedule as relative durations from now - let intervals = compute_scheduler_intervals( - config.interval, - config.send_offset_ms, - config.end_buffer_ms, - remaining_time, - target_flashblocks, - ); + let send_times = remaining_time.map(|remaining_time| { + // Calculate the target flashblocks based on the remaining time + let target_flashblocks = remaining_time + .as_millis() + .div_ceil(config.interval.as_millis()) as u64; + + // Compute the schedule as relative durations from now + let intervals = compute_scheduler_intervals( + config.interval, + config.send_offset_ms, + config.end_buffer_ms, + remaining_time, + target_flashblocks, + ); - // Convert relative durations to absolute instants for - // tokio::time::sleep_until - let send_times = intervals - .into_iter() - .map(|d| reference_instant + d) - .collect(); + // Convert relative durations to absolute instants for + // tokio::time::sleep_until + intervals + .into_iter() + .map(|d| reference_instant + d) + .collect() + }); Self { reference_system, @@ -66,10 +71,23 @@ impl FlashblockScheduler { mut fb_cancel: CancellationToken, payload_id: PayloadId, ) { + let Some(send_times) = self.send_times else { + // No flashblocks to schedule, return early. + return; + }; + let start = tokio::time::Instant::now(); + // Send immediate signal to build first flashblock right away. + if tx.send(fb_cancel.clone()).is_err() { + error!( + target: "payload_builder", + "Did not trigger first flashblock build due to payload building error or block building being cancelled" + ); + return; + } - let target_flashblocks = self.send_times.len(); - for (i, send_time) in self.send_times.into_iter().enumerate() { + let target_flashblocks = send_times.len(); + for (i, send_time) in send_times.into_iter().enumerate() { tokio::select! { _ = tokio::time::sleep_until(send_time) => { // Cancel current flashblock building job @@ -116,44 +134,46 @@ impl FlashblockScheduler { } } - /// Returns the total number of flashblocks that will be triggered. - pub(super) fn target_flashblocks(&self) -> u64 { - self.send_times.len() as u64 + /// Returns the total number of flashblocks that will be triggered, or + /// `None` if no flashblocks are scheduled. + pub(super) fn target_flashblocks(&self) -> Option { + self.send_times + .as_ref() + .map(|send_times| send_times.len() as u64) + .filter(|&count| count > 0) } } /// Computes the remaining time until the payload deadline. Calculates remaining /// time as `payload_timestamp - now`. The result is capped at `block_time`. If -/// the timestamp is in the past (late FCU), sets remaining time to 0 to try to -/// emit one flashblock. +/// the timestamp is in the past (late FCU), returns `None`. fn compute_remaining_time( block_time: Duration, payload_timestamp: u64, reference_system: std::time::SystemTime, -) -> Duration { +) -> Option { let target_time = std::time::SystemTime::UNIX_EPOCH + Duration::from_secs(payload_timestamp); target_time .duration_since(reference_system) .ok() - .filter(|duration| duration.as_millis() > 0) + .filter(|d| !d.is_zero()) .map(|d| d.min(block_time)) - .unwrap_or_else(|| { + .or_else(|| { // If we're here then the payload timestamp is in the past. This // happens when the FCU is really late and it also means we're // expecting a getPayload call basically right away, so we don't // have any time to build. let delay_ms = reference_system .duration_since(target_time) - .map(|d| d.as_millis()) - .unwrap_or(0); + .map_or(0, |d| d.as_millis()); warn!( target: "payload_builder", payload_timestamp, delay_ms, "Late FCU: payload timestamp is in the past" ); - Duration::ZERO + None }) } @@ -167,45 +187,32 @@ fn compute_scheduler_intervals( target_flashblocks: u64, ) -> Vec { // Align flashblocks to remaining_time - let first_flashblock_offset = - calculate_first_flashblock_offset(remaining_time, flashblock_interval); - - let first_flashblock_offset = apply_offset(first_flashblock_offset, send_offset_ms); - let flashblocks_deadline = apply_offset( - remaining_time.saturating_sub(Duration::from_millis(end_buffer_ms)), - send_offset_ms, - ); - + let first_flashblock_timing = + calculate_first_flashblock_timing(remaining_time, flashblock_interval); compute_send_time_intervals( - first_flashblock_offset, + first_flashblock_timing, flashblock_interval, - flashblocks_deadline, + send_offset_ms, + remaining_time.saturating_sub(Duration::from_millis(end_buffer_ms)), target_flashblocks, ) } /// Generates the actual send time intervals given timing parameters. fn compute_send_time_intervals( - first_flashblock_offset: Duration, + first_flashblock_timing: Duration, interval: Duration, + send_offset_ms: i64, deadline: Duration, target_flashblocks: u64, ) -> Vec { - let mut send_times = vec![]; - - // Add triggers at first_flashblock_offset, then every interval until - // deadline - let mut next_time = first_flashblock_offset; - while next_time < deadline { - send_times.push(next_time); - next_time += interval; - } - send_times.push(deadline); + let mut send_times = Vec::with_capacity(target_flashblocks as usize); - // Clamp the number of triggers. Some of the calculation strategies end up - // with more triggers concentrated towards the start of the block and so - // this is needed to preserve backwards compatibility. - send_times.truncate(target_flashblocks as usize); + let mut timing = first_flashblock_timing; + for _ in 0..target_flashblocks { + send_times.push(apply_offset(timing.min(deadline), send_offset_ms).min(deadline)); + timing = timing.saturating_add(interval); + } send_times } @@ -223,7 +230,7 @@ fn apply_offset(duration: Duration, offset_ms: i64) -> Duration { } /// Calculates when the first flashblock should be triggered. -fn calculate_first_flashblock_offset(remaining_time: Duration, interval: Duration) -> Duration { +fn calculate_first_flashblock_timing(remaining_time: Duration, interval: Duration) -> Duration { let remaining_time_ms = remaining_time.as_millis() as u64; let interval_ms = interval.as_millis() as u64; @@ -235,7 +242,7 @@ fn calculate_first_flashblock_offset(remaining_time: Duration, interval: Duratio impl std::fmt::Debug for FlashblockScheduler { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_list() - .entries(self.send_times.iter().map(|t| { + .entries(self.send_times.iter().flatten().map(|t| { let offset = *t - self.reference_instant; let wall_time = self.reference_system + offset; let duration = wall_time.duration_since(std::time::UNIX_EPOCH).unwrap(); @@ -255,7 +262,7 @@ mod tests { use super::*; struct ComputeSendTimesTestCase { - first_flashblock_offset_ms: u64, + first_flashblock_timing_ms: u64, deadline_ms: u64, expected_send_times_ms: Vec, } @@ -263,11 +270,13 @@ mod tests { fn check_compute_send_times( test_case: ComputeSendTimesTestCase, interval: Duration, + send_offset_ms: i64, target_flashblocks: u64, ) { let send_times = compute_send_time_intervals( - Duration::from_millis(test_case.first_flashblock_offset_ms), + Duration::from_millis(test_case.first_flashblock_timing_ms), interval, + send_offset_ms, Duration::from_millis(test_case.deadline_ms), target_flashblocks, ); @@ -278,21 +287,47 @@ mod tests { .collect(); assert_eq!( send_times, expected_send_times, - "Failed for test case: first_flashblock_offset_ms: {}, interval: {:?}, deadline_ms: {}", - test_case.first_flashblock_offset_ms, interval, test_case.deadline_ms, + "Failed for test case: first_flashblock_timing_ms: {}, interval: {:?}, deadline_ms: {}", + test_case.first_flashblock_timing_ms, interval, test_case.deadline_ms, ); } #[test] fn test_compute_send_times() { let test_cases = vec![ComputeSendTimesTestCase { - first_flashblock_offset_ms: 150, + first_flashblock_timing_ms: 150, deadline_ms: 880, expected_send_times_ms: vec![150, 350, 550, 750, 880], }]; for test_case in test_cases { - check_compute_send_times(test_case, Duration::from_millis(200), 5); + check_compute_send_times(test_case, Duration::from_millis(200), 0, 5); + } + } + + #[test] + fn test_compute_send_times_with_negative_send_offset() { + let test_cases = vec![ComputeSendTimesTestCase { + first_flashblock_timing_ms: 150, + deadline_ms: 880, + expected_send_times_ms: vec![140, 340, 540, 740, 870], + }]; + + for test_case in test_cases { + check_compute_send_times(test_case, Duration::from_millis(200), -10, 5); + } + } + + #[test] + fn test_compute_send_times_with_positive_send_offset() { + let test_cases = vec![ComputeSendTimesTestCase { + first_flashblock_timing_ms: 150, + deadline_ms: 880, + expected_send_times_ms: vec![160, 360, 560, 760, 880], + }]; + + for test_case in test_cases { + check_compute_send_times(test_case, Duration::from_millis(200), 10, 5); } } @@ -318,11 +353,11 @@ mod tests { } #[test] - fn test_calculate_first_flashblock_offset() { + fn test_calculate_first_flashblock_timing() { // remaining_time exactly divisible by interval so we get the full // interval assert_eq!( - calculate_first_flashblock_offset( + calculate_first_flashblock_timing( Duration::from_millis(400), Duration::from_millis(200) ), @@ -331,7 +366,7 @@ mod tests { // remaining_time with partial interval assert_eq!( - calculate_first_flashblock_offset( + calculate_first_flashblock_timing( Duration::from_millis(350), Duration::from_millis(200) ), @@ -340,7 +375,7 @@ mod tests { // remaining_time less than interval assert_eq!( - calculate_first_flashblock_offset( + calculate_first_flashblock_timing( Duration::from_millis(140), Duration::from_millis(200) ), @@ -349,7 +384,7 @@ mod tests { // remaining_time equals interval assert_eq!( - calculate_first_flashblock_offset( + calculate_first_flashblock_timing( Duration::from_millis(200), Duration::from_millis(200) ), @@ -370,7 +405,6 @@ mod tests { send_offset_ms: i64, end_buffer_ms: u64, remaining_time_ms: u64, - target_flashblocks: u64, expected_intervals_ms: Vec, } @@ -380,7 +414,7 @@ mod tests { test_case.send_offset_ms, test_case.end_buffer_ms, Duration::from_millis(test_case.remaining_time_ms), - test_case.target_flashblocks, + test_case.remaining_time_ms.div_ceil(test_case.interval_ms), ); assert_eq!( intervals, @@ -404,7 +438,6 @@ mod tests { send_offset_ms: 0, end_buffer_ms: 0, remaining_time_ms: 880, - target_flashblocks: 5, expected_intervals_ms: vec![80, 280, 480, 680, 880], }, SchedulerIntervalsTestCase { @@ -413,7 +446,6 @@ mod tests { send_offset_ms: -20, end_buffer_ms: 50, remaining_time_ms: 800, - target_flashblocks: 5, expected_intervals_ms: vec![180, 380, 580, 730], }, SchedulerIntervalsTestCase { @@ -422,7 +454,6 @@ mod tests { send_offset_ms: 0, end_buffer_ms: 0, remaining_time_ms: 300, - target_flashblocks: 5, expected_intervals_ms: vec![100, 300], }, SchedulerIntervalsTestCase { @@ -431,7 +462,6 @@ mod tests { send_offset_ms: 0, end_buffer_ms: 200, remaining_time_ms: 200, - target_flashblocks: 5, expected_intervals_ms: vec![0], }, SchedulerIntervalsTestCase { @@ -440,7 +470,6 @@ mod tests { send_offset_ms: -30, end_buffer_ms: 50, remaining_time_ms: 400, - target_flashblocks: 5, expected_intervals_ms: vec![170, 320], }, SchedulerIntervalsTestCase { @@ -449,7 +478,6 @@ mod tests { send_offset_ms: 0, end_buffer_ms: 0, remaining_time_ms: 1000, - target_flashblocks: 5, expected_intervals_ms: vec![200, 400, 600, 800, 1000], }, ]; @@ -464,7 +492,7 @@ mod tests { block_time_ms: u64, reference_ms: u64, payload_timestamp: u64, - expected_remaining_ms: u64, + expected_remaining_ms: Option, } fn check_remaining_time(test_case: RemainingTimeTestCase) { @@ -477,7 +505,7 @@ mod tests { assert_eq!( remaining, - Duration::from_millis(test_case.expected_remaining_ms), + test_case.expected_remaining_ms, "Failed test case '{}': block_time={}ms, reference={}ms, timestamp={}", test_case.name, test_case.block_time_ms, @@ -494,28 +522,28 @@ mod tests { block_time_ms: 2000, reference_ms: 1_000_000, payload_timestamp: 1002, - expected_remaining_ms: 2000, + expected_remaining_ms: Some(Duration::from_millis(2000)), }, RemainingTimeTestCase { name: "remaining exceeds block time (capped)", block_time_ms: 1000, reference_ms: 1_000_000, payload_timestamp: 1005, - expected_remaining_ms: 1000, + expected_remaining_ms: Some(Duration::from_millis(1000)), }, RemainingTimeTestCase { name: "late FCU (844ms past timestamp)", block_time_ms: 1000, reference_ms: 1_000_844, // 1000.844 seconds payload_timestamp: 1000, - expected_remaining_ms: 0, + expected_remaining_ms: None, }, RemainingTimeTestCase { name: "late FCU (1ms past timestamp)", block_time_ms: 1000, reference_ms: 1_000_001, // 1000.001 seconds payload_timestamp: 1000, - expected_remaining_ms: 0, + expected_remaining_ms: None, }, ]; diff --git a/docs/flashblocks.md b/docs/flashblocks.md index 7a871731..e42ba207 100644 --- a/docs/flashblocks.md +++ b/docs/flashblocks.md @@ -136,8 +136,8 @@ function build_payload(build_arguments, best_payload_cell): The `FlashblockScheduler` handles timing coordination for flashblock production: - **Scheduler**: Pre-computes all send times at initialization based on remaining time until the payload deadline -- **First Offset**: Aligned to `(remaining_time - 1) % interval + 1` to produce evenly spaced flashblocks -- **Trigger Clamping**: The number of triggers is clamped to `block_time / flashblock_interval` to maintain backwards compatibility +- **Offset**: Applied to interval timings to ensure evenly spaced flashblocks +- **Trigger Clamping**: The number of triggers is set from the target_flashblocks count, which is computed based on remaining time and the configured flashblocks interval - **Cancellation Tokens**: When a flashblock trigger fires, the current flashblock building is cancelled and published ### Configuration Parameters @@ -149,7 +149,7 @@ The `FlashblockScheduler` handles timing coordination for flashblock production: | `flashblocks.end-buffer-ms` | Time reserved at end of slot for final processing | 0 | ### Caveats -- If the payload timestamp is in the past or remaining time is 0, the scheduler falls back to using the full block_time +- If the payload timestamp is in the past or remaining time is 0, the scheduler will not trigger any flashblocks. - Late FCU arrivals result in fewer flashblocks being produced (proportional to remaining time) ## Block building flow