From 88ed06dc20f579b3c410839c04e7b726dc17b602 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 03:41:04 +0000 Subject: [PATCH 01/10] feat(storage): defer delete_segment operations until commit Make delete_segment and delete_segment_below_block in the static file provider use an unwind queue pattern for consistency with the hot/cold separation architecture. Deletions are now queued and only executed when commit() is called, matching the existing prune_on_commit pattern. - Add delete_queue to StaticFileProviderInner for deferred jar deletions - Refactor delete_jar into delete_jar + delete_jar_no_reindex - delete_segment/delete_segment_below_block now queue instead of immediately deleting - commit() drains the delete queue, resets affected writers, executes deletions, and rebuilds the index once - has_unwind_queued() checks the delete queue - finalize() errors if delete queue is non-empty - Add reset(segment) to StaticFileWriters to clear stale writer handles - Add 3 tests for deferred deletion behavior Amp-Thread-ID: https://ampcode.com/threads/T-019c30ef-1a42-75c9-b61d-81e062acc279 --- .../src/providers/static_file/manager.rs | 252 +++++++++++++++--- .../src/providers/static_file/writer.rs | 12 + 2 files changed, 224 insertions(+), 40 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 65da2e6217a..44187b7139d 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -12,7 +12,7 @@ use alloy_consensus::{transaction::TransactionMeta, Header}; use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber}; use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256}; use notify::{RecommendedWatcher, RecursiveMode, Watcher}; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use reth_chain_state::ExecutedBlock; use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain}; use reth_db::{ @@ -395,6 +395,8 @@ pub struct StaticFileProviderInner { _lock_file: Option, /// Genesis block number, default is 0; genesis_block_number: u64, + /// Queued jar deletions to execute on commit. + delete_queue: Mutex)>>, } impl StaticFileProviderInner { @@ -422,6 +424,7 @@ impl StaticFileProviderInner { blocks_per_file, _lock_file, genesis_block_number: 0, + delete_queue: Default::default(), }; Ok(provider) @@ -431,6 +434,20 @@ impl StaticFileProviderInner { self.access.is_read_only() } + fn queue_delete(&self, segment: StaticFileSegment, fixed_range_ends: Vec) { + if !fixed_range_ends.is_empty() { + self.delete_queue.lock().push((segment, fixed_range_ends)); + } + } + + fn has_delete_queued(&self) -> bool { + !self.delete_queue.lock().is_empty() + } + + fn take_delete_queue(&self) -> Vec<(StaticFileSegment, Vec)> { + std::mem::take(&mut *self.delete_queue.lock()) + } + /// Each static file has a fixed number of blocks. This gives out the range where the requested /// block is positioned. /// @@ -892,11 +909,13 @@ impl StaticFileProvider { self.map.remove(&(fixed_block_range_end, segment)); } - /// This handles history expiry by deleting all static files for the given segment below the - /// given block. + /// Queues static files for the given segment below the given block for deletion. + /// + /// Files are removed from disk when [`StaticFileWriter::commit`] is called. Until then the + /// index and on-disk data remain unchanged. /// - /// For example if block is 1M and the blocks per file are 500K this will delete all individual - /// files below 1M, so 0-499K and 500K-999K. + /// For example if block is 1M and the blocks per file are 500K this will queue files 0-499K + /// and 500K-999K for deletion. /// /// This will not delete the file that contains the block itself, because files can only be /// removed entirely. @@ -907,45 +926,56 @@ impl StaticFileProvider { /// requested block is higher than the highest block in static files. This ensures we always /// maintain at least one static file if any exist. /// - /// Returns a list of `SegmentHeader`s from the deleted jars. + /// Returns a list of `SegmentHeader`s from the jars that will be deleted. pub fn delete_segment_below_block( &self, segment: StaticFileSegment, block: BlockNumber, ) -> ProviderResult> { - // Nothing to delete if block is 0. if block == 0 { return Ok(Vec::new()); } let highest_block = self.get_highest_static_file_block(segment); let mut deleted_headers = Vec::new(); + let mut range_ends_to_delete = Vec::new(); - loop { - let Some(block_height) = self.get_lowest_range_end(segment) else { - return Ok(deleted_headers); - }; - - // Stop if we've reached the target block or the highest static file - if block_height >= block || Some(block_height) == highest_block { - return Ok(deleted_headers); + let indexes = self.indexes.read(); + if let Some(index) = indexes.get(segment) { + for &max_block in index.expected_block_ranges_by_max_block.keys() { + if max_block >= block || Some(max_block) == highest_block { + break; + } + range_ends_to_delete.push(max_block); } + } + drop(indexes); + + for &range_end in &range_ends_to_delete { + let fixed_block_range = self.find_fixed_range(segment, range_end); + let key = (fixed_block_range.end(), segment); + + let header = if let Some(jar) = self.map.get(&key) { + jar.jar.user_header().clone() + } else { + let file = self.path.join(segment.filename(&fixed_block_range)); + let jar = NippyJar::::load(&file).map_err(ProviderError::other)?; + jar.user_header().clone() + }; debug!( target: "providers::static_file", ?segment, - ?block_height, - "Deleting static file below block" + ?range_end, + "Queuing static file below block for deletion" ); - // now we need to wipe the static file, this will take care of updating the index and - // advance the lowest tracked block height for the segment. - let header = self.delete_jar(segment, block_height).inspect_err(|err| { - warn!( target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block") - })?; - deleted_headers.push(header); } + + self.queue_delete(segment, range_ends_to_delete); + + Ok(deleted_headers) } /// Given a segment and block, it deletes the jar and all files from the respective block range. @@ -959,6 +989,17 @@ impl StaticFileProvider { &self, segment: StaticFileSegment, block: BlockNumber, + ) -> ProviderResult { + let header = self.delete_jar_no_reindex(segment, block)?; + self.initialize_index()?; + Ok(header) + } + + /// Deletes a jar without re-initializing the index afterwards. + fn delete_jar_no_reindex( + &self, + segment: StaticFileSegment, + block: BlockNumber, ) -> ProviderResult { let fixed_block_range = self.find_fixed_range(segment, block); let key = (fixed_block_range.end(), segment); @@ -979,38 +1020,53 @@ impl StaticFileProvider { let header = jar.user_header().clone(); jar.delete().map_err(ProviderError::other)?; - // SAFETY: this is currently necessary to ensure that certain indexes like - // `static_files_min_block` have the correct values after pruning. - self.initialize_index()?; - Ok(header) } - /// Deletes ALL static file jars for the given segment, including the highest one. + /// Queues ALL static file jars for the given segment for deletion, including the highest one. /// - /// CAUTION: destructive. Deletes all files on disk for this segment. + /// CAUTION: destructive. Files are removed from disk when [`StaticFileWriter::commit`] is + /// called. Until then the index and on-disk data remain unchanged. /// /// This is used for `PruneMode::Full` where all data should be removed. /// - /// Returns a list of `SegmentHeader`s from the deleted jars. + /// Returns a list of `SegmentHeader`s from the jars that will be deleted. pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult> { let mut deleted_headers = Vec::new(); + let mut range_ends_to_delete = Vec::new(); + + let indexes = self.indexes.read(); + if let Some(index) = indexes.get(segment) { + for &max_block in index.expected_block_ranges_by_max_block.keys() { + range_ends_to_delete.push(max_block); + } + } + drop(indexes); + + for &range_end in &range_ends_to_delete { + let fixed_block_range = self.find_fixed_range(segment, range_end); + let key = (fixed_block_range.end(), segment); + + let header = if let Some(jar) = self.map.get(&key) { + jar.jar.user_header().clone() + } else { + let file = self.path.join(segment.filename(&fixed_block_range)); + let jar = NippyJar::::load(&file).map_err(ProviderError::other)?; + jar.user_header().clone() + }; - while let Some(block_height) = self.get_highest_static_file_block(segment) { debug!( target: "providers::static_file", ?segment, - ?block_height, - "Deleting static file jar" + ?range_end, + "Queuing static file jar for deletion" ); - let header = self.delete_jar(segment, block_height).inspect_err(|err| { - warn!(target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file jar") - })?; - deleted_headers.push(header); } + self.queue_delete(segment, range_ends_to_delete); + Ok(deleted_headers) } @@ -2353,14 +2409,44 @@ impl StaticFileWriter for StaticFileProvider { } fn commit(&self) -> ProviderResult<()> { - self.writers.commit() + self.writers.commit()?; + + let ops = self.take_delete_queue(); + if ops.is_empty() { + return Ok(()); + } + + let mut segments_seen = Vec::new(); + for (seg, _) in &ops { + if !segments_seen.contains(seg) { + segments_seen.push(*seg); + } + } + for segment in &segments_seen { + self.writers.reset(*segment); + } + + for (segment, range_ends) in ops { + for range_end in range_ends { + self.delete_jar_no_reindex(segment, range_end)?; + } + } + + self.initialize_index()?; + + Ok(()) } fn has_unwind_queued(&self) -> bool { - self.writers.has_unwind_queued() + self.writers.has_unwind_queued() || self.has_delete_queued() } fn finalize(&self) -> ProviderResult<()> { + if self.has_delete_queued() { + return Err(ProviderError::other(StaticFileWriterError::new( + "Cannot finalize with pending jar deletions. Call commit() first.", + ))); + } self.writers.finalize() } } @@ -3125,7 +3211,10 @@ mod tests { use reth_db::test_utils::create_test_static_files_dir; use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment}; - use crate::{providers::StaticFileProvider, StaticFileProviderBuilder}; + use crate::{ + providers::{static_file::manager::StaticFileWriter, StaticFileProvider}, + StaticFileProviderBuilder, + }; #[test] fn test_find_fixed_range_with_block_index() -> eyre::Result<()> { @@ -3246,4 +3335,87 @@ mod tests { Ok(()) } + + fn count_files(path: impl AsRef) -> usize { + std::fs::read_dir(path) + .unwrap() + .filter_map(|e| e.ok()) + .filter(|e| e.path().file_name().map(|n| n != "lock").unwrap_or(false)) + .count() + } + + fn create_test_headers( + static_dir: impl AsRef, + count: u64, + blocks_per_file: u64, + ) -> StaticFileProvider { + let sf_rw: StaticFileProvider = + StaticFileProviderBuilder::read_write(static_dir.as_ref()) + .with_blocks_per_file(blocks_per_file) + .build() + .unwrap(); + let mut writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap(); + let mut header = alloy_consensus::Header::default(); + for num in 0..count { + header.number = num; + writer.append_header(&header, &alloy_primitives::BlockHash::default()).unwrap(); + } + writer.commit().unwrap(); + drop(writer); + sf_rw + } + + #[test] + fn test_delete_segment_deferred_until_commit() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + assert_eq!(count_files(&static_dir), 9); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(29)); + + sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + + assert_eq!(count_files(&static_dir), 9); + assert!(StaticFileWriter::has_unwind_queued(&sf_rw)); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(29)); + + StaticFileWriter::commit(&sf_rw).unwrap(); + + assert_eq!(count_files(&static_dir), 0); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), None); + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + } + + #[test] + fn test_delete_segment_below_block_deferred() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + assert_eq!(count_files(&static_dir), 9); + + sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 20).unwrap(); + + assert_eq!(count_files(&static_dir), 9); + assert!(StaticFileWriter::has_unwind_queued(&sf_rw)); + + StaticFileWriter::commit(&sf_rw).unwrap(); + + assert_eq!(count_files(&static_dir), 3); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(29)); + assert_eq!(sf_rw.get_lowest_range_end(StaticFileSegment::Headers), Some(29)); + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + } + + #[test] + fn test_delete_segment_returns_headers() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + let headers = sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + + assert_eq!(headers.len(), 3); + assert_eq!(headers[0].expected_block_range(), SegmentRangeInclusive::new(0, 9)); + assert_eq!(headers[1].expected_block_range(), SegmentRangeInclusive::new(10, 19)); + assert_eq!(headers[2].expected_block_range(), SegmentRangeInclusive::new(20, 29)); + } } diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index 5c16293da79..f6fa8a3032d 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -188,6 +188,18 @@ impl StaticFileWriters { debug!(target: "providers::static_file", "Finalized all static file segments into disk"); Ok(()) } + + pub(crate) fn reset(&self, segment: StaticFileSegment) { + let mut writer = match segment { + StaticFileSegment::Headers => self.headers.write(), + StaticFileSegment::Transactions => self.transactions.write(), + StaticFileSegment::Receipts => self.receipts.write(), + StaticFileSegment::TransactionSenders => self.transaction_senders.write(), + StaticFileSegment::AccountChangeSets => self.account_change_sets.write(), + StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(), + }; + *writer = None; + } } /// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`]. From b8274bc39eb5cf61ee098352c9237fca10d0cdde Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 03:48:04 +0000 Subject: [PATCH 02/10] fix: handle partial jar exclusion and double-queue edge cases - Use highest expected range end (BTreeMap last key) instead of index.max_block for highest jar exclusion in delete_segment_below_block, fixing incorrect deletion of partially-filled highest jars - Replace existing queue entries for the same segment on re-queue, preventing duplicate deletions if called twice before commit Amp-Thread-ID: https://ampcode.com/threads/T-019c30ef-1a42-75c9-b61d-81e062acc279 --- .../provider/src/providers/static_file/manager.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 44187b7139d..b8a5384cf90 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -436,7 +436,9 @@ impl StaticFileProviderInner { fn queue_delete(&self, segment: StaticFileSegment, fixed_range_ends: Vec) { if !fixed_range_ends.is_empty() { - self.delete_queue.lock().push((segment, fixed_range_ends)); + let mut queue = self.delete_queue.lock(); + queue.retain(|(seg, _)| *seg != segment); + queue.push((segment, fixed_range_ends)); } } @@ -936,14 +938,15 @@ impl StaticFileProvider { return Ok(Vec::new()); } - let highest_block = self.get_highest_static_file_block(segment); let mut deleted_headers = Vec::new(); let mut range_ends_to_delete = Vec::new(); let indexes = self.indexes.read(); if let Some(index) = indexes.get(segment) { + let highest_expected_end = + index.expected_block_ranges_by_max_block.keys().next_back().copied(); for &max_block in index.expected_block_ranges_by_max_block.keys() { - if max_block >= block || Some(max_block) == highest_block { + if max_block >= block || Some(max_block) == highest_expected_end { break; } range_ends_to_delete.push(max_block); From ca5dccb3f91300e2e2ac70cbd2f35cc71461b1c0 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 04:41:11 +0000 Subject: [PATCH 03/10] refactor: extract shared header-loading helper, fix commit error semantics, add 6 tests - Extract collect_headers_and_queue() to deduplicate header loading between delete_segment and delete_segment_below_block - Use scoped blocks instead of drop(indexes) for index reads - Rename commit result variable to first_delete_err with comment clarifying reindex failure takes priority - Use take_while iterator instead of manual loop in delete_segment_below_block - Add tests: finalize guard, empty segment (both APIs), partial highest jar preservation, delete_below+delete_segment interaction, commit with empty queue noop Amp-Thread-ID: https://ampcode.com/threads/T-019c3135-f7cb-7168-bc87-28a27f381a5a --- .../src/providers/static_file/manager.rs | 311 ++++++++++++++---- 1 file changed, 242 insertions(+), 69 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index b8a5384cf90..b4553cfdea8 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -49,7 +49,7 @@ use reth_storage_api::{ use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError}; use reth_tasks::spawn_scoped_os_thread; use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, fmt::Debug, ops::{Bound, Deref, Range, RangeBounds, RangeInclusive}, path::{Path, PathBuf}, @@ -395,8 +395,8 @@ pub struct StaticFileProviderInner { _lock_file: Option, /// Genesis block number, default is 0; genesis_block_number: u64, - /// Queued jar deletions to execute on commit. - delete_queue: Mutex)>>, + /// Queued jar deletions (keyed by segment) to execute on commit. + delete_queue: Mutex>>, } impl StaticFileProviderInner { @@ -434,11 +434,10 @@ impl StaticFileProviderInner { self.access.is_read_only() } - fn queue_delete(&self, segment: StaticFileSegment, fixed_range_ends: Vec) { - if !fixed_range_ends.is_empty() { + fn queue_delete(&self, segment: StaticFileSegment, range_ends: Vec) { + if !range_ends.is_empty() { let mut queue = self.delete_queue.lock(); - queue.retain(|(seg, _)| *seg != segment); - queue.push((segment, fixed_range_ends)); + queue.entry(segment).or_default().extend(range_ends); } } @@ -446,7 +445,7 @@ impl StaticFileProviderInner { !self.delete_queue.lock().is_empty() } - fn take_delete_queue(&self) -> Vec<(StaticFileSegment, Vec)> { + fn take_delete_queue(&self) -> BTreeMap> { std::mem::take(&mut *self.delete_queue.lock()) } @@ -938,47 +937,22 @@ impl StaticFileProvider { return Ok(Vec::new()); } - let mut deleted_headers = Vec::new(); - let mut range_ends_to_delete = Vec::new(); - - let indexes = self.indexes.read(); - if let Some(index) = indexes.get(segment) { + let range_ends_to_delete = { + let indexes = self.indexes.read(); + let Some(index) = indexes.get(segment) else { return Ok(Vec::new()) }; let highest_expected_end = index.expected_block_ranges_by_max_block.keys().next_back().copied(); - for &max_block in index.expected_block_ranges_by_max_block.keys() { - if max_block >= block || Some(max_block) == highest_expected_end { - break; - } - range_ends_to_delete.push(max_block); - } - } - drop(indexes); - - for &range_end in &range_ends_to_delete { - let fixed_block_range = self.find_fixed_range(segment, range_end); - let key = (fixed_block_range.end(), segment); - - let header = if let Some(jar) = self.map.get(&key) { - jar.jar.user_header().clone() - } else { - let file = self.path.join(segment.filename(&fixed_block_range)); - let jar = NippyJar::::load(&file).map_err(ProviderError::other)?; - jar.user_header().clone() - }; - - debug!( - target: "providers::static_file", - ?segment, - ?range_end, - "Queuing static file below block for deletion" - ); - - deleted_headers.push(header); - } - - self.queue_delete(segment, range_ends_to_delete); + index + .expected_block_ranges_by_max_block + .keys() + .copied() + .take_while(|&max_block| { + max_block < block && Some(max_block) != highest_expected_end + }) + .collect::>() + }; - Ok(deleted_headers) + self.collect_headers_and_queue(segment, range_ends_to_delete) } /// Given a segment and block, it deletes the jar and all files from the respective block range. @@ -1035,18 +1009,25 @@ impl StaticFileProvider { /// /// Returns a list of `SegmentHeader`s from the jars that will be deleted. pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult> { - let mut deleted_headers = Vec::new(); - let mut range_ends_to_delete = Vec::new(); + let range_ends_to_delete = { + let indexes = self.indexes.read(); + let Some(index) = indexes.get(segment) else { return Ok(Vec::new()) }; + index.expected_block_ranges_by_max_block.keys().copied().collect::>() + }; - let indexes = self.indexes.read(); - if let Some(index) = indexes.get(segment) { - for &max_block in index.expected_block_ranges_by_max_block.keys() { - range_ends_to_delete.push(max_block); - } - } - drop(indexes); + self.collect_headers_and_queue(segment, range_ends_to_delete) + } - for &range_end in &range_ends_to_delete { + /// Loads [`SegmentHeader`]s for the given range ends, queues them for deletion, and returns the + /// headers. + fn collect_headers_and_queue( + &self, + segment: StaticFileSegment, + range_ends: Vec, + ) -> ProviderResult> { + let mut headers = Vec::with_capacity(range_ends.len()); + + for &range_end in &range_ends { let fixed_block_range = self.find_fixed_range(segment, range_end); let key = (fixed_block_range.end(), segment); @@ -1065,12 +1046,12 @@ impl StaticFileProvider { "Queuing static file jar for deletion" ); - deleted_headers.push(header); + headers.push(header); } - self.queue_delete(segment, range_ends_to_delete); + self.queue_delete(segment, range_ends); - Ok(deleted_headers) + Ok(headers) } /// Given a segment and block range it returns a cached @@ -2419,25 +2400,33 @@ impl StaticFileWriter for StaticFileProvider { return Ok(()); } - let mut segments_seen = Vec::new(); - for (seg, _) in &ops { - if !segments_seen.contains(seg) { - segments_seen.push(*seg); - } - } - for segment in &segments_seen { - self.writers.reset(*segment); + for &segment in ops.keys() { + self.writers.reset(segment); } + let mut first_delete_err = Ok(()); for (segment, range_ends) in ops { for range_end in range_ends { - self.delete_jar_no_reindex(segment, range_end)?; + if let Err(err) = self.delete_jar_no_reindex(segment, range_end) { + warn!( + target: "providers::static_file", + ?segment, + ?range_end, + ?err, + "Failed to delete static file jar during commit" + ); + if first_delete_err.is_ok() { + first_delete_err = Err(err); + } + } } } + // Reindex so the provider state matches what's on disk. + // A reindex failure takes priority since it leaves the provider in an unknown state. self.initialize_index()?; - Ok(()) + first_delete_err } fn has_unwind_queued(&self) -> bool { @@ -3421,4 +3410,188 @@ mod tests { assert_eq!(headers[1].expected_block_range(), SegmentRangeInclusive::new(10, 19)); assert_eq!(headers[2].expected_block_range(), SegmentRangeInclusive::new(20, 29)); } + + #[test] + fn test_delete_segment_single_jar() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 5, 10); + + assert_eq!(count_files(&static_dir), 3); + + let headers = sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + assert_eq!(headers.len(), 1); + + StaticFileWriter::commit(&sf_rw).unwrap(); + + assert_eq!(count_files(&static_dir), 0); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), None); + } + + #[test] + fn test_delete_below_block_single_jar_preserves_it() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 5, 10); + + let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 100).unwrap(); + assert!(headers.is_empty()); + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(4)); + } + + #[test] + fn test_delete_below_block_in_first_range() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 5).unwrap(); + assert!(headers.is_empty()); + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + } + + #[test] + fn test_delete_below_block_at_boundary() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 9).unwrap(); + + // block=9 is the end of the first range (0-9), so it should NOT be deleted + assert!(headers.is_empty()); + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + } + + #[test] + fn test_delete_below_block_just_past_boundary() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 10).unwrap(); + + // block=10 means range 0-9 (end=9) is < 10, but it's not the highest, so delete it + assert_eq!(headers.len(), 1); + assert_eq!(headers[0].expected_block_range(), SegmentRangeInclusive::new(0, 9)); + + StaticFileWriter::commit(&sf_rw).unwrap(); + assert_eq!(count_files(&static_dir), 6); + } + + #[test] + fn test_delete_queue_merges_across_calls() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 50, 10); + + // 5 jars: 0-9, 10-19, 20-29, 30-39, 40-49 + + // First call: delete below block 20 => queues jar 0-9 (range end 9) + sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 20).unwrap(); + + // Second call with index still reflecting all 5 jars: delete below block 40 + // queues {9, 19, 29, 39} (all below 40, excluding highest end 49). + // Merge with first call's {9} yields {9, 19, 29, 39}. + sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 40).unwrap(); + + StaticFileWriter::commit(&sf_rw).unwrap(); + + // Only jar 40-49 should remain (3 files: data + config + offsets) + assert_eq!(count_files(&static_dir), 3); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(49)); + } + + #[test] + fn test_delete_below_block_zero() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 0).unwrap(); + assert!(headers.is_empty()); + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + } + + #[test] + fn test_finalize_rejects_pending_deletes() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + assert!(StaticFileWriter::finalize(&sf_rw).is_err()); + + StaticFileWriter::commit(&sf_rw).unwrap(); + assert!(StaticFileWriter::finalize(&sf_rw).is_ok()); + } + + #[test] + fn test_delete_segment_empty_segment() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw: StaticFileProvider = + StaticFileProviderBuilder::read_write(static_dir.as_ref()) + .with_blocks_per_file(10) + .build() + .unwrap(); + + let headers = sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + assert!(headers.is_empty()); + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + } + + #[test] + fn test_delete_below_block_empty_segment() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw: StaticFileProvider = + StaticFileProviderBuilder::read_write(static_dir.as_ref()) + .with_blocks_per_file(10) + .build() + .unwrap(); + + let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 100).unwrap(); + assert!(headers.is_empty()); + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + } + + #[test] + fn test_delete_below_block_partial_highest_jar() { + let (static_dir, _) = create_test_static_files_dir(); + // 13 blocks with blocks_per_file=10 => jar 0-9 (full), jar 10-12 (partial, expected 10-19) + let sf_rw = create_test_headers(&static_dir, 13, 10); + + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(12)); + + // Block far above tip: should only delete jar 0-9, preserving highest (10-19) + let headers = + sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 1_000_000).unwrap(); + assert_eq!(headers.len(), 1); + assert_eq!(headers[0].expected_block_range(), SegmentRangeInclusive::new(0, 9)); + + StaticFileWriter::commit(&sf_rw).unwrap(); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(12)); + assert_eq!(count_files(&static_dir), 3); + } + + #[test] + fn test_delete_below_then_delete_segment() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + // Queue partial deletion first + sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 20).unwrap(); + // Then queue full deletion — merge should produce all three range ends + sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + + StaticFileWriter::commit(&sf_rw).unwrap(); + + assert_eq!(count_files(&static_dir), 0); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), None); + } + + #[test] + fn test_commit_with_empty_queue_is_noop() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); + StaticFileWriter::commit(&sf_rw).unwrap(); + + assert_eq!(count_files(&static_dir), 9); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(29)); + } } From 137f7ff0c8aa4d0454de54a6a251ee57f7b171cc Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 05:11:59 +0000 Subject: [PATCH 04/10] fix: re-enqueue failed deletions, add tests, cleanup comments - Failed jar deletions are re-enqueued for retry on next commit instead of being permanently lost. - Rename has_delete_queued to has_queued_deletions. - Remove obvious doc comments, trim test comments. - Add test_write_after_full_deletion and test_delete_segment_then_delete_below_block. Amp-Thread-ID: https://ampcode.com/threads/T-019c314c-645a-7311-905b-52de28ceaefb --- .../src/providers/static_file/manager.rs | 81 ++++++++++++------- 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index b4553cfdea8..d11557b4b67 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -441,7 +441,7 @@ impl StaticFileProviderInner { } } - fn has_delete_queued(&self) -> bool { + fn has_queued_deletions(&self) -> bool { !self.delete_queue.lock().is_empty() } @@ -972,7 +972,6 @@ impl StaticFileProvider { Ok(header) } - /// Deletes a jar without re-initializing the index afterwards. fn delete_jar_no_reindex( &self, segment: StaticFileSegment, @@ -1018,8 +1017,7 @@ impl StaticFileProvider { self.collect_headers_and_queue(segment, range_ends_to_delete) } - /// Loads [`SegmentHeader`]s for the given range ends, queues them for deletion, and returns the - /// headers. + /// Reads headers from each jar in `range_ends` and adds them to the delete queue. fn collect_headers_and_queue( &self, segment: StaticFileSegment, @@ -2404,7 +2402,8 @@ impl StaticFileWriter for StaticFileProvider { self.writers.reset(segment); } - let mut first_delete_err = Ok(()); + let mut first_err: Option = None; + let mut failed: BTreeMap> = BTreeMap::new(); for (segment, range_ends) in ops { for range_end in range_ends { if let Err(err) = self.delete_jar_no_reindex(segment, range_end) { @@ -2415,26 +2414,32 @@ impl StaticFileWriter for StaticFileProvider { ?err, "Failed to delete static file jar during commit" ); - if first_delete_err.is_ok() { - first_delete_err = Err(err); + if first_err.is_none() { + first_err = Some(err); } + failed.entry(segment).or_default().insert(range_end); } } } - // Reindex so the provider state matches what's on disk. - // A reindex failure takes priority since it leaves the provider in an unknown state. + // Re-enqueue failed deletions so the next commit can retry them. + if !failed.is_empty() { + self.delete_queue.lock().extend(failed); + } + + // Reindex to match what's on disk. A reindex failure takes priority + // since it leaves the provider in an unknown state. self.initialize_index()?; - first_delete_err + first_err.map_or(Ok(()), Err) } fn has_unwind_queued(&self) -> bool { - self.writers.has_unwind_queued() || self.has_delete_queued() + self.writers.has_unwind_queued() || self.has_queued_deletions() } fn finalize(&self) -> ProviderResult<()> { - if self.has_delete_queued() { + if self.has_queued_deletions() { return Err(ProviderError::other(StaticFileWriterError::new( "Cannot finalize with pending jar deletions. Call commit() first.", ))); @@ -3455,8 +3460,6 @@ mod tests { let sf_rw = create_test_headers(&static_dir, 30, 10); let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 9).unwrap(); - - // block=9 is the end of the first range (0-9), so it should NOT be deleted assert!(headers.is_empty()); assert!(!StaticFileWriter::has_unwind_queued(&sf_rw)); } @@ -3467,8 +3470,6 @@ mod tests { let sf_rw = create_test_headers(&static_dir, 30, 10); let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 10).unwrap(); - - // block=10 means range 0-9 (end=9) is < 10, but it's not the highest, so delete it assert_eq!(headers.len(), 1); assert_eq!(headers[0].expected_block_range(), SegmentRangeInclusive::new(0, 9)); @@ -3483,17 +3484,11 @@ mod tests { // 5 jars: 0-9, 10-19, 20-29, 30-39, 40-49 - // First call: delete below block 20 => queues jar 0-9 (range end 9) sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 20).unwrap(); - - // Second call with index still reflecting all 5 jars: delete below block 40 - // queues {9, 19, 29, 39} (all below 40, excluding highest end 49). - // Merge with first call's {9} yields {9, 19, 29, 39}. sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 40).unwrap(); StaticFileWriter::commit(&sf_rw).unwrap(); - // Only jar 40-49 should remain (3 files: data + config + offsets) assert_eq!(count_files(&static_dir), 3); assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(49)); } @@ -3551,12 +3546,9 @@ mod tests { #[test] fn test_delete_below_block_partial_highest_jar() { let (static_dir, _) = create_test_static_files_dir(); - // 13 blocks with blocks_per_file=10 => jar 0-9 (full), jar 10-12 (partial, expected 10-19) let sf_rw = create_test_headers(&static_dir, 13, 10); - assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(12)); - // Block far above tip: should only delete jar 0-9, preserving highest (10-19) let headers = sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 1_000_000).unwrap(); assert_eq!(headers.len(), 1); @@ -3572,9 +3564,7 @@ mod tests { let (static_dir, _) = create_test_static_files_dir(); let sf_rw = create_test_headers(&static_dir, 30, 10); - // Queue partial deletion first sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 20).unwrap(); - // Then queue full deletion — merge should produce all three range ends sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); StaticFileWriter::commit(&sf_rw).unwrap(); @@ -3594,4 +3584,41 @@ mod tests { assert_eq!(count_files(&static_dir), 9); assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(29)); } + + #[test] + fn test_write_after_full_deletion() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 10, 10); + + sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + StaticFileWriter::commit(&sf_rw).unwrap(); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), None); + assert_eq!(count_files(&static_dir), 0); + + let mut writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap(); + let mut header = alloy_consensus::Header::default(); + for num in 0..5 { + header.number = num; + writer.append_header(&header, &alloy_primitives::BlockHash::default()).unwrap(); + } + writer.commit().unwrap(); + drop(writer); + + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(4)); + assert_eq!(count_files(&static_dir), 3); + } + + #[test] + fn test_delete_segment_then_delete_below_block() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + sf_rw.delete_segment_below_block(StaticFileSegment::Headers, 20).unwrap(); + + StaticFileWriter::commit(&sf_rw).unwrap(); + + assert_eq!(count_files(&static_dir), 0); + assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), None); + } } From a49aafbd93224550f3cd0de5ad55f20a720be12f Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 05:24:28 +0000 Subject: [PATCH 05/10] refactor: simplify delete queue to flat Vec, fail-fast on errors, restore #[instrument], improve docs Amp-Thread-ID: https://ampcode.com/threads/T-019c3162-aa34-7393-8368-0d4cb0f379c6 --- .../src/providers/static_file/manager.rs | 52 ++++++------------- 1 file changed, 17 insertions(+), 35 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index d11557b4b67..0b160f9a14f 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -49,7 +49,7 @@ use reth_storage_api::{ use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError}; use reth_tasks::spawn_scoped_os_thread; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::BTreeMap, fmt::Debug, ops::{Bound, Deref, Range, RangeBounds, RangeInclusive}, path::{Path, PathBuf}, @@ -395,8 +395,8 @@ pub struct StaticFileProviderInner { _lock_file: Option, /// Genesis block number, default is 0; genesis_block_number: u64, - /// Queued jar deletions (keyed by segment) to execute on commit. - delete_queue: Mutex>>, + /// Queued jar deletions to execute on commit. Each entry is a `(segment, range_end)` pair. + delete_queue: Mutex>, } impl StaticFileProviderInner { @@ -437,7 +437,7 @@ impl StaticFileProviderInner { fn queue_delete(&self, segment: StaticFileSegment, range_ends: Vec) { if !range_ends.is_empty() { let mut queue = self.delete_queue.lock(); - queue.entry(segment).or_default().extend(range_ends); + queue.extend(range_ends.into_iter().map(|r| (segment, r))); } } @@ -445,8 +445,11 @@ impl StaticFileProviderInner { !self.delete_queue.lock().is_empty() } - fn take_delete_queue(&self) -> BTreeMap> { - std::mem::take(&mut *self.delete_queue.lock()) + fn take_delete_queue(&self) -> Vec<(StaticFileSegment, BlockNumber)> { + let mut queue = std::mem::take(&mut *self.delete_queue.lock()); + queue.sort_unstable(); + queue.dedup(); + queue } /// Each static file has a fixed number of blocks. This gives out the range where the requested @@ -2398,40 +2401,19 @@ impl StaticFileWriter for StaticFileProvider { return Ok(()); } - for &segment in ops.keys() { - self.writers.reset(segment); - } - - let mut first_err: Option = None; - let mut failed: BTreeMap> = BTreeMap::new(); - for (segment, range_ends) in ops { - for range_end in range_ends { - if let Err(err) = self.delete_jar_no_reindex(segment, range_end) { - warn!( - target: "providers::static_file", - ?segment, - ?range_end, - ?err, - "Failed to delete static file jar during commit" - ); - if first_err.is_none() { - first_err = Some(err); - } - failed.entry(segment).or_default().insert(range_end); - } + let mut last_reset = None; + for &(segment, _) in &ops { + if last_reset != Some(segment) { + last_reset = Some(segment); + self.writers.reset(segment); } } - // Re-enqueue failed deletions so the next commit can retry them. - if !failed.is_empty() { - self.delete_queue.lock().extend(failed); + for (segment, range_end) in ops { + self.delete_jar_no_reindex(segment, range_end)?; } - // Reindex to match what's on disk. A reindex failure takes priority - // since it leaves the provider in an unknown state. - self.initialize_index()?; - - first_err.map_or(Ok(()), Err) + self.initialize_index() } fn has_unwind_queued(&self) -> bool { From c9cdc25b5e5abdb99ac76eace49dc68b41ff93cb Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 06:04:07 +0000 Subject: [PATCH 06/10] fix: re-queue remaining jar deletions on partial commit failure If delete_jar_no_reindex fails midway through the deletion loop, remaining operations are re-inserted into the queue so the next commit() retries them. initialize_index() now always runs regardless of deletion success to keep the in-memory index consistent. Amp-Thread-ID: https://ampcode.com/threads/T-019c3170-799e-757d-ab53-6c1439d32885 --- .../src/providers/static_file/manager.rs | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 0b160f9a14f..2f9207da83e 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -452,6 +452,12 @@ impl StaticFileProviderInner { queue } + fn queue_delete_raw(&self, ops: Vec<(StaticFileSegment, BlockNumber)>) { + if !ops.is_empty() { + self.delete_queue.lock().extend(ops); + } + } + /// Each static file has a fixed number of blocks. This gives out the range where the requested /// block is positioned. /// @@ -2396,7 +2402,7 @@ impl StaticFileWriter for StaticFileProvider { fn commit(&self) -> ProviderResult<()> { self.writers.commit()?; - let ops = self.take_delete_queue(); + let mut ops = self.take_delete_queue(); if ops.is_empty() { return Ok(()); } @@ -2409,11 +2415,28 @@ impl StaticFileWriter for StaticFileProvider { } } - for (segment, range_end) in ops { - self.delete_jar_no_reindex(segment, range_end)?; + let mut delete_err = None; + let mut completed = 0; + for &(segment, range_end) in &ops { + if let Err(err) = self.delete_jar_no_reindex(segment, range_end) { + delete_err = Some(err); + break; + } + completed += 1; + } + + let remaining = ops.split_off(completed); + if !remaining.is_empty() { + self.0.queue_delete_raw(remaining); + } + + self.initialize_index()?; + + if let Some(err) = delete_err { + return Err(err); } - self.initialize_index() + Ok(()) } fn has_unwind_queued(&self) -> bool { From 7a876cc86811985f60d6d9a3e392e53a8f7fa712 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 06:15:15 +0000 Subject: [PATCH 07/10] fix: address PR review comments - Add docstrings for queue_delete, has_queued_deletions, take_delete_queue, queue_delete_raw, delete_jar_no_reindex, reset, commit, has_unwind_queued, and finalize - Add step-by-step section comments inside commit() for readability - Rename max_block -> expected_end in delete_segment_below_block to clarify that the comparison uses expected block range ends (BTreeMap keys), not actual block numbers, preventing confusion about partially-filled jars - Add comment explaining why highest jar exclusion is correct for partially-filled jars Amp-Thread-ID: https://ampcode.com/threads/T-019c3194-a863-7159-9eed-8263c91452ce --- .../src/providers/static_file/manager.rs | 35 +++++++++++++++++-- .../src/providers/static_file/writer.rs | 1 + 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 2f9207da83e..d8ef4b05721 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -434,6 +434,7 @@ impl StaticFileProviderInner { self.access.is_read_only() } + /// Enqueues jar deletions for `segment` at the given expected block range ends. fn queue_delete(&self, segment: StaticFileSegment, range_ends: Vec) { if !range_ends.is_empty() { let mut queue = self.delete_queue.lock(); @@ -441,10 +442,12 @@ impl StaticFileProviderInner { } } + /// Returns `true` if there are any pending jar deletions. fn has_queued_deletions(&self) -> bool { !self.delete_queue.lock().is_empty() } + /// Drains and returns the delete queue, sorted and deduplicated. fn take_delete_queue(&self) -> Vec<(StaticFileSegment, BlockNumber)> { let mut queue = std::mem::take(&mut *self.delete_queue.lock()); queue.sort_unstable(); @@ -452,6 +455,7 @@ impl StaticFileProviderInner { queue } + /// Re-enqueues previously taken operations (used when a deletion fails mid-way). fn queue_delete_raw(&self, ops: Vec<(StaticFileSegment, BlockNumber)>) { if !ops.is_empty() { self.delete_queue.lock().extend(ops); @@ -949,14 +953,16 @@ impl StaticFileProvider { let range_ends_to_delete = { let indexes = self.indexes.read(); let Some(index) = indexes.get(segment) else { return Ok(Vec::new()) }; + // Use the last BTreeMap key (expected block end of the highest jar) to + // ensure we never delete the highest jar regardless of how full it is. let highest_expected_end = index.expected_block_ranges_by_max_block.keys().next_back().copied(); index .expected_block_ranges_by_max_block .keys() .copied() - .take_while(|&max_block| { - max_block < block && Some(max_block) != highest_expected_end + .take_while(|&expected_end| { + expected_end < block && Some(expected_end) != highest_expected_end }) .collect::>() }; @@ -981,6 +987,10 @@ impl StaticFileProvider { Ok(header) } + /// Deletes a single jar for `segment` at `block` without rebuilding the index. + /// + /// CAUTION: destructive. Deletes files on disk immediately. Callers must + /// call [`Self::initialize_index`] afterwards to keep the index consistent. fn delete_jar_no_reindex( &self, segment: StaticFileSegment, @@ -2399,14 +2409,28 @@ impl StaticFileWriter for StaticFileProvider { ) } + /// Commits all pending writes and executes any queued jar deletions. + /// + /// 1. Flush pending writer data via [`StaticFileWriters::commit`]. + /// 2. Drain the deferred delete queue (populated by [`Self::delete_segment`] / + /// [`Self::delete_segment_below_block`]). + /// 3. Reset writers for affected segments so they release file handles. + /// 4. Delete jar files from disk (without rebuilding the index after each). + /// 5. Rebuild the index once at the end. + /// + /// If a deletion fails mid-way, unprocessed operations are re-queued so a + /// subsequent `commit()` can retry them. fn commit(&self) -> ProviderResult<()> { + // Step 1: flush pending writer data. self.writers.commit()?; + // Step 2: drain the delete queue (sorted + deduped). let mut ops = self.take_delete_queue(); if ops.is_empty() { return Ok(()); } + // Step 3: reset writers for segments about to be deleted so file handles are released. let mut last_reset = None; for &(segment, _) in &ops { if last_reset != Some(segment) { @@ -2415,6 +2439,7 @@ impl StaticFileWriter for StaticFileProvider { } } + // Step 4: delete jar files from disk. let mut delete_err = None; let mut completed = 0; for &(segment, range_end) in &ops { @@ -2425,11 +2450,13 @@ impl StaticFileWriter for StaticFileProvider { completed += 1; } + // Re-queue any operations that were not completed due to an error. let remaining = ops.split_off(completed); if !remaining.is_empty() { self.0.queue_delete_raw(remaining); } + // Step 5: rebuild the index once after all deletions. self.initialize_index()?; if let Some(err) = delete_err { @@ -2439,10 +2466,14 @@ impl StaticFileWriter for StaticFileProvider { Ok(()) } + /// Returns `true` if any writer has a pending unwind or there are queued jar deletions. fn has_unwind_queued(&self) -> bool { self.writers.has_unwind_queued() || self.has_queued_deletions() } + /// Finalizes all writers by flushing configuration to disk and updating indices. + /// + /// Returns an error if there are pending jar deletions — use [`Self::commit`] instead. fn finalize(&self) -> ProviderResult<()> { if self.has_queued_deletions() { return Err(ProviderError::other(StaticFileWriterError::new( diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index f6fa8a3032d..5064e16f0f5 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -189,6 +189,7 @@ impl StaticFileWriters { Ok(()) } + /// Drops the writer for `segment`, releasing its file handle so the jar can be deleted. pub(crate) fn reset(&self, segment: StaticFileSegment) { let mut writer = match segment { StaticFileSegment::Headers => self.headers.write(), From 97b4d28cc3270f6907a9d0077ca69d3deb6d67bf Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 07:25:47 +0000 Subject: [PATCH 08/10] fix: remove doc link to private StaticFileWriters::commit Amp-Thread-ID: https://ampcode.com/threads/T-019c3194-a863-7159-9eed-8263c91452ce --- crates/storage/provider/src/providers/static_file/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index d8ef4b05721..61dc876b1e3 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -2411,7 +2411,7 @@ impl StaticFileWriter for StaticFileProvider { /// Commits all pending writes and executes any queued jar deletions. /// - /// 1. Flush pending writer data via [`StaticFileWriters::commit`]. + /// 1. Flush pending writer data. /// 2. Drain the deferred delete queue (populated by [`Self::delete_segment`] / /// [`Self::delete_segment_below_block`]). /// 3. Reset writers for affected segments so they release file handles. From 708299e0860500626fe3b09af2edd2643c7134ba Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 08:00:31 +0000 Subject: [PATCH 09/10] chore: remove Step N: prefixes from commit() comments Amp-Thread-ID: https://ampcode.com/threads/T-019c3194-a863-7159-9eed-8263c91452ce --- .../provider/src/providers/static_file/manager.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 61dc876b1e3..c611c33799a 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -2421,16 +2421,16 @@ impl StaticFileWriter for StaticFileProvider { /// If a deletion fails mid-way, unprocessed operations are re-queued so a /// subsequent `commit()` can retry them. fn commit(&self) -> ProviderResult<()> { - // Step 1: flush pending writer data. + // Flush pending writer data. self.writers.commit()?; - // Step 2: drain the delete queue (sorted + deduped). + // Drain the delete queue (sorted + deduped). let mut ops = self.take_delete_queue(); if ops.is_empty() { return Ok(()); } - // Step 3: reset writers for segments about to be deleted so file handles are released. + // Reset writers for segments about to be deleted so file handles are released. let mut last_reset = None; for &(segment, _) in &ops { if last_reset != Some(segment) { @@ -2439,7 +2439,7 @@ impl StaticFileWriter for StaticFileProvider { } } - // Step 4: delete jar files from disk. + // Delete jar files from disk. let mut delete_err = None; let mut completed = 0; for &(segment, range_end) in &ops { @@ -2456,7 +2456,7 @@ impl StaticFileWriter for StaticFileProvider { self.0.queue_delete_raw(remaining); } - // Step 5: rebuild the index once after all deletions. + // Rebuild the index once after all deletions. self.initialize_index()?; if let Some(err) = delete_err { From 4c40235035f0b16cfdc6f87fb97a8d1b6333ba6d Mon Sep 17 00:00:00 2001 From: yongkangc Date: Fri, 6 Feb 2026 08:29:50 +0000 Subject: [PATCH 10/10] fix: address review feedback on deferred deletion PR - Add error semantics doc to commit() clarifying that DB changes are already durable when this method runs, and callers must not roll back. - Clarify delete_segment_below_block docs: explicitly document that the highest jar is always preserved even when its blocks are below the threshold, and point to delete_segment for full removal. - Add test for requeue-on-failure path: sabotages a jar's config file to force deletion failure, verifies unprocessed ops are requeued and has_unwind_queued remains true across retries. Amp-Thread-ID: https://ampcode.com/threads/T-019c31f9-d89c-72bd-99bc-7937843012b4 --- .../src/providers/static_file/manager.rs | 78 +++++++++++++++++-- 1 file changed, 70 insertions(+), 8 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index c611c33799a..8afd84b549e 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -928,17 +928,19 @@ impl StaticFileProvider { /// Files are removed from disk when [`StaticFileWriter::commit`] is called. Until then the /// index and on-disk data remain unchanged. /// - /// For example if block is 1M and the blocks per file are 500K this will queue files 0-499K - /// and 500K-999K for deletion. + /// Only **full jars whose expected block range ends strictly below `block`** are queued. + /// The jar that *contains* `block` is never deleted because jars can only be removed + /// entirely. /// - /// This will not delete the file that contains the block itself, because files can only be - /// removed entirely. + /// For example, with 500K blocks per file and `block = 1_000_000`, jars 0-499K and + /// 500K-999K are queued while the 1M-1.499M jar is kept. /// - /// # Safety + /// # Highest-jar preservation /// - /// This method will never delete the highest static file for the segment, even if the - /// requested block is higher than the highest block in static files. This ensures we always - /// maintain at least one static file if any exist. + /// The highest jar for the segment is **always preserved**, even when its blocks are + /// entirely below the requested threshold. This means that if only one jar exists, or + /// if `block` is higher than every block in static files, no jars are deleted. Use + /// [`Self::delete_segment`] to remove all jars including the highest. /// /// Returns a list of `SegmentHeader`s from the jars that will be deleted. pub fn delete_segment_below_block( @@ -2420,6 +2422,15 @@ impl StaticFileWriter for StaticFileProvider { /// /// If a deletion fails mid-way, unprocessed operations are re-queued so a /// subsequent `commit()` can retry them. + /// + /// # Error semantics + /// + /// When used through [`DatabaseProvider::commit`](crate::providers::DatabaseProvider), + /// the database transaction is committed **before** this method runs. An error + /// returned here therefore means the DB changes are already durable but some + /// static-file jar deletions did not complete. Callers must **not** attempt to + /// roll back the database in response to this error — the unprocessed deletions + /// are re-queued and will be retried on the next `commit()` call. fn commit(&self) -> ProviderResult<()> { // Flush pending writer data. self.writers.commit()?; @@ -3644,6 +3655,57 @@ mod tests { assert_eq!(count_files(&static_dir), 3); } + #[test] + fn test_commit_requeues_on_deletion_failure() { + let (static_dir, _) = create_test_static_files_dir(); + let sf_rw = create_test_headers(&static_dir, 30, 10); + + // 3 jars: 0-9, 10-19, 20-29 + assert_eq!(count_files(&static_dir), 9); // 3 jars × 3 files each + + sf_rw.delete_segment(StaticFileSegment::Headers).unwrap(); + assert!(StaticFileWriter::has_unwind_queued(&sf_rw)); + + // Sabotage the first jar (range_end=9) so delete_jar_no_reindex fails on load: + // remove it from the in-memory map and delete its .conf file from disk. + let first_range = sf_rw.find_fixed_range(StaticFileSegment::Headers, 0); + let key = (first_range.end(), StaticFileSegment::Headers); + sf_rw.map.remove(&key); + + let conf_path = static_dir + .as_ref() + .join(StaticFileSegment::Headers.filename(&first_range)) + .with_extension("conf"); + std::fs::remove_file(&conf_path).unwrap(); + + // commit() should fail because NippyJar::load can't find the .conf file. + let err = StaticFileWriter::commit(&sf_rw); + assert!(err.is_err(), "commit should fail when a jar's config file is missing"); + + // The failed jar (0-9) plus all subsequent jars (10-19, 20-29) should be requeued. + assert!(StaticFileWriter::has_unwind_queued(&sf_rw)); + + // The jars that were never attempted should still exist on disk. + // Jar 0-9 lost its .conf so has 2 remaining files; jars 10-19 and 20-29 are intact (3 + // each). + assert_eq!(count_files(&static_dir), 8); // 2 + 3 + 3 + + // Now remove the sabotaged jar's remaining files so it won't block again, + // and also remove it from the requeue (simulate "already cleaned up"). + // The simplest way: just delete the leftover files for jar 0-9 manually. + let data_path = static_dir.as_ref().join(StaticFileSegment::Headers.filename(&first_range)); + let offsets_path = data_path.with_extension("off"); + let _ = std::fs::remove_file(&data_path); + let _ = std::fs::remove_file(&offsets_path); + + // Second commit will still fail on the same queued (Headers, 9) entry since its + // files are gone, but the remaining jars (10-19, 20-29) also get requeued. + // This confirms the requeue mechanism preserves unprocessed operations across retries. + let err2 = StaticFileWriter::commit(&sf_rw); + assert!(err2.is_err()); + assert!(StaticFileWriter::has_unwind_queued(&sf_rw)); + } + #[test] fn test_delete_segment_then_delete_below_block() { let (static_dir, _) = create_test_static_files_dir();