Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/car-index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn bench_car_index(c: &mut Criterion) {
block_on(
index::Builder::from_iter(reference.clone())
.into_writer()
.write_into(&mut v),
.write_zstd_skip_frames_into(&mut v),
)
.unwrap();
index::Reader::new(v).unwrap()
Expand Down
10 changes: 5 additions & 5 deletions src/db/car/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ impl<ReaderT: RandomAccessFileReader> AnyCar<ReaderT> {
}

/// Discard reader type and replace with dynamic trait object.
pub fn into_dyn(self) -> AnyCar<Box<dyn super::RandomAccessFileReader>> {
match self {
AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()),
pub fn into_dyn(self) -> Result<AnyCar<Box<dyn super::RandomAccessFileReader>>> {
Ok(match self {
AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()?),
AnyCar::Plain(p) => AnyCar::Plain(p.into_dyn()),
AnyCar::Memory(m) => AnyCar::Memory(m),
}
})
}

/// Set the z-frame cache of the inner CAR reader.
Expand All @@ -109,7 +109,7 @@ impl<ReaderT: RandomAccessFileReader> AnyCar<ReaderT> {
}

/// Get the index size in bytes
pub fn index_size_bytes(&self) -> Option<u32> {
pub fn index_size_bytes(&self) -> Option<u64> {
match self {
Self::Forest(car) => Some(car.index_size_bytes()),
_ => None,
Expand Down
79 changes: 42 additions & 37 deletions src/db/car/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
//! See [`crate::db::car::plain`] for details on the CAR format.
//!
//! The `forest.car.zst` format wraps multiple CAR blocks in small (usually 8 KiB)
//! zstd frames, and has an index in a skippable zstd frame. At the end of the
//! data, there has to be a fixed-size skippable frame containing magic numbers
//! and meta information about the archive. CAR blocks may not span multiple
//! z-frames and the CAR header is kept it a separate z-frame.
//! zstd frames, and has an index in one ore more skippable zstd frames (each
//! skippable frame contains up to `u32::MAX` bytes). At the end of the data, there
//! has to be a fixed-size skippable frame containing magic numbers and meta
//! information about the archive. CAR blocks may not span multiple z-frames
//! and the CAR header is kept it a separate z-frame.
//!
//! Imagine a `forest.car.zst` archive with 5 blocks. They could be arranged in
//! z-frames as drawn below:
//!
//! ```text
//! Z-Frame 1 Z-Frame 2 Z-Frame 3 Skip Frame Skip Frame
//! Z-Frame 1 Z-Frame 2 Z-Frame 3 Skip Frames Skip Frame
//! ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌───────────┐ ┌────────────┐
//! │┌──────┐ │ │┌───────┐│ │┌───────┐│ │Offsets │ │Index offset│
//! ││Header│ │ ││Block 1││ ││Block 4││ │ Z-Frame 2 │ │Magic number│
Expand All @@ -42,28 +43,27 @@
//! # Additional reading
//!
//! `zstd` frame format: <https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md>
//!
//! skippable `zstd` frames: <https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#skippable-frames>
//! CARv1 specification: <https://ipld.io/specs/transport/car/carv1/>
//!

use super::{CacheKey, ZstdFrameCache};
use crate::blocks::{Tipset, TipsetKey};
use crate::chain::FilecoinSnapshotMetadata;
use crate::db::car::RandomAccessFileReader;
use crate::db::car::plain::write_skip_frame_header_async;
use crate::db::car::forest::index::ZstdSkipFramesEncodedDataReader;
use crate::utils::db::car_stream::{CarBlock, CarV1Header, uvi_bytes};
use crate::utils::encoding::from_slice_with_fallback;
use crate::utils::get_size::CidWrapper;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use byteorder::LittleEndian;
use bytes::{BufMut as _, Bytes, BytesMut, buf::Writer};
use cid::Cid;
use futures::{Stream, TryStreamExt as _};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore as _;
use integer_encoding::VarIntReader;
use nunny::Vec as NonEmpty;
use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor};
use positioned_io::{Cursor, ReadAt, Size as _, SizeCursor};
use std::io::{Seek, SeekFrom};
use std::path::Path;
use std::sync::{Arc, OnceLock};
Expand All @@ -86,7 +86,7 @@ pub const TEMP_FOREST_CAR_FILE_EXTENSION: &str = ".forest.car.zst.tmp";
pub const ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER: [u8; 4] = [0x50, 0x2A, 0x4D, 0x18];
pub const DEFAULT_FOREST_CAR_FRAME_SIZE: usize = 8000_usize.next_power_of_two();
pub const DEFAULT_FOREST_CAR_COMPRESSION_LEVEL: u16 = zstd::DEFAULT_COMPRESSION_LEVEL as _;
const ZSTD_SKIP_FRAME_LEN: u64 = 8;
pub const ZSTD_SKIP_FRAME_LEN: u64 = 8;

/// `zstd` frame of Forest CAR
pub type ForestCarFrame = (Vec<Cid>, Bytes);
Expand All @@ -95,24 +95,19 @@ pub struct ForestCar<ReaderT> {
// Multiple `ForestCar` structures may share the same cache. The cache key is used to identify
// the origin of a cached z-frame.
cache_key: CacheKey,
indexed: index::Reader<positioned_io::Slice<ReaderT>>,
index_size_bytes: u32,
indexed: index::Reader<index::ZstdSkipFramesEncodedDataReader<positioned_io::Slice<ReaderT>>>,
index_size_bytes: u64,
frame_cache: Arc<ZstdFrameCache>,
header: CarV1Header,
metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
}

impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
pub fn new(reader: ReaderT) -> io::Result<ForestCar<ReaderT>> {
let (header, footer) = Self::validate_car(&reader)?;
let index_size_bytes = reader.read_u32_at::<LittleEndian>(
footer.index.saturating_sub(std::mem::size_of::<u32>() as _),
)?;
let indexed = index::Reader::new(positioned_io::Slice::new(
reader,
footer.index,
Some(index_size_bytes as u64),
))?;
let (header, index_start_pos, index_size_bytes) = Self::validate_car(&reader)?;
let indexed = index::Reader::new(index::ZstdSkipFramesEncodedDataReader::new(
positioned_io::Slice::new(reader, index_start_pos, Some(index_size_bytes)),
)?)?;
Ok(ForestCar {
cache_key: 0,
indexed,
Expand Down Expand Up @@ -141,9 +136,10 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
Self::validate_car(reader).is_ok()
}

fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, ForestCarFooter)> {
fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, u64, u64)> {
let mut cursor = SizeCursor::new(&reader);
cursor.seek(SeekFrom::End(-(ForestCarFooter::SIZE as i64)))?;
let index_end_pos = cursor.position();

let mut footer_buffer = [0; ForestCarFooter::SIZE];
cursor.read_exact(&mut footer_buffer)?;
Expand All @@ -153,6 +149,15 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
"not recognizable as a `{FOREST_CAR_FILE_EXTENSION}` file"
))
})?;
let index_start_pos = footer.index.checked_sub(ZSTD_SKIP_FRAME_LEN).ok_or_else(||
invalid_data(format!(
"unexpected error: footer.index({}) < ZSTD_SKIP_FRAME_LEN({ZSTD_SKIP_FRAME_LEN})",
footer.index
)),
)?;
let index_len = index_end_pos.checked_sub(index_start_pos).ok_or_else(||
invalid_data(format!("unexpected error: index_end_pos({index_end_pos}) < index_start_pos({index_start_pos})"))
)?;

