Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions crates/static-file/types/src/changeset_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
use crate::ChangesetOffset;
use std::{
fs::{File, OpenOptions},
io::{self, Read, Seek, SeekFrom, Write},
io::{self, Write},
os::unix::fs::FileExt,
path::Path,
};

Expand Down Expand Up @@ -177,16 +178,14 @@ impl ChangesetOffsetReader {

/// Reads a single changeset offset by block index.
/// Returns None if index is out of bounds.
pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
pub fn get(&self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
if block_index >= self.len {
return Ok(None);
}

let byte_pos = block_index * Self::RECORD_SIZE as u64;
self.file.seek(SeekFrom::Start(byte_pos))?;

let mut buf = [0u8; Self::RECORD_SIZE];
self.file.read_exact(&mut buf)?;
self.file.read_exact_at(&mut buf, byte_pos)?;

let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
Expand All @@ -195,21 +194,21 @@ impl ChangesetOffsetReader {
}

/// Reads a range of changeset offsets.
pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
pub fn get_range(&self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
let end = end.min(self.len);
if start >= end {
return Ok(Vec::new());
}

let count = (end - start) as usize;
let byte_pos = start * Self::RECORD_SIZE as u64;
self.file.seek(SeekFrom::Start(byte_pos))?;

let mut result = Vec::with_capacity(count);
let mut buf = [0u8; Self::RECORD_SIZE];

for _ in 0..count {
self.file.read_exact(&mut buf)?;
for i in 0..count {
let pos = byte_pos + (i as u64) * Self::RECORD_SIZE as u64;
self.file.read_exact_at(&mut buf, pos)?;
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
result.push(ChangesetOffset::new(offset, num_changes));
Expand Down Expand Up @@ -251,7 +250,7 @@ mod tests {

// Read
{
let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
assert_eq!(reader.len(), 3);

let entry = reader.get(0).unwrap().unwrap();
Expand Down Expand Up @@ -284,7 +283,7 @@ mod tests {
writer.truncate(2).unwrap();
assert_eq!(writer.len(), 2);

let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
assert_eq!(reader.len(), 2);
assert!(reader.get(2).unwrap().is_none());
}
Expand Down Expand Up @@ -317,7 +316,7 @@ mod tests {
assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);

// Verify the complete record is readable
let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap();
let reader = ChangesetOffsetReader::new(&path, 1).unwrap();
assert_eq!(reader.len(), 1);
let entry = reader.get(0).unwrap().unwrap();
assert_eq!(entry.offset(), 100);
Expand All @@ -340,7 +339,7 @@ mod tests {
}

// Open with len=2, ignoring the 3rd record
let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
let reader = ChangesetOffsetReader::new(&path, 2).unwrap();
assert_eq!(reader.len(), 2);

// First two records should be readable
Expand Down Expand Up @@ -397,7 +396,7 @@ mod tests {

// Verify the records are correct
{
let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
let reader = ChangesetOffsetReader::new(&path, 3).unwrap();
assert_eq!(reader.len(), 3);

let entry0 = reader.get(0).unwrap().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/static-file/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ mod compression;
mod event;
mod segment;

#[cfg(feature = "std")]
#[cfg(all(feature = "std", unix))]
mod changeset_offsets;
#[cfg(feature = "std")]
#[cfg(all(feature = "std", unix))]
pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter};

use alloy_primitives::BlockNumber;
Expand Down
27 changes: 10 additions & 17 deletions crates/storage/provider/src/providers/static_file/jar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reth_db::static_file::{
use reth_db_api::table::{Decompress, Value};
use reth_node_types::NodePrimitives;
use reth_primitives_traits::{SealedHeader, SignedTransaction};
use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader};
use reth_static_file_types::ChangesetOffset;
use reth_storage_api::range_size_hint;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
Expand Down Expand Up @@ -111,15 +111,11 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
return Ok(None);
};

let csoff_path = self.data_path().with_extension("csoff");
if !csoff_path.exists() {
return Ok(None);
if let Some(reader) = self.jar.value().csoff_reader() {
reader.get(index).map_err(ProviderError::other)
} else {
Ok(None)
}

let len = header.changeset_offsets_len();
let mut reader =
ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
reader.get(index).map_err(ProviderError::other)
}

/// Reads all changeset offsets from the sidecar file.
Expand All @@ -138,15 +134,12 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
return Ok(Some(Vec::new()));
}

let csoff_path = self.data_path().with_extension("csoff");
if !csoff_path.exists() {
return Ok(None);
if let Some(reader) = self.jar.value().csoff_reader() {
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
Ok(Some(offsets))
} else {
Ok(None)
}

let mut reader =
ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
Ok(Some(offsets))
}
}

Expand Down
25 changes: 22 additions & 3 deletions crates/storage/provider/src/providers/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ mod metrics;
mod writer_tests;

use reth_nippy_jar::NippyJar;
use reth_static_file_types::{SegmentHeader, StaticFileSegment};
use reth_static_file_types::{ChangesetOffsetReader, SegmentHeader, StaticFileSegment};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{ops::Deref, sync::Arc};
use std::{io, ops::Deref, sync::Arc};

/// Alias type for each specific `NippyJar`.
type LoadedJarRef<'a> =
Expand All @@ -29,14 +29,28 @@ type LoadedJarRef<'a> =
pub struct LoadedJar {
jar: NippyJar<SegmentHeader>,
mmap_handle: Arc<reth_nippy_jar::DataReader>,
csoff_reader: Option<ChangesetOffsetReader>,
}

impl LoadedJar {
fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
match jar.open_data_reader() {
Ok(data_reader) => {
let mmap_handle = Arc::new(data_reader);
Ok(Self { jar, mmap_handle })

let csoff_reader = if jar.user_header().segment().is_change_based() {
let csoff_path = jar.data_path().with_extension("csoff");
let len = jar.user_header().changeset_offsets_len();
match ChangesetOffsetReader::new(&csoff_path, len) {
Ok(reader) => Some(reader),
Err(err) if err.kind() == io::ErrorKind::NotFound && len == 0 => None,
Err(err) => return Err(ProviderError::other(err)),
}
} else {
None
};

Ok(Self { jar, mmap_handle, csoff_reader })
}
Err(e) => Err(ProviderError::other(e)),
}
Expand All @@ -55,6 +69,11 @@ impl LoadedJar {
fn size(&self) -> usize {
self.mmap_handle.size() + self.mmap_handle.offsets_size()
}

/// Returns a reference to the cached changeset offset reader.
const fn csoff_reader(&self) -> Option<&ChangesetOffsetReader> {
self.csoff_reader.as_ref()
}
}

impl Deref for LoadedJar {
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {

// Step 2: Validate sidecar offsets against actual NippyJar state
let valid_blocks = if actual_sidecar_blocks > 0 {
let mut reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
let reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
.map_err(ProviderError::other)?;

// Find last block where offset + num_changes <= actual_nippy_rows
Expand Down Expand Up @@ -896,7 +896,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
// Read offset for the block after last_block from sidecar.
// Use committed length from header, ignoring any uncommitted records
// that may exist in the file after a crash.
let mut reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
let reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
.map_err(ProviderError::other)?;
if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
next_offset.offset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ mod tests {
assert_eq!(get_header_block_count(&provider, 0), 5);

// Verify offsets are correct
let mut reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();
let reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();

let o0 = reader.get(0).unwrap().unwrap();
assert_eq!(o0.offset(), 0);
Expand Down
Loading