From f844e5eb45110003198cd09d5d534aa9e53f2854 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Wed, 4 Mar 2026 22:28:17 +0800 Subject: [PATCH 01/11] async writable --- src/db/car/forest/index/mod.rs | 72 +++++++++++++++------------------- 1 file changed, 31 insertions(+), 41 deletions(-) diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index 169ec68442f3..84a3d9783213 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -66,7 +66,7 @@ #[cfg_vis(feature = "benchmark-private", pub)] use self::util::NonMaximalU64; -use byteorder::{LittleEndian, ReadBytesExt as _, WriteBytesExt as _}; +use byteorder::{LittleEndian, ReadBytesExt as _}; use cfg_vis::cfg_vis; use cid::Cid; use itertools::Itertools as _; @@ -74,7 +74,7 @@ use positioned_io::{ReadAt, Size}; use smallvec::{SmallVec, smallvec}; use std::{ cmp, - io::{self, Read, Write}, + io::{self, Read}, iter, num::NonZeroUsize, pin::pin, @@ -395,30 +395,19 @@ impl Writer { .chain(iter::once(Slot::Empty)) } pub async fn write_into(self, writer: impl AsyncWrite) -> io::Result<()> { - let mut buf = vec![]; let mut writer = pin!(writer); let Self { version, header, slots, } = self; - /// Bridge between our sync [`Writeable`] trait, and async writing code - async fn write_via_buf( - buf: &mut Vec, - writer: impl AsyncWrite, - data: impl Writeable, - ) -> io::Result<()> { - buf.clear(); - data.write_to(&mut *buf)?; - pin!(writer).write_all(buf).await - } - write_via_buf(&mut buf, &mut writer, version).await?; - write_via_buf(&mut buf, &mut writer, &header).await?; + version.write_to(&mut writer).await?; + header.write_to(&mut writer).await?; for slot in Self::slots( header.initial_buckets.try_into().unwrap(), slots.iter().copied(), ) { - write_via_buf(&mut buf, &mut writer, slot).await?; + slot.write_to(&mut writer).await?; } Ok(()) } @@ -519,9 +508,9 @@ impl Readable for Version { } } -impl Writeable for Version { - fn write_to(&self, mut writer: impl Write) -> io::Result<()> { - writer.write_u64::(*self as u64) +impl Writable for Version { + async fn write_to(&self, writer: &mut W) -> io::Result<()> { + writer.write_u64_le(*self as u64).await } const LEN: u64 = std::mem::size_of::() as u64; } @@ -545,9 +534,9 @@ impl Readable for Slot { } } -impl Writeable for Slot { - fn write_to(&self, writer: impl Write) -> io::Result<()> { - self.into_raw().write_to(writer) +impl Writable for Slot { + async fn write_to(&self, writer: &mut W) -> io::Result<()> { + self.into_raw().write_to(writer).await } const LEN: u64 = RawSlot::LEN; } @@ -564,11 +553,11 @@ impl Readable for RawSlot { } } -impl Writeable for RawSlot { - fn write_to(&self, mut writer: impl Write) -> io::Result<()> { +impl Writable for RawSlot { + async fn write_to(&self, writer: &mut W) -> io::Result<()> { let Self { hash, frame_offset } = *self; - writer.write_u64::(hash)?; - writer.write_u64::(frame_offset)?; + writer.write_u64_le(hash).await?; + writer.write_u64_le(frame_offset).await?; Ok(()) } const LEN: u64 = std::mem::size_of::() as u64 * 2; @@ -587,16 +576,16 @@ impl Readable for V1Header { } } -impl Writeable for V1Header { - fn write_to(&self, mut writer: impl Write) -> io::Result<()> { +impl Writable for V1Header { + async fn write_to(&self, writer: &mut W) -> io::Result<()> { let Self { longest_distance, collisions, initial_buckets, } = *self; - writer.write_u64::(longest_distance)?; - writer.write_u64::(collisions)?; - writer.write_u64::(initial_buckets)?; + writer.write_u64_le(longest_distance).await?; + writer.write_u64_le(collisions).await?; + writer.write_u64_le(initial_buckets).await?; Ok(()) } const LEN: u64 = std::mem::size_of::() as u64 * 3; @@ -608,26 +597,26 @@ trait Readable { Self: Sized; } -trait Writeable { +trait Writable { /// Must only return [`Err(_)`] if the underlying io fails. - fn write_to(&self, writer: impl Write) -> io::Result<()>; - /// The number of bytes that will be written on a call to [`Writeable::write_to`]. + async fn write_to(&self, writer: &mut W) -> io::Result<()>; + /// The number of bytes that will be written on a call to [`Writable::write_to`]. /// /// Implementations may panic if this is incorrect. const LEN: u64; } /// Useful for exhaustiveness checking -fn written_len(_: T) -> u64 { +fn written_len(_: T) -> u64 { T::LEN } -impl Writeable for &T +impl Writable for &T where - T: Writeable, + T: Writable, { - fn write_to(&self, writer: impl Write) -> io::Result<()> { - T::write_to(self, writer) + async fn write_to(&self, writer: &mut W) -> io::Result<()> { + T::write_to(self, writer).await } const LEN: u64 = T::LEN; } @@ -767,8 +756,9 @@ mod tests { } #[track_caller] - fn round_trip(original: &T) { - let serialized = write_to_vec(|v| original.write_to(v)); + fn round_trip(original: &T) { + let serialized = + write_to_vec(|v| tokio::runtime::Runtime::new()?.block_on(original.write_to(v))); assert_eq!( serialized.len(), usize::try_from(written_len(original)).unwrap() From 2ad657f83bf080acaf9c38162e3e87b10d4b7cc6 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 5 Mar 2026 01:29:35 +0800 Subject: [PATCH 02/11] fix: support large index in forest car --- src/db/car/any.rs | 10 +- src/db/car/forest.rs | 53 +++++---- src/db/car/forest/index/mod.rs | 161 +++++++++++++++++++++------- src/db/car/many.rs | 2 +- src/tool/subcommands/archive_cmd.rs | 9 +- 5 files changed, 157 insertions(+), 78 deletions(-) diff --git a/src/db/car/any.rs b/src/db/car/any.rs index a3347a214f45..ccd5174d5a36 100644 --- a/src/db/car/any.rs +++ b/src/db/car/any.rs @@ -91,12 +91,12 @@ impl AnyCar { } /// Discard reader type and replace with dynamic trait object. - pub fn into_dyn(self) -> AnyCar> { - match self { - AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()), + pub fn into_dyn(self) -> Result>> { + Ok(match self { + AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()?), AnyCar::Plain(p) => AnyCar::Plain(p.into_dyn()), AnyCar::Memory(m) => AnyCar::Memory(m), - } + }) } /// Set the z-frame cache of the inner CAR reader. @@ -109,7 +109,7 @@ impl AnyCar { } /// Get the index size in bytes - pub fn index_size_bytes(&self) -> Option { + pub fn index_size_bytes(&self) -> Option { match self { Self::Forest(car) => Some(car.index_size_bytes()), _ => None, diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 74fd27780ef6..45dbb51c1a7d 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -50,12 +50,11 @@ use super::{CacheKey, ZstdFrameCache}; use crate::blocks::{Tipset, TipsetKey}; use crate::chain::FilecoinSnapshotMetadata; use crate::db::car::RandomAccessFileReader; -use crate::db::car::plain::write_skip_frame_header_async; +use crate::db::car::forest::index::ZstdSkipFramesEncodedDataReader; use crate::utils::db::car_stream::{CarBlock, CarV1Header, uvi_bytes}; use crate::utils::encoding::from_slice_with_fallback; use crate::utils::get_size::CidWrapper; use crate::utils::io::EitherMmapOrRandomAccessFile; -use byteorder::LittleEndian; use bytes::{BufMut as _, Bytes, BytesMut, buf::Writer}; use cid::Cid; use futures::{Stream, TryStreamExt as _}; @@ -63,7 +62,7 @@ use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::CborStore as _; use integer_encoding::VarIntReader; use nunny::Vec as NonEmpty; -use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor}; +use positioned_io::{Cursor, ReadAt, SizeCursor}; use std::io::{Seek, SeekFrom}; use std::path::Path; use std::sync::{Arc, OnceLock}; @@ -95,8 +94,8 @@ pub struct ForestCar { // Multiple `ForestCar` structures may share the same cache. The cache key is used to identify // the origin of a cached z-frame. cache_key: CacheKey, - indexed: index::Reader>, - index_size_bytes: u32, + indexed: index::Reader>>, + index_size_bytes: u64, frame_cache: Arc, header: CarV1Header, metadata: OnceLock>, @@ -104,15 +103,10 @@ pub struct ForestCar { impl ForestCar { pub fn new(reader: ReaderT) -> io::Result> { - let (header, footer) = Self::validate_car(&reader)?; - let index_size_bytes = reader.read_u32_at::( - footer.index.saturating_sub(std::mem::size_of::() as _), - )?; - let indexed = index::Reader::new(positioned_io::Slice::new( - reader, - footer.index, - Some(index_size_bytes as u64), - ))?; + let (header, index_start_pos, index_size_bytes) = Self::validate_car(&reader)?; + let indexed = index::Reader::new(index::ZstdSkipFramesEncodedDataReader::new( + positioned_io::Slice::new(reader, index_start_pos, Some(index_size_bytes)), + )?)?; Ok(ForestCar { cache_key: 0, indexed, @@ -141,9 +135,10 @@ impl ForestCar { Self::validate_car(reader).is_ok() } - fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, ForestCarFooter)> { + fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, u64, u64)> { let mut cursor = SizeCursor::new(&reader); cursor.seek(SeekFrom::End(-(ForestCarFooter::SIZE as i64)))?; + let index_end_pos = cursor.position(); let mut footer_buffer = [0; ForestCarFooter::SIZE]; cursor.read_exact(&mut footer_buffer)?; @@ -153,6 +148,7 @@ impl ForestCar { "not recognizable as a `{FOREST_CAR_FILE_EXTENSION}` file" )) })?; + let index_start_pos = footer.index - ZSTD_SKIP_FRAME_LEN; let cursor = Cursor::new_pos(&reader, 0); let mut header_zstd_frame = decode_zstd_single_frame(cursor)?.into(); @@ -162,7 +158,7 @@ impl ForestCar { let header = from_slice_with_fallback::(&block_frame) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok((header, footer)) + Ok((header, index_start_pos, index_end_pos - index_start_pos)) } pub fn head_tipset_key(&self) -> &NonEmpty { @@ -175,7 +171,7 @@ impl ForestCar { } } - pub fn index_size_bytes(&self) -> u32 { + pub fn index_size_bytes(&self) -> u64 { self.index_size_bytes } @@ -187,22 +183,22 @@ impl ForestCar { Tipset::load_required(self, &self.heaviest_tipset_key()) } - pub fn into_dyn(self) -> ForestCar> { - ForestCar { + pub fn into_dyn(self) -> io::Result>> { + Ok(ForestCar { cache_key: self.cache_key, indexed: self.indexed.map(|slice| { - let offset = slice.offset(); - positioned_io::Slice::new( - Box::new(slice.into_inner()) as Box, + let offset = slice.inner().offset(); + ZstdSkipFramesEncodedDataReader::new(positioned_io::Slice::new( + Box::new(slice.into_inner().into_inner()) as Box, offset, None, - ) - }), + )) + })?, index_size_bytes: self.index_size_bytes, frame_cache: self.frame_cache, header: self.header, metadata: self.metadata, - } + }) } pub fn with_cache(self, cache: Arc, key: CacheKey) -> Self { @@ -217,7 +213,7 @@ impl ForestCar { pub fn get_reader(&self, k: Cid) -> anyhow::Result> { for position in self.indexed.get(k)? { // escape the positioned_io::Slice - let entire_file = self.indexed.reader().get_ref(); + let entire_file = self.indexed.reader().inner().get_ref(); // `position` is the frame start offset. let cursor = Cursor::new_pos(entire_file, position); let mut decoder = zstd::Decoder::new(cursor)?.single_frame(); @@ -259,7 +255,7 @@ where Some(None) => {} None => { // Decode entire frame into memory, "position" arg is the frame start offset. - let entire_file = indexed.reader().get_ref(); // escape the positioned_io::Slice + let entire_file = indexed.reader().inner().get_ref(); // escape the positioned_io::Slice let cursor = Cursor::new_pos(entire_file, position); let mut zstd_frame = decode_zstd_single_frame(cursor)?.into(); // Parse all key-value pairs and insert them into a map @@ -332,8 +328,7 @@ impl Encoder { // Create index let writer = builder.into_writer(); - write_skip_frame_header_async(&mut sink, writer.written_len().try_into().unwrap()).await?; - writer.write_into(&mut sink).await?; + writer.write_zstd_skip_frames_into(&mut sink).await?; // Write ForestCAR.zst footer, it's a valid ZSTD skip-frame let footer = ForestCarFooter { diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index 84a3d9783213..3fa1964bacb7 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -64,13 +64,15 @@ //! ├──────────────┤ <- Zstd skip frame header) //! ``` +use crate::db::car::plain::write_skip_frame_header_async; + #[cfg_vis(feature = "benchmark-private", pub)] use self::util::NonMaximalU64; use byteorder::{LittleEndian, ReadBytesExt as _}; use cfg_vis::cfg_vis; use cid::Cid; -use itertools::Itertools as _; -use positioned_io::{ReadAt, Size}; +use itertools::Itertools; +use positioned_io::{ReadAt, ReadBytesAtExt as _, Size}; use smallvec::{SmallVec, smallvec}; use std::{ cmp, @@ -180,11 +182,88 @@ where /// Replace the inner reader. /// It MUST point to the same underlying IO, else future calls to `get` /// will be incorrect. - pub fn map(self, f: impl FnOnce(R) -> T) -> Reader { - Reader { - inner: f(self.inner), + pub fn map(self, f: impl FnOnce(R) -> io::Result) -> io::Result> { + Ok(Reader { + inner: f(self.inner)?, table_offset: self.table_offset, header: self.header, + }) + } +} + +pub struct ZstdSkipFramesEncodedDataReader { + reader: R, + skip_frame_header_offsets: Vec, +} + +impl ZstdSkipFramesEncodedDataReader { + pub fn new(reader: R) -> io::Result { + let mut offset = 0; + let mut skip_frame_header_offsets = vec![]; + while let Ok(len) = reader.read_u32_at::(offset + 4) { + skip_frame_header_offsets.push(offset); + offset += 8 + len as u64; + } + Ok(Self { + reader, + skip_frame_header_offsets, + }) + } + + pub fn inner(&self) -> &R { + &self.reader + } + + pub fn into_inner(self) -> R { + self.reader + } +} + +impl Size for ZstdSkipFramesEncodedDataReader { + fn size(&self) -> io::Result> { + if let Some(size) = self.reader.size()? { + let total_header_size = (self.skip_frame_header_offsets.len() * 8) as u64; + if size >= total_header_size { + Ok(Some(size - total_header_size)) + } else { + Err(io::Error::other(format!( + "unexpected error: size({size}) < total_header_size({total_header_size})" + ))) + } + } else { + Ok(None) + } + } +} + +impl ReadAt for ZstdSkipFramesEncodedDataReader +where + R: ReadAt, +{ + fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result { + let mut amended_pos = pos; + let mut next_frame_pos = None; + for &p in self.skip_frame_header_offsets.iter() { + if p <= amended_pos { + amended_pos += 8; + } else { + next_frame_pos = Some(p); + break; + } + } + if let Some(next_frame_pos) = next_frame_pos + && amended_pos + buf.len() as u64 > next_frame_pos + { + let max_read_len = (next_frame_pos - amended_pos) as usize; + if max_read_len < buf.len() { + #[allow(clippy::indexing_slicing)] + Ok(self.reader.read_at(amended_pos, &mut buf[..max_read_len])? + + self.read_at(pos + max_read_len as u64, &mut buf[max_read_len..])?) + } else { + self.reader.read_at(amended_pos, buf) + } + } else { + self.reader.read_at(amended_pos, buf) } } } @@ -358,27 +437,6 @@ pub struct Writer { } impl Writer { - pub fn written_len(&self) -> u64 { - let Self { - version, - header, - slots, - } = self; - written_len(version) - + written_len(header) - // this logic must be kept in sync with [`slots`], below - + cmp::max( - u64::try_from( - slots - .iter() - .map(|(pre, _)| *pre + 1 /* occupied */) - .sum::() - + 1, /* trailing */ - ) - .unwrap(), - header.initial_buckets + 1, /* trailing */ - ) * Slot::LEN - } fn slots( min_slots: usize, slots: impl IntoIterator, @@ -394,20 +452,40 @@ impl Writer { .pad_using(min_slots, |_ix| Slot::Empty) .chain(iter::once(Slot::Empty)) } - pub async fn write_into(self, writer: impl AsyncWrite) -> io::Result<()> { + pub async fn write_zstd_skip_frames_into(self, writer: impl AsyncWrite) -> io::Result<()> { + // write every 128MiB slots to a skip frame + const CHUNK_FRAME_DATA_MAX_BYTES: usize = 128 * 1024; + self.write_zstd_skip_frames_into_inner(writer, CHUNK_FRAME_DATA_MAX_BYTES) + .await + } + async fn write_zstd_skip_frames_into_inner( + self, + writer: impl AsyncWrite, + skip_frame_data_max_bytes: usize, + ) -> io::Result<()> { let mut writer = pin!(writer); let Self { version, header, slots, } = self; + // write version and header to a skip frame + let frame_data_len: u32 = (written_len(&version) + written_len(&header)) as u32; + write_skip_frame_header_async(&mut writer, frame_data_len).await?; version.write_to(&mut writer).await?; header.write_to(&mut writer).await?; + + let mut buf = Vec::with_capacity(skip_frame_data_max_bytes); for slot in Self::slots( header.initial_buckets.try_into().unwrap(), slots.iter().copied(), ) { - slot.write_to(&mut writer).await?; + slot.write_to(&mut buf).await?; + if buf.len() >= skip_frame_data_max_bytes { + write_skip_frame_header_async(&mut writer, buf.len() as u32).await?; + writer.write_all(&buf).await?; + buf.clear(); + } } Ok(()) } @@ -607,7 +685,7 @@ trait Writable { } /// Useful for exhaustiveness checking -fn written_len(_: T) -> u64 { +fn written_len(_: &T) -> u64 { T::LEN } @@ -660,18 +738,21 @@ mod tests { /// [`Reader`] should behave like a [`HashMap`], with a caveat for collisions. fn do_hashmap_of_cids(reference: HashMap>) { - let subject = Reader::new(write_to_vec(|v| { + let r = ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| { let writer = Builder::from_iter(reference.clone().into_iter().flat_map(|(hash, offsets)| { offsets.into_iter().map(move |offset| (hash, offset)) })) .into_writer(); - let expected_len = writer.written_len(); - block_on(writer.write_into(&mut *v))?; - assert_eq!(expected_len as usize, v.len()); + block_on(writer.write_zstd_skip_frames_into(&mut *v))?; Ok(()) })) .unwrap(); + println!( + "skip_frame_header_offsets_len: {}", + r.skip_frame_header_offsets.len() + ); + let subject = Reader::new(r).unwrap(); for (cid, expected) in reference { let actual = subject.get(cid).unwrap().into_iter().collect(); assert!(expected.is_subset(&actual)); // collisions @@ -687,18 +768,21 @@ mod tests { /// /// Additionally checks [`Reader::iter`] fn do_hashmap_of_hashes(reference: HashMap>) { - let subject = Reader::new(write_to_vec(|v| { + let r = ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| { let writer = Builder::from_iter(reference.clone().into_iter().flat_map(|(hash, offsets)| { offsets.into_iter().map(move |offset| (hash, offset)) })) .into_writer(); - let expected_len = writer.written_len(); - block_on(writer.write_into(&mut *v))?; - assert_eq!(expected_len as usize, v.len()); + block_on(writer.write_zstd_skip_frames_into(&mut *v))?; Ok(()) })) .unwrap(); + println!( + "skip_frame_header_offsets_len: {}", + r.skip_frame_header_offsets.len() + ); + let subject = Reader::new(r).unwrap(); for (hash, expected) in &reference { let actual = subject.get_by_hash(*hash).unwrap().into_iter().collect(); assert!(expected.is_subset(&actual)) // collisions @@ -757,8 +841,7 @@ mod tests { #[track_caller] fn round_trip(original: &T) { - let serialized = - write_to_vec(|v| tokio::runtime::Runtime::new()?.block_on(original.write_to(v))); + let serialized = write_to_vec(|v| block_on(original.write_to(v))); assert_eq!( serialized.len(), usize::try_from(written_len(original)).unwrap() diff --git a/src/db/car/many.rs b/src/db/car/many.rs index fffa41a63c73..6b53c3ba6747 100644 --- a/src/db/car/many.rs +++ b/src/db/car/many.rs @@ -108,7 +108,7 @@ impl ManyCar { read_only.push(WithHeaviestEpoch::new( any_car .with_cache(self.shared_cache.clone(), key) - .into_dyn(), + .into_dyn()?, )?); Ok(()) diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index 0660320d12b4..8c418f524c96 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -55,6 +55,7 @@ use dialoguer::{Confirm, theme::ColorfulTheme}; use futures::{StreamExt as _, TryStreamExt as _}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::DAG_CBOR; +use human_repr::HumanCount as _; use indicatif::{ProgressBar, ProgressIterator, ProgressStyle}; use itertools::Itertools; use multihash_derive::MultihashDigest as _; @@ -313,7 +314,7 @@ pub struct ArchiveInfo { events: usize, head: Tipset, snapshot_version: FilecoinSnapshotVersion, - index_size_bytes: Option, + index_size_bytes: Option, } impl std::fmt::Display for ArchiveInfo { @@ -338,7 +339,7 @@ impl std::fmt::Display for ArchiveInfo { write!( f, "Index size: {}", - human_bytes::human_bytes(index_size_bytes) + index_size_bytes.human_count_bytes() )?; } Ok(()) @@ -353,7 +354,7 @@ impl ArchiveInfo { variant: String, heaviest_tipset: Tipset, snapshot_version: FilecoinSnapshotVersion, - index_size_bytes: Option, + index_size_bytes: Option, ) -> anyhow::Result { Self::from_store_with( store, @@ -373,7 +374,7 @@ impl ArchiveInfo { variant: String, heaviest_tipset: Tipset, snapshot_version: FilecoinSnapshotVersion, - index_size_bytes: Option, + index_size_bytes: Option, progress: bool, ) -> anyhow::Result { let head = heaviest_tipset; From 3ba1a6f7d83da756f6daf34738555553c1836aa2 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 5 Mar 2026 02:32:30 +0800 Subject: [PATCH 03/11] backward compatibility --- benches/car-index.rs | 2 +- src/db/car/forest/index/mod.rs | 205 ++++++++++++++++++++++----------- 2 files changed, 136 insertions(+), 71 deletions(-) diff --git a/benches/car-index.rs b/benches/car-index.rs index 4da6d5d8a44d..0393c11c4eb3 100644 --- a/benches/car-index.rs +++ b/benches/car-index.rs @@ -35,7 +35,7 @@ fn bench_car_index(c: &mut Criterion) { block_on( index::Builder::from_iter(reference.clone()) .into_writer() - .write_into(&mut v), + .write_zstd_skip_frames_into(&mut v), ) .unwrap(); index::Reader::new(v).unwrap() diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index 46dbd7401519..150440482a8f 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -64,7 +64,7 @@ //! ├──────────────┤ <- Zstd skip frame header) //! ``` -use crate::db::car::plain::write_skip_frame_header_async; +use crate::{db::car::plain::write_skip_frame_header_async, utils::misc::env::is_env_truthy}; #[cfg_vis(feature = "benchmark-private", pub)] use self::util::NonMaximalU64; @@ -437,6 +437,28 @@ pub struct Writer { } impl Writer { + // To keep backward compatibility, remove after NV28 release + fn written_len(&self) -> u64 { + let Self { + version, + header, + slots, + } = self; + written_len(version) + + written_len(header) + // this logic must be kept in sync with [`slots`], below + + cmp::max( + u64::try_from( + slots + .iter() + .map(|(pre, _)| *pre + 1 /* occupied */) + .sum::() + + 1, /* trailing */ + ) + .unwrap(), + header.initial_buckets + 1, /* trailing */ + ) * Slot::LEN + } fn slots( min_slots: usize, slots: impl IntoIterator, @@ -455,13 +477,20 @@ impl Writer { pub async fn write_zstd_skip_frames_into(self, writer: impl AsyncWrite) -> io::Result<()> { // write every 128MiB slots to a skip frame const CHUNK_FRAME_DATA_MAX_BYTES: usize = 128 * 1024; - self.write_zstd_skip_frames_into_inner(writer, CHUNK_FRAME_DATA_MAX_BYTES) - .await + let written_len = self.written_len(); + self.write_zstd_skip_frames_into_inner( + writer, + CHUNK_FRAME_DATA_MAX_BYTES, + u32::try_from(written_len).ok(), + ) + .await } async fn write_zstd_skip_frames_into_inner( self, writer: impl AsyncWrite, skip_frame_data_max_bytes: usize, + // To keep backward compatibility, remove after NV28 release + index_data_len: Option, ) -> io::Result<()> { let mut writer = pin!(writer); let Self { @@ -469,30 +498,43 @@ impl Writer { header, slots, } = self; - // write version and header to a skip frame - let frame_data_len: u32 = (written_len(&version) + written_len(&header)) as u32; - write_skip_frame_header_async(&mut writer, frame_data_len).await?; - version.write_to(&mut writer).await?; - header.write_to(&mut writer).await?; - - let mut buf = Vec::with_capacity(skip_frame_data_max_bytes); - for slot in Self::slots( + let slots = Self::slots( header.initial_buckets.try_into().unwrap(), slots.iter().copied(), - ) { - slot.write_to(&mut buf).await?; - if buf.len() >= skip_frame_data_max_bytes { + ); + if let Some(index_data_len) = index_data_len + && !is_env_truthy("FOREST_CAR_INDEX_USE_MULTIPLE_SKIP_FRAMES") + { + // To keep backward compatibility, remove after NV28 release + write_skip_frame_header_async(&mut writer, index_data_len).await?; + version.write_to(&mut writer).await?; + header.write_to(&mut writer).await?; + for slot in slots { + slot.write_to(&mut writer).await?; + } + } else { + // write version and header to a skip frame + let frame_data_len: u32 = (written_len(&version) + written_len(&header)) as u32; + write_skip_frame_header_async(&mut writer, frame_data_len).await?; + version.write_to(&mut writer).await?; + header.write_to(&mut writer).await?; + + let mut buf = Vec::with_capacity(skip_frame_data_max_bytes); + for slot in slots { + slot.write_to(&mut buf).await?; + if buf.len() >= skip_frame_data_max_bytes { + write_skip_frame_header_async(&mut writer, buf.len() as u32).await?; + writer.write_all(&buf).await?; + buf.clear(); + } + } + + if !buf.is_empty() { write_skip_frame_header_async(&mut writer, buf.len() as u32).await?; writer.write_all(&buf).await?; - buf.clear(); } } - if !buf.is_empty() { - write_skip_frame_header_async(&mut writer, buf.len() as u32).await?; - writer.write_all(&buf).await?; - } - Ok(()) } } @@ -744,24 +786,35 @@ mod tests { /// [`Reader`] should behave like a [`HashMap`], with a caveat for collisions. fn do_hashmap_of_cids(reference: HashMap>) { - let r = ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| { - let writer = - Builder::from_iter(reference.clone().into_iter().flat_map(|(hash, offsets)| { - offsets.into_iter().map(move |offset| (hash, offset)) + for multi_index_frame in [false, true] { + let r: ZstdSkipFramesEncodedDataReader> = + ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| { + let writer = Builder::from_iter(reference.clone().into_iter().flat_map( + |(hash, offsets)| offsets.into_iter().map(move |offset| (hash, offset)), + )) + .into_writer(); + block_on(async { + if multi_index_frame { + writer + .write_zstd_skip_frames_into_inner(&mut *v, 1024, None) + .await + } else { + writer.write_zstd_skip_frames_into(&mut *v).await + } + })?; + Ok(()) })) - .into_writer(); - block_on(writer.write_zstd_skip_frames_into(&mut *v))?; - Ok(()) - })) - .unwrap(); - println!( - "skip_frame_header_offsets_len: {}", - r.skip_frame_header_offsets.len() - ); - let subject = Reader::new(r).unwrap(); - for (cid, expected) in reference { - let actual = subject.get(cid).unwrap().into_iter().collect(); - assert!(expected.is_subset(&actual)); // collisions + .unwrap(); + if multi_index_frame { + assert!(r.skip_frame_header_offsets.len() > 1); + } else { + assert_eq!(r.skip_frame_header_offsets.len(), 1); + } + let subject = Reader::new(r).unwrap(); + for (&cid, expected) in &reference { + let actual = subject.get(cid).unwrap().into_iter().collect(); + assert!(expected.is_subset(&actual)); // collisions + } } } @@ -774,41 +827,53 @@ mod tests { /// /// Additionally checks [`Reader::iter`] fn do_hashmap_of_hashes(reference: HashMap>) { - let r = ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| { - let writer = - Builder::from_iter(reference.clone().into_iter().flat_map(|(hash, offsets)| { - offsets.into_iter().map(move |offset| (hash, offset)) - })) + for multi_index_frame in [false, true] { + let r = ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| { + let writer = Builder::from_iter(reference.clone().into_iter().flat_map( + |(hash, offsets)| offsets.into_iter().map(move |offset| (hash, offset)), + )) .into_writer(); - block_on(writer.write_zstd_skip_frames_into(&mut *v))?; - Ok(()) - })) - .unwrap(); - println!( - "skip_frame_header_offsets_len: {}", - r.skip_frame_header_offsets.len() - ); - let subject = Reader::new(r).unwrap(); - for (hash, expected) in &reference { - let actual = subject.get_by_hash(*hash).unwrap().into_iter().collect(); - assert!(expected.is_subset(&actual)) // collisions - } + block_on(async { + if multi_index_frame { + writer + .write_zstd_skip_frames_into_inner(&mut *v, 1024, None) + .await + } else { + writer.write_zstd_skip_frames_into(&mut *v).await + } + })?; + Ok(()) + })) + .unwrap(); + if multi_index_frame { + assert!(r.skip_frame_header_offsets.len() > 1); + } else { + assert_eq!(r.skip_frame_header_offsets.len(), 1); + } + let subject = Reader::new(r).unwrap(); + for (hash, expected) in &reference { + let actual = subject.get_by_hash(*hash).unwrap().into_iter().collect(); + assert!(expected.is_subset(&actual)) // collisions + } - let via_iter = subject - .iter() - .unwrap() - .filter_map(|it| match it.unwrap() { - Slot::Empty => None, - Slot::Occupied(it) => Some(it), - }) - .chunk_by(|it| it.hash) - .into_iter() - .map(|(hash, group)| (hash, HashSet::from_iter(group.map(|it| it.frame_offset)))) - .collect::>(); - assert_eq!( - via_iter, - reference.tap_mut(|it| it.retain(|_, v| !v.is_empty())) - ); + let via_iter = subject + .iter() + .unwrap() + .filter_map(|it| match it.unwrap() { + Slot::Empty => None, + Slot::Occupied(it) => Some(it), + }) + .chunk_by(|it| it.hash) + .into_iter() + .map(|(hash, group)| (hash, HashSet::from_iter(group.map(|it| it.frame_offset)))) + .collect::>(); + assert_eq!( + via_iter, + reference + .clone() + .tap_mut(|it| it.retain(|_, v| !v.is_empty())) + ); + } } quickcheck::quickcheck! { From eb8002003683ad0e78c58ca6bb438ddb2b00698c Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 5 Mar 2026 02:38:40 +0800 Subject: [PATCH 04/11] use const ZSTD_SKIP_FRAME_LEN --- src/db/car/forest.rs | 2 +- src/db/car/forest/index/mod.rs | 24 ++++++++++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 45dbb51c1a7d..6ae9f63c3e6f 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -85,7 +85,7 @@ pub const TEMP_FOREST_CAR_FILE_EXTENSION: &str = ".forest.car.zst.tmp"; pub const ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER: [u8; 4] = [0x50, 0x2A, 0x4D, 0x18]; pub const DEFAULT_FOREST_CAR_FRAME_SIZE: usize = 8000_usize.next_power_of_two(); pub const DEFAULT_FOREST_CAR_COMPRESSION_LEVEL: u16 = zstd::DEFAULT_COMPRESSION_LEVEL as _; -const ZSTD_SKIP_FRAME_LEN: u64 = 8; +pub const ZSTD_SKIP_FRAME_LEN: u64 = 8; /// `zstd` frame of Forest CAR pub type ForestCarFrame = (Vec, Bytes); diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index 150440482a8f..dedbb73350b3 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -64,6 +64,7 @@ //! ├──────────────┤ <- Zstd skip frame header) //! ``` +use super::ZSTD_SKIP_FRAME_LEN; use crate::{db::car::plain::write_skip_frame_header_async, utils::misc::env::is_env_truthy}; #[cfg_vis(feature = "benchmark-private", pub)] @@ -202,7 +203,7 @@ impl ZstdSkipFramesEncodedDataReader { let mut skip_frame_header_offsets = vec![]; while let Ok(len) = reader.read_u32_at::(offset + 4) { skip_frame_header_offsets.push(offset); - offset += 8 + len as u64; + offset += ZSTD_SKIP_FRAME_LEN + len as u64; } Ok(Self { reader, @@ -222,7 +223,8 @@ impl ZstdSkipFramesEncodedDataReader { impl Size for ZstdSkipFramesEncodedDataReader { fn size(&self) -> io::Result> { if let Some(size) = self.reader.size()? { - let total_header_size = (self.skip_frame_header_offsets.len() * 8) as u64; + let total_header_size = + ZSTD_SKIP_FRAME_LEN * self.skip_frame_header_offsets.len() as u64; if size >= total_header_size { Ok(Some(size - total_header_size)) } else { @@ -241,29 +243,31 @@ where R: ReadAt, { fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result { - let mut amended_pos = pos; + let mut adjusted_pos = pos; let mut next_frame_pos = None; for &p in self.skip_frame_header_offsets.iter() { - if p <= amended_pos { - amended_pos += 8; + if p <= adjusted_pos { + adjusted_pos += ZSTD_SKIP_FRAME_LEN; } else { next_frame_pos = Some(p); break; } } if let Some(next_frame_pos) = next_frame_pos - && amended_pos + buf.len() as u64 > next_frame_pos + && adjusted_pos + buf.len() as u64 > next_frame_pos { - let max_read_len = (next_frame_pos - amended_pos) as usize; + let max_read_len = (next_frame_pos - adjusted_pos) as usize; if max_read_len < buf.len() { #[allow(clippy::indexing_slicing)] - Ok(self.reader.read_at(amended_pos, &mut buf[..max_read_len])? + Ok(self + .reader + .read_at(adjusted_pos, &mut buf[..max_read_len])? + self.read_at(pos + max_read_len as u64, &mut buf[max_read_len..])?) } else { - self.reader.read_at(amended_pos, buf) + self.reader.read_at(adjusted_pos, buf) } } else { - self.reader.read_at(amended_pos, buf) + self.reader.read_at(adjusted_pos, buf) } } } From 4ad5d1d537f786e50c72787c5965555c418488aa Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 5 Mar 2026 02:44:55 +0800 Subject: [PATCH 05/11] simplify logic --- src/db/car/forest/index/mod.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index dedbb73350b3..2fe9e84b4701 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -254,18 +254,14 @@ where } } if let Some(next_frame_pos) = next_frame_pos - && adjusted_pos + buf.len() as u64 > next_frame_pos + && let max_read_len = (next_frame_pos - adjusted_pos) as usize + && max_read_len < buf.len() { - let max_read_len = (next_frame_pos - adjusted_pos) as usize; - if max_read_len < buf.len() { - #[allow(clippy::indexing_slicing)] - Ok(self - .reader - .read_at(adjusted_pos, &mut buf[..max_read_len])? - + self.read_at(pos + max_read_len as u64, &mut buf[max_read_len..])?) - } else { - self.reader.read_at(adjusted_pos, buf) - } + #[allow(clippy::indexing_slicing)] + Ok(self + .reader + .read_at(adjusted_pos, &mut buf[..max_read_len])? + + self.read_at(pos + max_read_len as u64, &mut buf[max_read_len..])?) } else { self.reader.read_at(adjusted_pos, buf) } From ce346cbf0cc048b7342904a52dbc133ee6b4bb3e Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 5 Mar 2026 02:56:38 +0800 Subject: [PATCH 06/11] fix AI comments --- src/db/car/forest.rs | 7 ++++++- src/db/car/forest/index/mod.rs | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 6ae9f63c3e6f..b11a5dfbc6ef 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -148,7 +148,12 @@ impl ForestCar { "not recognizable as a `{FOREST_CAR_FILE_EXTENSION}` file" )) })?; - let index_start_pos = footer.index - ZSTD_SKIP_FRAME_LEN; + let index_start_pos = footer.index.checked_sub(ZSTD_SKIP_FRAME_LEN).ok_or_else(|| + invalid_data(format!( + "unexpected error: footer.index({}) < ZSTD_SKIP_FRAME_LEN({ZSTD_SKIP_FRAME_LEN})", + footer.index + )), + )?; let cursor = Cursor::new_pos(&reader, 0); let mut header_zstd_frame = decode_zstd_single_frame(cursor)?.into(); diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index 2fe9e84b4701..146184d85430 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -475,8 +475,8 @@ impl Writer { .chain(iter::once(Slot::Empty)) } pub async fn write_zstd_skip_frames_into(self, writer: impl AsyncWrite) -> io::Result<()> { - // write every 128MiB slots to a skip frame - const CHUNK_FRAME_DATA_MAX_BYTES: usize = 128 * 1024; + // write every 512MiB slots to a skip frame + const CHUNK_FRAME_DATA_MAX_BYTES: usize = 512 * 1024 * 1024; let written_len = self.written_len(); self.write_zstd_skip_frames_into_inner( writer, From a351a25c825a0ecc94b7c87961f67ee34315fd72 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 5 Mar 2026 08:04:34 +0800 Subject: [PATCH 07/11] fix --- src/db/car/forest.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index b11a5dfbc6ef..8d8b4ede7298 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -62,7 +62,7 @@ use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::CborStore as _; use integer_encoding::VarIntReader; use nunny::Vec as NonEmpty; -use positioned_io::{Cursor, ReadAt, SizeCursor}; +use positioned_io::{Cursor, ReadAt, Size as _, SizeCursor}; use std::io::{Seek, SeekFrom}; use std::path::Path; use std::sync::{Arc, OnceLock}; @@ -193,10 +193,11 @@ impl ForestCar { cache_key: self.cache_key, indexed: self.indexed.map(|slice| { let offset = slice.inner().offset(); + let size = slice.inner().size()?; ZstdSkipFramesEncodedDataReader::new(positioned_io::Slice::new( Box::new(slice.into_inner().into_inner()) as Box, offset, - None, + size, )) })?, index_size_bytes: self.index_size_bytes, From 76209035d7ca77cb7fa2cf208493a43376934306 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 5 Mar 2026 08:13:20 +0800 Subject: [PATCH 08/11] fix --- src/db/car/forest.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 8d8b4ede7298..7548cd7e5672 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -154,6 +154,9 @@ impl ForestCar { footer.index )), )?; + let index_len = index_end_pos.checked_sub(index_start_pos).ok_or_else(|| + invalid_data(format!("unexpected error: index_end_pos({index_end_pos}) < index_start_pos({index_start_pos})")) + )?; let cursor = Cursor::new_pos(&reader, 0); let mut header_zstd_frame = decode_zstd_single_frame(cursor)?.into(); @@ -163,7 +166,7 @@ impl ForestCar { let header = from_slice_with_fallback::(&block_frame) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok((header, index_start_pos, index_end_pos - index_start_pos)) + Ok((header, index_start_pos, index_len)) } pub fn head_tipset_key(&self) -> &NonEmpty { From 5f11d243580aae789b5b41a90cb1ee174e5585af Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 5 Mar 2026 20:51:17 +0800 Subject: [PATCH 09/11] resolve comments --- src/db/car/forest.rs | 13 +++++++------ src/db/car/forest/index/mod.rs | 31 ++++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 7548cd7e5672..54a026bf4ab3 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -6,16 +6,17 @@ //! See [`crate::db::car::plain`] for details on the CAR format. //! //! The `forest.car.zst` format wraps multiple CAR blocks in small (usually 8 KiB) -//! zstd frames, and has an index in a skippable zstd frame. At the end of the -//! data, there has to be a fixed-size skippable frame containing magic numbers -//! and meta information about the archive. CAR blocks may not span multiple -//! z-frames and the CAR header is kept it a separate z-frame. +//! zstd frames, and has an index in one ore more skippable zstd frames (each +//! skippable frame contains up to `u32::MAX` bytes). At the end of the data, there +//! has to be a fixed-size skippable frame containing magic numbers and meta +//! information about the archive. CAR blocks may not span multiple z-frames +//! and the CAR header is kept it a separate z-frame. //! //! Imagine a `forest.car.zst` archive with 5 blocks. They could be arranged in //! z-frames as drawn below: //! //! ```text -//! Z-Frame 1 Z-Frame 2 Z-Frame 3 Skip Frame Skip Frame +//! Z-Frame 1 Z-Frame 2 Z-Frame 3 Skip Frames Skip Frame //! ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌───────────┐ ┌────────────┐ //! │┌──────┐ │ │┌───────┐│ │┌───────┐│ │Offsets │ │Index offset│ //! ││Header│ │ ││Block 1││ ││Block 4││ │ Z-Frame 2 │ │Magic number│ @@ -42,7 +43,7 @@ //! # Additional reading //! //! `zstd` frame format: -//! +//! skippable `zstd` frames: //! CARv1 specification: //! diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index 146184d85430..d8c5890747ca 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -65,7 +65,10 @@ //! ``` use super::ZSTD_SKIP_FRAME_LEN; -use crate::{db::car::plain::write_skip_frame_header_async, utils::misc::env::is_env_truthy}; +use crate::{ + db::car::{forest::ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER, plain::write_skip_frame_header_async}, + utils::misc::env::is_env_truthy, +}; #[cfg_vis(feature = "benchmark-private", pub)] use self::util::NonMaximalU64; @@ -201,9 +204,11 @@ impl ZstdSkipFramesEncodedDataReader { pub fn new(reader: R) -> io::Result { let mut offset = 0; let mut skip_frame_header_offsets = vec![]; - while let Ok(len) = reader.read_u32_at::(offset + 4) { + while let Ok(data_len) = reader + .read_u32_at::(offset + ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER.len() as u64) + { skip_frame_header_offsets.push(offset); - offset += ZSTD_SKIP_FRAME_LEN + len as u64; + offset += ZSTD_SKIP_FRAME_LEN + data_len as u64; } Ok(Self { reader, @@ -243,12 +248,24 @@ where R: ReadAt, { fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result { + // Start with the logical position; we'll shift it forward to account for + // skip-frame headers as we scan through the known header offsets. let mut adjusted_pos = pos; + // Track the physical offset of the next skip-frame header that lies *after* + // the current adjusted position (i.e., still inside the read window). let mut next_frame_pos = None; + + // Walk the sorted list of skip-frame header offsets to translate the + // logical `pos` into a physical offset in the underlying reader. + // For every header whose physical position is at or before `adjusted_pos`, + // the header itself is already "behind" us, so we advance `adjusted_pos` + // by `ZSTD_SKIP_FRAME_LEN` (8 bytes) to skip past it. for &p in self.skip_frame_header_offsets.iter() { if p <= adjusted_pos { adjusted_pos += ZSTD_SKIP_FRAME_LEN; } else { + // The first header that is still ahead of us defines the boundary + // of the current contiguous read window. next_frame_pos = Some(p); break; } @@ -257,12 +274,20 @@ where && let max_read_len = (next_frame_pos - adjusted_pos) as usize && max_read_len < buf.len() { + // The next skip-frame header falls within the requested buffer range. + // Split the read into two parts so we never read across a header boundary: + // 1. Read up to (but not including) the upcoming skip-frame header. + // 2. Recursively read the remainder starting at the logical position + // just after that boundary, placing the result into the rest of the buffer. + // The two byte counts are summed to give the total bytes read. #[allow(clippy::indexing_slicing)] Ok(self .reader .read_at(adjusted_pos, &mut buf[..max_read_len])? + self.read_at(pos + max_read_len as u64, &mut buf[max_read_len..])?) } else { + // No skip-frame header interrupts this read window; delegate directly + // to the underlying reader at the translated physical position. self.reader.read_at(adjusted_pos, buf) } } From 56752e2757c6ae231ab6f4f925e3cfeff5633aed Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 6 Mar 2026 04:26:22 +0800 Subject: [PATCH 10/11] do not use single frame for version and header --- src/db/car/forest/index/mod.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index d8c5890747ca..bda3dc3710a9 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -538,13 +538,12 @@ impl Writer { slot.write_to(&mut writer).await?; } } else { - // write version and header to a skip frame - let frame_data_len: u32 = (written_len(&version) + written_len(&header)) as u32; - write_skip_frame_header_async(&mut writer, frame_data_len).await?; - version.write_to(&mut writer).await?; - header.write_to(&mut writer).await?; - let mut buf = Vec::with_capacity(skip_frame_data_max_bytes); + + // write version and header + version.write_to(&mut buf).await?; + header.write_to(&mut buf).await?; + for slot in slots { slot.write_to(&mut buf).await?; if buf.len() >= skip_frame_data_max_bytes { From 8e9078c7f8098ecd54896c2b0c7af2408eac6e89 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 6 Mar 2026 04:45:20 +0800 Subject: [PATCH 11/11] fix ut --- src/db/car/forest/index/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index bda3dc3710a9..c35f1a9b91e5 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -820,7 +820,7 @@ mod tests { block_on(async { if multi_index_frame { writer - .write_zstd_skip_frames_into_inner(&mut *v, 1024, None) + .write_zstd_skip_frames_into_inner(&mut *v, 128, None) .await } else { writer.write_zstd_skip_frames_into(&mut *v).await @@ -830,7 +830,7 @@ mod tests { })) .unwrap(); if multi_index_frame { - assert!(r.skip_frame_header_offsets.len() > 1); + assert!(!r.skip_frame_header_offsets.is_empty()); } else { assert_eq!(r.skip_frame_header_offsets.len(), 1); } @@ -860,7 +860,7 @@ mod tests { block_on(async { if multi_index_frame { writer - .write_zstd_skip_frames_into_inner(&mut *v, 1024, None) + .write_zstd_skip_frames_into_inner(&mut *v, 128, None) .await } else { writer.write_zstd_skip_frames_into(&mut *v).await @@ -870,7 +870,7 @@ mod tests { })) .unwrap(); if multi_index_frame { - assert!(r.skip_frame_header_offsets.len() > 1); + assert!(!r.skip_frame_header_offsets.is_empty()); } else { assert_eq!(r.skip_frame_header_offsets.len(), 1); }