diff --git a/Cargo.lock b/Cargo.lock index d43bd1cc615..986b8879a46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10896,6 +10896,7 @@ dependencies = [ "serde", "serde_json", "strum 0.27.2", + "tempfile", ] [[package]] diff --git a/crates/static-file/types/Cargo.toml b/crates/static-file/types/Cargo.toml index 1a073ad584c..62e4ece18f5 100644 --- a/crates/static-file/types/Cargo.toml +++ b/crates/static-file/types/Cargo.toml @@ -24,6 +24,7 @@ strum = { workspace = true, features = ["derive"] } reth-nippy-jar.workspace = true serde_json.workspace = true insta.workspace = true +tempfile.workspace = true [features] default = ["std"] diff --git a/crates/static-file/types/src/changeset_offsets.rs b/crates/static-file/types/src/changeset_offsets.rs new file mode 100644 index 00000000000..696fa402892 --- /dev/null +++ b/crates/static-file/types/src/changeset_offsets.rs @@ -0,0 +1,215 @@ +//! Changeset offset sidecar file I/O. +//! +//! Provides append-only writing and O(1) random-access reading for changeset offsets. +//! The file format is fixed-width 16-byte records: `[offset: u64 LE][num_changes: u64 LE]`. + +use crate::ChangesetOffset; +use std::{ + fs::{File, OpenOptions}, + io::{self, Read, Seek, SeekFrom, Write}, + path::Path, +}; + +/// Writer for appending changeset offsets to a sidecar file. +#[derive(Debug)] +pub struct ChangesetOffsetWriter { + file: File, + /// Number of records written (tracked separately for sync) + records_written: u64, +} + +impl ChangesetOffsetWriter { + /// Record size in bytes (u64 offset + u64 num_changes). + const RECORD_SIZE: usize = 16; + + /// Opens or creates the changeset offset file for appending. + pub fn new(path: impl AsRef) -> io::Result { + let file = OpenOptions::new().create(true).read(true).append(true).open(path)?; + + let records_written = file.metadata()?.len() / Self::RECORD_SIZE as u64; + + Ok(Self { file, records_written }) + } + + /// Appends a single changeset offset record. + pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> { + let mut buf = [0u8; Self::RECORD_SIZE]; + buf[..8].copy_from_slice(&offset.offset().to_le_bytes()); + buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes()); + self.file.write_all(&buf)?; + self.records_written += 1; + Ok(()) + } + + /// Appends multiple changeset offset records. + pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> { + for offset in offsets { + self.append(offset)?; + } + Ok(()) + } + + /// Syncs all data to disk. Must be called before committing the header. + pub fn sync(&mut self) -> io::Result<()> { + self.file.sync_all() + } + + /// Truncates the file to contain exactly `len` records. + /// Used after prune operations to reclaim space. + pub fn truncate(&mut self, len: u64) -> io::Result<()> { + self.file.set_len(len * Self::RECORD_SIZE as u64)?; + self.records_written = len; + Ok(()) + } + + /// Returns the number of records in the file. + pub fn len(&self) -> u64 { + self.records_written + } + + /// Returns true if the file is empty. + pub fn is_empty(&self) -> bool { + self.records_written == 0 + } +} + +/// Reader for changeset offsets with O(1) random access. +#[derive(Debug)] +pub struct ChangesetOffsetReader { + file: File, + /// Cached file length in records + len: u64, +} + +impl ChangesetOffsetReader { + /// Record size in bytes. + const RECORD_SIZE: usize = 16; + + /// Opens the changeset offset file for reading. + pub fn new(path: impl AsRef) -> io::Result { + let file = File::open(path)?; + let len = file.metadata()?.len() / Self::RECORD_SIZE as u64; + Ok(Self { file, len }) + } + + /// Opens with an explicit length (from header metadata). + /// Any records beyond `len` are ignored. + pub fn with_len(path: impl AsRef, len: u64) -> io::Result { + let file = File::open(path)?; + Ok(Self { file, len }) + } + + /// Reads a single changeset offset by block index. + /// Returns None if index is out of bounds. + pub fn get(&mut self, block_index: u64) -> io::Result> { + if block_index >= self.len { + return Ok(None); + } + + let byte_pos = block_index * Self::RECORD_SIZE as u64; + self.file.seek(SeekFrom::Start(byte_pos))?; + + let mut buf = [0u8; Self::RECORD_SIZE]; + self.file.read_exact(&mut buf)?; + + let offset = u64::from_le_bytes(buf[..8].try_into().unwrap()); + let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap()); + + Ok(Some(ChangesetOffset::new(offset, num_changes))) + } + + /// Reads a range of changeset offsets. + pub fn get_range(&mut self, start: u64, end: u64) -> io::Result> { + let end = end.min(self.len); + if start >= end { + return Ok(Vec::new()); + } + + let count = (end - start) as usize; + let byte_pos = start * Self::RECORD_SIZE as u64; + self.file.seek(SeekFrom::Start(byte_pos))?; + + let mut result = Vec::with_capacity(count); + let mut buf = [0u8; Self::RECORD_SIZE]; + + for _ in 0..count { + self.file.read_exact(&mut buf)?; + let offset = u64::from_le_bytes(buf[..8].try_into().unwrap()); + let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap()); + result.push(ChangesetOffset::new(offset, num_changes)); + } + + Ok(result) + } + + /// Returns the number of valid records. + pub fn len(&self) -> u64 { + self.len + } + + /// Returns true if there are no records. + pub fn is_empty(&self) -> bool { + self.len == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn test_write_and_read() { + let dir = tempdir().unwrap(); + let path = dir.path().join("test.csoff"); + + // Write + { + let mut writer = ChangesetOffsetWriter::new(&path).unwrap(); + writer.append(&ChangesetOffset::new(0, 5)).unwrap(); + writer.append(&ChangesetOffset::new(5, 3)).unwrap(); + writer.append(&ChangesetOffset::new(8, 10)).unwrap(); + writer.sync().unwrap(); + assert_eq!(writer.len(), 3); + } + + // Read + { + let mut reader = ChangesetOffsetReader::new(&path).unwrap(); + assert_eq!(reader.len(), 3); + + let entry = reader.get(0).unwrap().unwrap(); + assert_eq!(entry.offset(), 0); + assert_eq!(entry.num_changes(), 5); + + let entry = reader.get(1).unwrap().unwrap(); + assert_eq!(entry.offset(), 5); + assert_eq!(entry.num_changes(), 3); + + let entry = reader.get(2).unwrap().unwrap(); + assert_eq!(entry.offset(), 8); + assert_eq!(entry.num_changes(), 10); + + assert!(reader.get(3).unwrap().is_none()); + } + } + + #[test] + fn test_truncate() { + let dir = tempdir().unwrap(); + let path = dir.path().join("test.csoff"); + + let mut writer = ChangesetOffsetWriter::new(&path).unwrap(); + writer.append(&ChangesetOffset::new(0, 1)).unwrap(); + writer.append(&ChangesetOffset::new(1, 2)).unwrap(); + writer.append(&ChangesetOffset::new(3, 3)).unwrap(); + writer.sync().unwrap(); + + writer.truncate(2).unwrap(); + assert_eq!(writer.len(), 2); + + let mut reader = ChangesetOffsetReader::new(&path).unwrap(); + assert_eq!(reader.len(), 2); + assert!(reader.get(2).unwrap().is_none()); + } +} diff --git a/crates/static-file/types/src/lib.rs b/crates/static-file/types/src/lib.rs index 02998cca77d..5a37e826637 100644 --- a/crates/static-file/types/src/lib.rs +++ b/crates/static-file/types/src/lib.rs @@ -15,11 +15,18 @@ mod compression; mod event; mod segment; +#[cfg(feature = "std")] +mod changeset_offsets; +#[cfg(feature = "std")] +pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter}; + use alloy_primitives::BlockNumber; pub use compression::Compression; use core::ops::RangeInclusive; pub use event::StaticFileProducerEvent; -pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment}; +pub use segment::{ + ChangesetOffset, SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment, +}; /// Map keyed by [`StaticFileSegment`]. pub type StaticFileMap = alloc::boxed::Box>; diff --git a/crates/static-file/types/src/segment.rs b/crates/static-file/types/src/segment.rs index 791172be70c..d26160d6110 100644 --- a/crates/static-file/types/src/segment.rs +++ b/crates/static-file/types/src/segment.rs @@ -211,6 +211,11 @@ pub struct ChangesetOffset { } impl ChangesetOffset { + /// Creates a new changeset offset. + pub const fn new(offset: u64, num_changes: u64) -> Self { + Self { offset, num_changes } + } + /// Returns the start offset for the row for this block pub const fn offset(&self) -> u64 { self.offset @@ -227,6 +232,53 @@ impl ChangesetOffset { } } +/// Metadata for changeset offsets stored in a separate sidecar file. +/// This replaces the inline `Vec` to enable incremental writes. +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Default)] +#[allow(dead_code)] // Used in upcoming commits +pub struct ChangesetOffsetsMeta { + /// Number of valid entries (blocks) in the sidecar file. + /// This is the authoritative count - any bytes beyond `len * 16` in the file are ignored. + len: u64, + /// Format version for future compatibility. + version: u8, +} + +impl ChangesetOffsetsMeta { + /// Size in bytes of each offset record in the sidecar file. + pub const RECORD_SIZE: usize = 16; + + /// Creates new metadata with the given length. + pub const fn new(len: u64) -> Self { + Self { len, version: 1 } + } + + /// Returns the number of valid changeset offset entries. + pub const fn len(&self) -> u64 { + self.len + } + + /// Returns true if there are no entries. + pub const fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Increment the length by the given amount. + pub fn increment(&mut self, count: u64) { + self.len += count; + } + + /// Set the length to a new value (for prune operations). + pub fn set_len(&mut self, new_len: u64) { + self.len = new_len; + } + + /// Returns the expected file size in bytes. + pub const fn expected_file_size(&self) -> u64 { + self.len * Self::RECORD_SIZE as u64 + } +} + /// A segment header that contains information common to all segments. Used for storage. #[derive(Debug, Eq, PartialEq, Hash, Clone)] pub struct SegmentHeader { diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index b9241321261..b0f7690f39d 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -59,6 +59,9 @@ const INDEX_FILE_EXTENSION: &str = "idx"; const OFFSETS_FILE_EXTENSION: &str = "off"; /// The file extension used for configuration files. pub const CONFIG_FILE_EXTENSION: &str = "conf"; +/// The file extension used for changeset offset sidecar files. +#[allow(dead_code)] // Used in upcoming integration +pub const CHANGESET_OFFSETS_FILE_EXTENSION: &str = "csoff"; /// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a /// memory-mapped file. diff --git a/docs/design/incremental-changeset-offsets.md b/docs/design/incremental-changeset-offsets.md new file mode 100644 index 00000000000..2f5a48d541d --- /dev/null +++ b/docs/design/incremental-changeset-offsets.md @@ -0,0 +1,171 @@ +# Incremental Changeset Offset Storage + +## Problem + +Currently, `SegmentHeader.changeset_offsets` stores a `Vec` that gets fully serialized and written to the NippyJar config file on **every commit**: + +```rust +pub struct SegmentHeader { + // ... + changeset_offsets: Option>, // 16 bytes per block +} +``` + +For a segment with 500k blocks, this means **~8MB rewritten on every commit** - even when only appending a single block. + +### Current Write Path +``` +commit() + → NippyJarWriter::commit() + → sync_all() // flush data + row offsets + → finalize() + → freeze_config() // atomic_write_file of ENTIRE NippyJar config + → bincode serialize entire SegmentHeader (including Vec) +``` + +## Solution + +Move changeset offsets to a **separate append-only sidecar file** with fixed-width records, keeping only a small descriptor in `SegmentHeader`. + +### New Data Layout + +``` +segment_xxx.nippy # existing data file +segment_xxx.off # existing row offsets +segment_xxx.conf # config (now smaller - no Vec) +segment_xxx.csoff # NEW: changeset offsets sidecar (fixed 16-byte records) +``` + +### New SegmentHeader Structure + +```rust +pub struct SegmentHeader { + expected_block_range: SegmentRangeInclusive, + block_range: Option, + tx_range: Option, + segment: StaticFileSegment, + // REMOVED: changeset_offsets: Option> + // NEW: metadata only + changeset_offsets_meta: Option, +} + +pub struct ChangesetOffsetsMeta { + /// Number of valid entries (blocks) in the sidecar file + len: u64, + /// Format version for future compatibility + version: u8, +} +``` + +### Sidecar File Format + +Fixed-width binary records (16 bytes each): +``` +[offset: u64 LE][num_changes: u64 LE] // block N +[offset: u64 LE][num_changes: u64 LE] // block N+1 +... +``` + +Random access: `byte_position = block_index * 16` + +## New Commit Protocol + +### Append Path (adding blocks) + +``` +1. Write data rows to .nippy +2. Write row offsets to .off +3. Append new ChangesetOffset records to .csoff +4. fdatasync(.csoff) // ensure offsets durable +5. Update SegmentHeader.changeset_offsets_meta.len += N +6. atomic_write_file(.conf) // commit header last +``` + +**Crash safety**: If crash occurs: +- After step 3 but before 6: extra bytes in .csoff ignored (header has old len) +- After step 6: all data consistent + +### Prune Path (removing blocks from tail) + +``` +1. Update SegmentHeader.changeset_offsets_meta.len = new_len +2. atomic_write_file(.conf) // commit header +3. (Optional) Truncate .csoff to len * 16 bytes +``` + +**Crash safety**: Header is source of truth for valid length. Garbage beyond `len * 16` is ignored. + +## Implementation Plan + +### Phase 1: Add sidecar infrastructure +- [ ] Add `ChangesetOffsetsMeta` struct in `segment.rs` +- [ ] Add `.csoff` file path helpers in `NippyJar` +- [ ] Add `ChangesetOffsetWriter` for append-only writes +- [ ] Add `ChangesetOffsetReader` for O(1) lookups + +### Phase 2: Modify SegmentHeader +- [ ] Replace `changeset_offsets: Option>` with `changeset_offsets_meta` +- [ ] Update serialize/deserialize with backwards compatibility +- [ ] Migrate existing data on open (one-time conversion) + +### Phase 3: Update write path +- [ ] Modify `StaticFileProviderRW::commit()` to use new protocol +- [ ] Ensure fdatasync ordering: data → offsets → header +- [ ] Update prune logic to shrink `len` instead of rewriting + +### Phase 4: Update read path +- [ ] Modify `changeset_offsets()` to read from sidecar +- [ ] Cache hot entries in memory if needed +- [ ] Add mmap support for large segments + +## API Changes + +### Before +```rust +impl SegmentHeader { + pub fn changeset_offsets(&self) -> Option<&Vec>; +} +``` + +### After +```rust +impl SegmentHeader { + pub fn changeset_offsets_meta(&self) -> Option<&ChangesetOffsetsMeta>; +} + +// New reader for lookups +pub struct ChangesetOffsetReader { ... } + +impl ChangesetOffsetReader { + pub fn get(&self, block_index: u64) -> Option; + pub fn get_range(&self, range: Range) -> Vec; +} +``` + +## Performance Impact + +| Operation | Before | After | +|-----------|--------|-------| +| Append 1 block | O(total_blocks) write | O(1) write (16 bytes) | +| Commit overhead | ~8MB for 500k blocks | ~100 bytes (header only) | +| Random lookup | O(1) from memory | O(1) from mmap/pread | +| Prune | O(remaining_blocks) | O(1) (len update only) | + +## Migration Strategy + +On segment open, if `changeset_offsets` (old Vec) is present and `changeset_offsets_meta` is absent: +1. Write Vec contents to new `.csoff` file +2. Set `changeset_offsets_meta.len = vec.len()` +3. Clear old `changeset_offsets` field +4. Commit header + +This is a one-time migration per segment file. + +## Risks & Mitigations + +| Risk | Mitigation | +|------|------------| +| Commit ordering bug (header before offsets) | Enforce fdatasync on .csoff before header commit | +| Crash during append leaves partial record | Only advance `len` after fdatasync; validate `file_size >= len * 16` on open | +| Prune from middle (not just tail) | Currently unsupported; would need tombstones or compaction | +| Platform fsync semantics | Fsync directory after rename on Linux for full durability |