diff --git a/benches/car-index.rs b/benches/car-index.rs index 4da6d5d8a44d..0393c11c4eb3 100644 --- a/benches/car-index.rs +++ b/benches/car-index.rs @@ -35,7 +35,7 @@ fn bench_car_index(c: &mut Criterion) { block_on( index::Builder::from_iter(reference.clone()) .into_writer() - .write_into(&mut v), + .write_zstd_skip_frames_into(&mut v), ) .unwrap(); index::Reader::new(v).unwrap() diff --git a/src/db/car/any.rs b/src/db/car/any.rs index a3347a214f45..ccd5174d5a36 100644 --- a/src/db/car/any.rs +++ b/src/db/car/any.rs @@ -91,12 +91,12 @@ impl AnyCar { } /// Discard reader type and replace with dynamic trait object. - pub fn into_dyn(self) -> AnyCar> { - match self { - AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()), + pub fn into_dyn(self) -> Result>> { + Ok(match self { + AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()?), AnyCar::Plain(p) => AnyCar::Plain(p.into_dyn()), AnyCar::Memory(m) => AnyCar::Memory(m), - } + }) } /// Set the z-frame cache of the inner CAR reader. @@ -109,7 +109,7 @@ impl AnyCar { } /// Get the index size in bytes - pub fn index_size_bytes(&self) -> Option { + pub fn index_size_bytes(&self) -> Option { match self { Self::Forest(car) => Some(car.index_size_bytes()), _ => None, diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 74fd27780ef6..54a026bf4ab3 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -6,16 +6,17 @@ //! See [`crate::db::car::plain`] for details on the CAR format. //! //! The `forest.car.zst` format wraps multiple CAR blocks in small (usually 8 KiB) -//! zstd frames, and has an index in a skippable zstd frame. At the end of the -//! data, there has to be a fixed-size skippable frame containing magic numbers -//! and meta information about the archive. CAR blocks may not span multiple -//! z-frames and the CAR header is kept it a separate z-frame. +//! zstd frames, and has an index in one or more skippable zstd frames (each +//! skippable frame contains up to `u32::MAX` bytes). 
At the end of the data, there +//! has to be a fixed-size skippable frame containing magic numbers and meta +//! information about the archive. CAR blocks may not span multiple z-frames +//! and the CAR header is kept in a separate z-frame. //! //! Imagine a `forest.car.zst` archive with 5 blocks. They could be arranged in //! z-frames as drawn below: //! //! ```text -//! Z-Frame 1 Z-Frame 2 Z-Frame 3 Skip Frame Skip Frame +//! Z-Frame 1 Z-Frame 2 Z-Frame 3 Skip Frames Skip Frame //! ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌───────────┐ ┌────────────┐ //! │┌──────┐ │ │┌───────┐│ │┌───────┐│ │Offsets │ │Index offset│ //! ││Header│ │ ││Block 1││ ││Block 4││ │ Z-Frame 2 │ │Magic number│ @@ -42,7 +43,7 @@ //! # Additional reading //! //! `zstd` frame format: -//! +//! skippable `zstd` frames: //! CARv1 specification: //! @@ -50,12 +51,11 @@ use super::{CacheKey, ZstdFrameCache}; use crate::blocks::{Tipset, TipsetKey}; use crate::chain::FilecoinSnapshotMetadata; use crate::db::car::RandomAccessFileReader; -use crate::db::car::plain::write_skip_frame_header_async; +use crate::db::car::forest::index::ZstdSkipFramesEncodedDataReader; use crate::utils::db::car_stream::{CarBlock, CarV1Header, uvi_bytes}; use crate::utils::encoding::from_slice_with_fallback; use crate::utils::get_size::CidWrapper; use crate::utils::io::EitherMmapOrRandomAccessFile; -use byteorder::LittleEndian; use bytes::{BufMut as _, Bytes, BytesMut, buf::Writer}; use cid::Cid; use futures::{Stream, TryStreamExt as _}; @@ -63,7 +63,7 @@ use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::CborStore as _; use integer_encoding::VarIntReader; use nunny::Vec as NonEmpty; -use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor}; +use positioned_io::{Cursor, ReadAt, Size as _, SizeCursor}; use std::io::{Seek, SeekFrom}; use std::path::Path; use std::sync::{Arc, OnceLock}; @@ -86,7 +86,7 @@ pub const TEMP_FOREST_CAR_FILE_EXTENSION: &str = ".forest.car.zst.tmp"; pub const 
ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER: [u8; 4] = [0x50, 0x2A, 0x4D, 0x18]; pub const DEFAULT_FOREST_CAR_FRAME_SIZE: usize = 8000_usize.next_power_of_two(); pub const DEFAULT_FOREST_CAR_COMPRESSION_LEVEL: u16 = zstd::DEFAULT_COMPRESSION_LEVEL as _; -const ZSTD_SKIP_FRAME_LEN: u64 = 8; +pub const ZSTD_SKIP_FRAME_LEN: u64 = 8; /// `zstd` frame of Forest CAR pub type ForestCarFrame = (Vec, Bytes); @@ -95,8 +95,8 @@ pub struct ForestCar { // Multiple `ForestCar` structures may share the same cache. The cache key is used to identify // the origin of a cached z-frame. cache_key: CacheKey, - indexed: index::Reader>, - index_size_bytes: u32, + indexed: index::Reader>>, + index_size_bytes: u64, frame_cache: Arc, header: CarV1Header, metadata: OnceLock>, @@ -104,15 +104,10 @@ pub struct ForestCar { impl ForestCar { pub fn new(reader: ReaderT) -> io::Result> { - let (header, footer) = Self::validate_car(&reader)?; - let index_size_bytes = reader.read_u32_at::( - footer.index.saturating_sub(std::mem::size_of::() as _), - )?; - let indexed = index::Reader::new(positioned_io::Slice::new( - reader, - footer.index, - Some(index_size_bytes as u64), - ))?; + let (header, index_start_pos, index_size_bytes) = Self::validate_car(&reader)?; + let indexed = index::Reader::new(index::ZstdSkipFramesEncodedDataReader::new( + positioned_io::Slice::new(reader, index_start_pos, Some(index_size_bytes)), + )?)?; Ok(ForestCar { cache_key: 0, indexed, @@ -141,9 +136,10 @@ impl ForestCar { Self::validate_car(reader).is_ok() } - fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, ForestCarFooter)> { + fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, u64, u64)> { let mut cursor = SizeCursor::new(&reader); cursor.seek(SeekFrom::End(-(ForestCarFooter::SIZE as i64)))?; + let index_end_pos = cursor.position(); let mut footer_buffer = [0; ForestCarFooter::SIZE]; cursor.read_exact(&mut footer_buffer)?; @@ -153,6 +149,15 @@ impl ForestCar { "not recognizable as a 
`{FOREST_CAR_FILE_EXTENSION}` file" )) })?; + let index_start_pos = footer.index.checked_sub(ZSTD_SKIP_FRAME_LEN).ok_or_else(|| + invalid_data(format!( + "unexpected error: footer.index({}) < ZSTD_SKIP_FRAME_LEN({ZSTD_SKIP_FRAME_LEN})", + footer.index + )), + )?; + let index_len = index_end_pos.checked_sub(index_start_pos).ok_or_else(|| + invalid_data(format!("unexpected error: index_end_pos({index_end_pos}) < index_start_pos({index_start_pos})")) + )?; let cursor = Cursor::new_pos(&reader, 0); let mut header_zstd_frame = decode_zstd_single_frame(cursor)?.into(); @@ -162,7 +167,7 @@ impl ForestCar { let header = from_slice_with_fallback::(&block_frame) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok((header, footer)) + Ok((header, index_start_pos, index_len)) } pub fn head_tipset_key(&self) -> &NonEmpty { @@ -175,7 +180,7 @@ impl ForestCar { } } - pub fn index_size_bytes(&self) -> u32 { + pub fn index_size_bytes(&self) -> u64 { self.index_size_bytes } @@ -187,22 +192,23 @@ impl ForestCar { Tipset::load_required(self, &self.heaviest_tipset_key()) } - pub fn into_dyn(self) -> ForestCar> { - ForestCar { + pub fn into_dyn(self) -> io::Result>> { + Ok(ForestCar { cache_key: self.cache_key, indexed: self.indexed.map(|slice| { - let offset = slice.offset(); - positioned_io::Slice::new( - Box::new(slice.into_inner()) as Box, + let offset = slice.inner().offset(); + let size = slice.inner().size()?; + ZstdSkipFramesEncodedDataReader::new(positioned_io::Slice::new( + Box::new(slice.into_inner().into_inner()) as Box, offset, - None, - ) - }), + size, + )) + })?, index_size_bytes: self.index_size_bytes, frame_cache: self.frame_cache, header: self.header, metadata: self.metadata, - } + }) } pub fn with_cache(self, cache: Arc, key: CacheKey) -> Self { @@ -217,7 +223,7 @@ impl ForestCar { pub fn get_reader(&self, k: Cid) -> anyhow::Result> { for position in self.indexed.get(k)? 
{ // escape the positioned_io::Slice - let entire_file = self.indexed.reader().get_ref(); + let entire_file = self.indexed.reader().inner().get_ref(); // `position` is the frame start offset. let cursor = Cursor::new_pos(entire_file, position); let mut decoder = zstd::Decoder::new(cursor)?.single_frame(); @@ -259,7 +265,7 @@ where Some(None) => {} None => { // Decode entire frame into memory, "position" arg is the frame start offset. - let entire_file = indexed.reader().get_ref(); // escape the positioned_io::Slice + let entire_file = indexed.reader().inner().get_ref(); // escape the positioned_io::Slice let cursor = Cursor::new_pos(entire_file, position); let mut zstd_frame = decode_zstd_single_frame(cursor)?.into(); // Parse all key-value pairs and insert them into a map @@ -332,8 +338,7 @@ impl Encoder { // Create index let writer = builder.into_writer(); - write_skip_frame_header_async(&mut sink, writer.written_len().try_into().unwrap()).await?; - writer.write_into(&mut sink).await?; + writer.write_zstd_skip_frames_into(&mut sink).await?; // Write ForestCAR.zst footer, it's a valid ZSTD skip-frame let footer = ForestCarFooter { diff --git a/src/db/car/forest/index/mod.rs b/src/db/car/forest/index/mod.rs index 84a3d9783213..c35f1a9b91e5 100644 --- a/src/db/car/forest/index/mod.rs +++ b/src/db/car/forest/index/mod.rs @@ -64,13 +64,19 @@ //! ├──────────────┤ <- Zstd skip frame header) //! 
``` +use super::ZSTD_SKIP_FRAME_LEN; +use crate::{ + db::car::{forest::ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER, plain::write_skip_frame_header_async}, + utils::misc::env::is_env_truthy, +}; + #[cfg_vis(feature = "benchmark-private", pub)] use self::util::NonMaximalU64; use byteorder::{LittleEndian, ReadBytesExt as _}; use cfg_vis::cfg_vis; use cid::Cid; -use itertools::Itertools as _; -use positioned_io::{ReadAt, Size}; +use itertools::Itertools; +use positioned_io::{ReadAt, ReadBytesAtExt as _, Size}; use smallvec::{SmallVec, smallvec}; use std::{ cmp, @@ -180,11 +186,109 @@ where /// Replace the inner reader. /// It MUST point to the same underlying IO, else future calls to `get` /// will be incorrect. - pub fn map(self, f: impl FnOnce(R) -> T) -> Reader { - Reader { - inner: f(self.inner), + pub fn map(self, f: impl FnOnce(R) -> io::Result) -> io::Result> { + Ok(Reader { + inner: f(self.inner)?, table_offset: self.table_offset, header: self.header, + }) + } +} + +pub struct ZstdSkipFramesEncodedDataReader { + reader: R, + skip_frame_header_offsets: Vec, +} + +impl ZstdSkipFramesEncodedDataReader { + pub fn new(reader: R) -> io::Result { + let mut offset = 0; + let mut skip_frame_header_offsets = vec![]; + while let Ok(data_len) = reader + .read_u32_at::(offset + ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER.len() as u64) + { + skip_frame_header_offsets.push(offset); + offset += ZSTD_SKIP_FRAME_LEN + data_len as u64; + } + Ok(Self { + reader, + skip_frame_header_offsets, + }) + } + + pub fn inner(&self) -> &R { + &self.reader + } + + pub fn into_inner(self) -> R { + self.reader + } +} + +impl Size for ZstdSkipFramesEncodedDataReader { + fn size(&self) -> io::Result> { + if let Some(size) = self.reader.size()? 
{ + let total_header_size = + ZSTD_SKIP_FRAME_LEN * self.skip_frame_header_offsets.len() as u64; + if size >= total_header_size { + Ok(Some(size - total_header_size)) + } else { + Err(io::Error::other(format!( + "unexpected error: size({size}) < total_header_size({total_header_size})" + ))) + } + } else { + Ok(None) + } + } +} + +impl ReadAt for ZstdSkipFramesEncodedDataReader +where + R: ReadAt, +{ + fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result { + // Start with the logical position; we'll shift it forward to account for + // skip-frame headers as we scan through the known header offsets. + let mut adjusted_pos = pos; + // Track the physical offset of the next skip-frame header that lies *after* + // the current adjusted position (i.e., still inside the read window). + let mut next_frame_pos = None; + + // Walk the sorted list of skip-frame header offsets to translate the + // logical `pos` into a physical offset in the underlying reader. + // For every header whose physical position is at or before `adjusted_pos`, + // the header itself is already "behind" us, so we advance `adjusted_pos` + // by `ZSTD_SKIP_FRAME_LEN` (8 bytes) to skip past it. + for &p in self.skip_frame_header_offsets.iter() { + if p <= adjusted_pos { + adjusted_pos += ZSTD_SKIP_FRAME_LEN; + } else { + // The first header that is still ahead of us defines the boundary + // of the current contiguous read window. + next_frame_pos = Some(p); + break; + } + } + if let Some(next_frame_pos) = next_frame_pos + && let max_read_len = (next_frame_pos - adjusted_pos) as usize + && max_read_len < buf.len() + { + // The next skip-frame header falls within the requested buffer range. + // Split the read into two parts so we never read across a header boundary: + // 1. Read up to (but not including) the upcoming skip-frame header. + // 2. Recursively read the remainder starting at the logical position + // just after that boundary, placing the result into the rest of the buffer. 
+ // The two byte counts are summed to give the total bytes read. + #[allow(clippy::indexing_slicing)] + Ok(self + .reader + .read_at(adjusted_pos, &mut buf[..max_read_len])? + + self.read_at(pos + max_read_len as u64, &mut buf[max_read_len..])?) + } else { + // No skip-frame header interrupts this read window; delegate directly + // to the underlying reader at the translated physical position. + self.reader.read_at(adjusted_pos, buf) } } } @@ -358,7 +462,8 @@ pub struct Writer { } impl Writer { - pub fn written_len(&self) -> u64 { + // To keep backward compatibility, remove after NV28 release + fn written_len(&self) -> u64 { let Self { version, header, @@ -394,21 +499,66 @@ impl Writer { .pad_using(min_slots, |_ix| Slot::Empty) .chain(iter::once(Slot::Empty)) } - pub async fn write_into(self, writer: impl AsyncWrite) -> io::Result<()> { + pub async fn write_zstd_skip_frames_into(self, writer: impl AsyncWrite) -> io::Result<()> { + // write every 512MiB slots to a skip frame + const CHUNK_FRAME_DATA_MAX_BYTES: usize = 512 * 1024 * 1024; + let written_len = self.written_len(); + self.write_zstd_skip_frames_into_inner( + writer, + CHUNK_FRAME_DATA_MAX_BYTES, + u32::try_from(written_len).ok(), + ) + .await + } + async fn write_zstd_skip_frames_into_inner( + self, + writer: impl AsyncWrite, + skip_frame_data_max_bytes: usize, + // To keep backward compatibility, remove after NV28 release + index_data_len: Option, + ) -> io::Result<()> { let mut writer = pin!(writer); let Self { version, header, slots, } = self; - version.write_to(&mut writer).await?; - header.write_to(&mut writer).await?; - for slot in Self::slots( + let slots = Self::slots( header.initial_buckets.try_into().unwrap(), slots.iter().copied(), - ) { - slot.write_to(&mut writer).await?; + ); + if let Some(index_data_len) = index_data_len + && !is_env_truthy("FOREST_CAR_INDEX_USE_MULTIPLE_SKIP_FRAMES") + { + // To keep backward compatibility, remove after NV28 release + write_skip_frame_header_async(&mut 
writer, index_data_len).await?; + version.write_to(&mut writer).await?; + header.write_to(&mut writer).await?; + for slot in slots { + slot.write_to(&mut writer).await?; + } + } else { + let mut buf = Vec::with_capacity(skip_frame_data_max_bytes); + + // write version and header + version.write_to(&mut buf).await?; + header.write_to(&mut buf).await?; + + for slot in slots { + slot.write_to(&mut buf).await?; + if buf.len() >= skip_frame_data_max_bytes { + write_skip_frame_header_async(&mut writer, buf.len() as u32).await?; + writer.write_all(&buf).await?; + buf.clear(); + } + } + + if !buf.is_empty() { + write_skip_frame_header_async(&mut writer, buf.len() as u32).await?; + writer.write_all(&buf).await?; + } } + Ok(()) } } @@ -607,7 +757,7 @@ trait Writable { } /// Useful for exhaustiveness checking -fn written_len(_: T) -> u64 { +fn written_len(_: &T) -> u64 { T::LEN } @@ -660,21 +810,35 @@ mod tests { /// [`Reader`] should behave like a [`HashMap`], with a caveat for collisions. fn do_hashmap_of_cids(reference: HashMap>) { - let subject = Reader::new(write_to_vec(|v| { - let writer = - Builder::from_iter(reference.clone().into_iter().flat_map(|(hash, offsets)| { - offsets.into_iter().map(move |offset| (hash, offset)) + for multi_index_frame in [false, true] { + let r: ZstdSkipFramesEncodedDataReader> = + ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| { + let writer = Builder::from_iter(reference.clone().into_iter().flat_map( + |(hash, offsets)| offsets.into_iter().map(move |offset| (hash, offset)), + )) + .into_writer(); + block_on(async { + if multi_index_frame { + writer + .write_zstd_skip_frames_into_inner(&mut *v, 128, None) + .await + } else { + writer.write_zstd_skip_frames_into(&mut *v).await + } + })?; + Ok(()) })) - .into_writer(); - let expected_len = writer.written_len(); - block_on(writer.write_into(&mut *v))?; - assert_eq!(expected_len as usize, v.len()); - Ok(()) - })) - .unwrap(); - for (cid, expected) in reference { - let actual = 
subject.get(cid).unwrap().into_iter().collect(); - assert!(expected.is_subset(&actual)); // collisions + .unwrap(); + if multi_index_frame { + assert!(!r.skip_frame_header_offsets.is_empty()); + } else { + assert_eq!(r.skip_frame_header_offsets.len(), 1); + } + let subject = Reader::new(r).unwrap(); + for (&cid, expected) in &reference { + let actual = subject.get(cid).unwrap().into_iter().collect(); + assert!(expected.is_subset(&actual)); // collisions + } } } @@ -687,38 +851,53 @@ mod tests { /// /// Additionally checks [`Reader::iter`] fn do_hashmap_of_hashes(reference: HashMap>) { - let subject = Reader::new(write_to_vec(|v| { - let writer = - Builder::from_iter(reference.clone().into_iter().flat_map(|(hash, offsets)| { - offsets.into_iter().map(move |offset| (hash, offset)) - })) + for multi_index_frame in [false, true] { + let r = ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| { + let writer = Builder::from_iter(reference.clone().into_iter().flat_map( + |(hash, offsets)| offsets.into_iter().map(move |offset| (hash, offset)), + )) .into_writer(); - let expected_len = writer.written_len(); - block_on(writer.write_into(&mut *v))?; - assert_eq!(expected_len as usize, v.len()); - Ok(()) - })) - .unwrap(); - for (hash, expected) in &reference { - let actual = subject.get_by_hash(*hash).unwrap().into_iter().collect(); - assert!(expected.is_subset(&actual)) // collisions - } + block_on(async { + if multi_index_frame { + writer + .write_zstd_skip_frames_into_inner(&mut *v, 128, None) + .await + } else { + writer.write_zstd_skip_frames_into(&mut *v).await + } + })?; + Ok(()) + })) + .unwrap(); + if multi_index_frame { + assert!(!r.skip_frame_header_offsets.is_empty()); + } else { + assert_eq!(r.skip_frame_header_offsets.len(), 1); + } + let subject = Reader::new(r).unwrap(); + for (hash, expected) in &reference { + let actual = subject.get_by_hash(*hash).unwrap().into_iter().collect(); + assert!(expected.is_subset(&actual)) // collisions + } - let via_iter = 
subject - .iter() - .unwrap() - .filter_map(|it| match it.unwrap() { - Slot::Empty => None, - Slot::Occupied(it) => Some(it), - }) - .chunk_by(|it| it.hash) - .into_iter() - .map(|(hash, group)| (hash, HashSet::from_iter(group.map(|it| it.frame_offset)))) - .collect::>(); - assert_eq!( - via_iter, - reference.tap_mut(|it| it.retain(|_, v| !v.is_empty())) - ); + let via_iter = subject + .iter() + .unwrap() + .filter_map(|it| match it.unwrap() { + Slot::Empty => None, + Slot::Occupied(it) => Some(it), + }) + .chunk_by(|it| it.hash) + .into_iter() + .map(|(hash, group)| (hash, HashSet::from_iter(group.map(|it| it.frame_offset)))) + .collect::>(); + assert_eq!( + via_iter, + reference + .clone() + .tap_mut(|it| it.retain(|_, v| !v.is_empty())) + ); + } } quickcheck::quickcheck! { @@ -757,8 +936,7 @@ mod tests { #[track_caller] fn round_trip(original: &T) { - let serialized = - write_to_vec(|v| tokio::runtime::Runtime::new()?.block_on(original.write_to(v))); + let serialized = write_to_vec(|v| block_on(original.write_to(v))); assert_eq!( serialized.len(), usize::try_from(written_len(original)).unwrap() diff --git a/src/db/car/many.rs b/src/db/car/many.rs index fffa41a63c73..6b53c3ba6747 100644 --- a/src/db/car/many.rs +++ b/src/db/car/many.rs @@ -108,7 +108,7 @@ impl ManyCar { read_only.push(WithHeaviestEpoch::new( any_car .with_cache(self.shared_cache.clone(), key) - .into_dyn(), + .into_dyn()?, )?); Ok(()) diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index e08f9e2fa778..8c418f524c96 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -314,7 +314,7 @@ pub struct ArchiveInfo { events: usize, head: Tipset, snapshot_version: FilecoinSnapshotVersion, - index_size_bytes: Option, + index_size_bytes: Option, } impl std::fmt::Display for ArchiveInfo { @@ -354,7 +354,7 @@ impl ArchiveInfo { variant: String, heaviest_tipset: Tipset, snapshot_version: FilecoinSnapshotVersion, - index_size_bytes: 
Option, + index_size_bytes: Option, ) -> anyhow::Result { Self::from_store_with( store, @@ -374,7 +374,7 @@ impl ArchiveInfo { variant: String, heaviest_tipset: Tipset, snapshot_version: FilecoinSnapshotVersion, - index_size_bytes: Option, + index_size_bytes: Option, progress: bool, ) -> anyhow::Result { let head = heaviest_tipset;