diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index 666020166d80..224c01aa1122 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -161,7 +161,15 @@ pub async fn import_chain_as_forest_car( snapshot_progress_tracker.completed(); } else { snapshot_progress_tracker.not_required(); - move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?; + if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open(from_path)?) { + move_or_copy_file(from_path, &downloaded_car_temp_path, mode)?; + } else { + // For a local snapshot, we transcode directly instead of copying & transcoding. + transcode_into_forest_car(from_path, &downloaded_car_temp_path).await?; + if mode == ImportMode::Move { + std::fs::remove_file(from_path).context("Error removing original file")?; + } + } } if ForestCar::is_valid(&EitherMmapOrRandomAccessFile::open( @@ -292,6 +300,11 @@ fn move_or_copy_file(from: &Path, to: &Path, import_mode: ImportMode) -> anyhow: } async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> { + tracing::info!( + from = %from.display(), + to = %to.display(), + "transcoding into forest car" + ); let car_stream = CarStream::new(tokio::io::BufReader::new( tokio::fs::File::open(from).await?, )) diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index 7603bed01b6b..17ef3fa0c908 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -51,7 +51,7 @@ use crate::blocks::{Tipset, TipsetKey}; use crate::chain::FilecoinSnapshotMetadata; use crate::db::car::RandomAccessFileReader; use crate::db::car::plain::write_skip_frame_header_async; -use crate::utils::db::car_stream::{CarBlock, CarV1Header}; +use crate::utils::db::car_stream::{CarBlock, CarV1Header, uvi_bytes}; use crate::utils::encoding::from_slice_with_fallback; use crate::utils::get_size::CidWrapper; use crate::utils::io::EitherMmapOrRandomAccessFile; @@ -74,7 +74,6 @@ use std::{ }; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio_util::codec::{Decoder, Encoder as _}; -use unsigned_varint::codec::UviBytes; #[cfg(feature = "benchmark-private")] pub mod index; @@ -157,7 +156,7 @@ impl ForestCar { let cursor = Cursor::new_pos(&reader, 0); let mut header_zstd_frame = decode_zstd_single_frame(cursor)?; - let block_frame = UviBytes::::default() + let block_frame = uvi_bytes() .decode(&mut header_zstd_frame)? .ok_or_else(|| invalid_data("malformed uvibytes"))?; let header = from_slice_with_fallback::(&block_frame) @@ -265,9 +264,7 @@ where let mut zstd_frame = decode_zstd_single_frame(cursor)?; // Parse all key-value pairs and insert them into a map let mut block_map = hashbrown::HashMap::new(); - while let Some(block_frame) = - UviBytes::::default().decode_eof(&mut zstd_frame)? - { + while let Some(block_frame) = uvi_bytes().decode_eof(&mut zstd_frame)? { let CarBlock { cid, data } = CarBlock::from_bytes(block_frame)?; block_map.insert(cid.into(), data); } @@ -313,7 +310,7 @@ impl Encoder { let header = CarV1Header { roots, version: 1 }; let mut header_uvi_frame = BytesMut::new(); - UviBytes::default().encode( + uvi_bytes().encode( Bytes::from(fvm_ipld_encoding::to_vec(&header)?), &mut header_uvi_frame, )?; @@ -402,7 +399,11 @@ impl Encoder { } } // Pass errors through - Some(Err(e)) => return Poll::Ready(Some(Err(e))), + Some(Err(e)) => { + return Poll::Ready(Some(Err(anyhow::anyhow!( + "error polling CarBlock from stream: {e}" + )))); + } // Got element, add to encoder and emit block position Some(Ok(block)) => { frame_cids.push(block.cid); diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs index 99a5c296b448..c38af112ecda 100644 --- a/src/utils/db/car_stream.rs +++ b/src/utils/db/car_stream.rs @@ -172,7 +172,7 @@ impl CarStream { .as_ref() .map(|h| h.data_size as u64) .unwrap_or(u64::MAX); - let mut reader = FramedRead::new(reader.take(max_car_v1_bytes), UviBytes::default()); + let mut reader = FramedRead::new(reader.take(max_car_v1_bytes), uvi_bytes()); let header_v1 = read_v1_header(&mut reader) .await .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid v1 header block"))?; @@ -283,7 +283,7 @@ impl CarWriter { let car_header = CarV1Header { roots, version: 1 }; let mut header_uvi_frame = BytesMut::new(); - UviBytes::default().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?; + uvi_bytes().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?; Ok(Self { inner: writer, @@ -329,6 +329,14 @@ async fn read_v1_header( Some(header) } +pub fn uvi_bytes() -> UviBytes { + // 8GiB + const MAX_LEN: usize = 8 * 1024 * 1024 * 1024; + let mut decoder = UviBytes::default(); + decoder.set_max_len(MAX_LEN); + decoder +} + #[cfg(test)] mod tests { use std::io::Cursor;