let cursor = Cursor::new_pos(&reader, 0);
let mut header_zstd_frame = decode_zstd_single_frame(cursor)?.into();
Expand All @@ -162,7 +167,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
let header = from_slice_with_fallback::<CarV1Header>(&block_frame)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

Ok((header, footer))
Ok((header, index_start_pos, index_len))
}

pub fn head_tipset_key(&self) -> &NonEmpty<Cid> {
Expand All @@ -175,7 +180,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
}
}

pub fn index_size_bytes(&self) -> u32 {
pub fn index_size_bytes(&self) -> u64 {
self.index_size_bytes
}

Expand All @@ -187,22 +192,23 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
Tipset::load_required(self, &self.heaviest_tipset_key())
}

pub fn into_dyn(self) -> ForestCar<Box<dyn super::RandomAccessFileReader>> {
ForestCar {
pub fn into_dyn(self) -> io::Result<ForestCar<Box<dyn super::RandomAccessFileReader>>> {
Ok(ForestCar {
cache_key: self.cache_key,
indexed: self.indexed.map(|slice| {
let offset = slice.offset();
positioned_io::Slice::new(
Box::new(slice.into_inner()) as Box<dyn RandomAccessFileReader>,
let offset = slice.inner().offset();
let size = slice.inner().size()?;
ZstdSkipFramesEncodedDataReader::new(positioned_io::Slice::new(
Box::new(slice.into_inner().into_inner()) as Box<dyn RandomAccessFileReader>,
offset,
None,
)
}),
size,
))
})?,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
index_size_bytes: self.index_size_bytes,
frame_cache: self.frame_cache,
header: self.header,
metadata: self.metadata,
}
})
}

pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
Expand All @@ -217,7 +223,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
pub fn get_reader(&self, k: Cid) -> anyhow::Result<Option<impl Read>> {
for position in self.indexed.get(k)? {
// escape the positioned_io::Slice
let entire_file = self.indexed.reader().get_ref();
let entire_file = self.indexed.reader().inner().get_ref();
// `position` is the frame start offset.
let cursor = Cursor::new_pos(entire_file, position);
let mut decoder = zstd::Decoder::new(cursor)?.single_frame();
Expand Down Expand Up @@ -259,7 +265,7 @@ where
Some(None) => {}
None => {
// Decode entire frame into memory, "position" arg is the frame start offset.
let entire_file = indexed.reader().get_ref(); // escape the positioned_io::Slice
let entire_file = indexed.reader().inner().get_ref(); // escape the positioned_io::Slice
let cursor = Cursor::new_pos(entire_file, position);
let mut zstd_frame = decode_zstd_single_frame(cursor)?.into();
// Parse all key-value pairs and insert them into a map
Expand Down Expand Up @@ -332,8 +338,7 @@ impl Encoder {

// Create index
let writer = builder.into_writer();
write_skip_frame_header_async(&mut sink, writer.written_len().try_into().unwrap()).await?;
writer.write_into(&mut sink).await?;
writer.write_zstd_skip_frames_into(&mut sink).await?;

// Write ForestCAR.zst footer, it's a valid ZSTD skip-frame
let footer = ForestCarFooter {
Expand Down
Loading
Loading