From e7196160df2607115d96232bd5d5e9ba144ae953 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 19 Sep 2025 15:28:53 +0800 Subject: [PATCH 1/4] fix: CarStream should support large data blocks like F3 snap --- src/daemon/db_util.rs | 12 +++++++++++- src/db/car/forest.rs | 17 +++++++++-------- src/utils/db/car_stream.rs | 10 ++++++++-- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index 666020166d80..ec79b6c80419 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -161,7 +161,12 @@ pub async fn import_chain_as_forest_car( snapshot_progress_tracker.completed(); } else { snapshot_progress_tracker.not_required(); - move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?; + if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) { + move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?; + } else { + // For a local snapshot, we transcode directly instead of copying & transcoding. + transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?; + } } if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open( @@ -292,6 +297,11 @@ fn move_or_copy_file(from: &Path, to: &Path, import_mode: ImportMode) -> anyhow: } async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> { + tracing::info!( + from = %from.display(), + to = %to.display(), + "transcoding into forest car" + ); let car_stream = CarStream::new(tokio::io::BufReader::new( tokio::fs::File::open(from).await?, )) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 7603bed01b6b..17ef3fa0c908 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -51,7 +51,7 @@ use crate::blocks::{Tipset, TipsetKey}; use crate::chain::FilecoinSnapshotMetadata; use crate::db::car::RandomAccessFileReader; use crate::db::car::plain::write_skip_frame_header_async; -use crate::utils::db::car_stream::{CarBlock, CarV1Header}; +use crate::utils::db::car_stream::{CarBlock, 
CarV1Header, uvi_bytes}; use crate::utils::encoding::from_slice_with_fallback; use crate::utils::get_size::CidWrapper; use crate::utils::io::EitherMmapOrRandomAccessFile; @@ -74,7 +74,6 @@ use std::{ }; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio_util::codec::{Decoder, Encoder as _}; -use unsigned_varint::codec::UviBytes; #[cfg(feature = "benchmark-private")] pub mod index; @@ -157,7 +156,7 @@ impl ForestCar { let cursor = Cursor::new_pos(&reader, 0); let mut header_zstd_frame = decode_zstd_single_frame(cursor)?; - let block_frame = UviBytes::<Bytes>::default() + let block_frame = uvi_bytes() .decode(&mut header_zstd_frame)? .ok_or_else(|| invalid_data("malformed uvibytes"))?; let header = from_slice_with_fallback::<CarV1Header>(&block_frame) @@ -265,9 +264,7 @@ where let mut zstd_frame = decode_zstd_single_frame(cursor)?; // Parse all key-value pairs and insert them into a map let mut block_map = hashbrown::HashMap::new(); - while let Some(block_frame) = - UviBytes::<Bytes>::default().decode_eof(&mut zstd_frame)? - { + while let Some(block_frame) = uvi_bytes().decode_eof(&mut zstd_frame)? 
{ let CarBlock { cid, data } = CarBlock::from_bytes(block_frame)?; block_map.insert(cid.into(), data); } @@ -313,7 +310,7 @@ impl Encoder { let header = CarV1Header { roots, version: 1 }; let mut header_uvi_frame = BytesMut::new(); - UviBytes::default().encode( + uvi_bytes().encode( Bytes::from(fvm_ipld_encoding::to_vec(&header)?), &mut header_uvi_frame, )?; @@ -402,7 +399,11 @@ impl Encoder { } } // Pass errors through - Some(Err(e)) => return Poll::Ready(Some(Err(e))), + Some(Err(e)) => { + return Poll::Ready(Some(Err(anyhow::anyhow!( + "error polling CarBlock from stream: {e}" + )))); + } // Got element, add to encoder and emit block position Some(Ok(block)) => { frame_cids.push(block.cid); diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs index 99a5c296b448..73f6c3e58e61 100644 --- a/src/utils/db/car_stream.rs +++ b/src/utils/db/car_stream.rs @@ -172,7 +172,7 @@ impl CarStream { .as_ref() .map(|h| h.data_size as u64) .unwrap_or(u64::MAX); - let mut reader = FramedRead::new(reader.take(max_car_v1_bytes), UviBytes::default()); + let mut reader = FramedRead::new(reader.take(max_car_v1_bytes), uvi_bytes()); let header_v1 = read_v1_header(&mut reader) .await .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid v1 header block"))?; @@ -283,7 +283,7 @@ impl CarWriter { let car_header = CarV1Header { roots, version: 1 }; let mut header_uvi_frame = BytesMut::new(); - UviBytes::default().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?; + uvi_bytes().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?; Ok(Self { inner: writer, @@ -329,6 +329,12 @@ async fn read_v1_header( Some(header) } +pub fn uvi_bytes() -> UviBytes { + let mut decoder = UviBytes::default(); + decoder.set_max_len(usize::MAX); + decoder +} + #[cfg(test)] mod tests { use std::io::Cursor; From 17805c572a38b3a2c1cf429b5f8cc94b080103a5 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 19 Sep 2025 16:21:35 +0800 Subject: [PATCH 2/4] fix ut 
--- src/daemon/db_util.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index ec79b6c80419..224c01aa1122 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -166,6 +166,9 @@ pub async fn import_chain_as_forest_car( } else { // For a local snapshot, we transcode directly instead of copying & transcoding. transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?; + if mode == ImportMode::Move { + std::fs::remove_file(from_path).context("Error removing original file")?; + } } } From 591570353532f1633f83355f586a104d363f9452 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 19 Sep 2025 19:23:46 +0800 Subject: [PATCH 3/4] UviBytes set_max_len to 8GiB --- src/utils/db/car_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs index 73f6c3e58e61..7ec8ac76c3a3 100644 --- a/src/utils/db/car_stream.rs +++ b/src/utils/db/car_stream.rs @@ -331,7 +331,7 @@ async fn read_v1_header( pub fn uvi_bytes() -> UviBytes { let mut decoder = UviBytes::default(); - decoder.set_max_len(usize::MAX); + decoder.set_max_len(8 * 1024 * 1024 * 1024); // 8GiB decoder } From c64da209f3085fbaffb9031f31b633c907cc8a21 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 19 Sep 2025 20:34:59 +0800 Subject: [PATCH 4/4] MAX_LEN const --- src/utils/db/car_stream.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs index 7ec8ac76c3a3..c38af112ecda 100644 --- a/src/utils/db/car_stream.rs +++ b/src/utils/db/car_stream.rs @@ -330,8 +330,10 @@ async fn read_v1_header( } pub fn uvi_bytes() -> UviBytes { + // 8GiB + const MAX_LEN: usize = 8 * 1024 * 1024 * 1024; let mut decoder = UviBytes::default(); - decoder.set_max_len(8 * 1024 * 1024 * 1024); // 8GiB + decoder.set_max_len(MAX_LEN); decoder }