From 3d48c185ea1437805587795180a7cd5ca629e82a Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 20 Apr 2026 06:08:27 +0800 Subject: [PATCH 1/5] refactor: use file backed CID hashset to reduce memory usage in chain export --- Cargo.lock | 1 + Cargo.toml | 2 +- src/chain/mod.rs | 6 +- src/cid_collections/hash_set.rs | 127 +++++++++++++++++++ src/cid_collections/mod.rs | 14 +- src/dev/subcommands/export_state_tree_cmd.rs | 6 +- src/ipld/util.rs | 46 ++++--- src/tool/subcommands/archive_cmd.rs | 7 +- src/tool/subcommands/benchmark_cmd.rs | 7 +- src/tool/subcommands/snapshot_cmd.rs | 3 +- 10 files changed, 185 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b20de0adc32a..e1dd0794a464 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6771,6 +6771,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6985a45b0597d68448dac9db2907f9f72bbaf63fe3383d4ba15f99096c87212f" dependencies = [ "blake2", + "bytes", "crc32fast", "fs2", "hex", diff --git a/Cargo.toml b/Cargo.toml index 4214a98acd3b..ded16377733a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,7 +169,7 @@ num-traits = "0.2" num_cpus = "1" nunny = { version = "0.2", features = ["serde", "quickcheck", "schemars1"] } openrpc-types = "0.5" -parity-db = { version = "0.5" } +parity-db = { version = "0.5", features = ["bytes"] } parking_lot = { version = "0.12", features = ["deadlock_detection"] } pastey = "0.2" pathfinding = "4" diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 1b70f09ae9a5..63800fd3db5f 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -11,7 +11,7 @@ mod weight; pub use self::{snapshot_format::*, store::*, weight::*}; use crate::blocks::{Tipset, TipsetKey}; -use crate::cid_collections::CidHashSet; +use crate::cid_collections::FileBackedCidHashSet; use crate::db::car::forest::{self, ForestCarFrame, finalize_frame}; use crate::db::{SettingsStore, SettingsStoreExt}; use crate::ipld::stream_chain; @@ -32,13 +32,13 @@ use std::io::{Read, Seek, 
SeekFrom}; use std::sync::Arc; use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter}; -#[derive(Debug, Clone, Default)] +#[derive(Default)] pub struct ExportOptions { pub skip_checksum: bool, pub include_receipts: bool, pub include_events: bool, pub include_tipset_keys: bool, - pub seen: CidHashSet, + pub seen: FileBackedCidHashSet, } pub async fn export_from_head( diff --git a/src/cid_collections/hash_set.rs b/src/cid_collections/hash_set.rs index 7195109f9971..97f085d4ea71 100644 --- a/src/cid_collections/hash_set.rs +++ b/src/cid_collections/hash_set.rs @@ -2,10 +2,19 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::*; +use bytes::Bytes; use cid::Cid; #[cfg(doc)] use std::collections::HashSet; +use std::sync::LazyLock; + +pub trait CidHashSetLike { + /// Adds a value to the set. + /// + /// Returns whether the value was newly inserted. + fn insert(&mut self, cid: Cid) -> bool; +} /// A hash set implemented as a `HashMap` where the value is `()`. /// @@ -56,6 +65,12 @@ impl CidHashSet { } } +impl CidHashSetLike for CidHashSet { + fn insert(&mut self, cid: Cid) -> bool { + self.insert(cid) + } +} + //////////////////// // Collection Ops // //////////////////// @@ -73,3 +88,115 @@ impl FromIterator for CidHashSet { this } } + +/// A file-backed CID hash set. +/// This is intended to be used for large sets of CIDs that may not fit in memory, such as when tracking seen CIDs during a chain export. 
+pub struct FileBackedCidHashSet { + db: parity_db::Db, + // for dropping the temporary directory when the set is dropped + _dir: tempfile::TempDir, + lru: hashlink::LruCache, +} + +impl Default for FileBackedCidHashSet { + fn default() -> Self { + const MAX_ATTEMPTS: usize = 10; + for _ in 0..MAX_ATTEMPTS { + // temp dir has limitation of 10GiB on some platforms, so we prefer current working directory + // and try multiple times to create one in case we hit that limit + if let Some(dir) = tempfile::tempdir_in(".") + .ok() + .or_else(|| tempfile::tempdir().ok()) + { + let options = parity_db::Options { + path: dir.path().to_path_buf(), + sync_wal: false, + sync_data: false, + stats: false, + salt: None, + columns: vec![ + parity_db::ColumnOptions { + uniform: true, + append_only: true, + ..Default::default() + }, + parity_db::ColumnOptions { + append_only: true, + ..Default::default() + }, + ], + compression_threshold: Default::default(), + }; + if let Ok(db) = parity_db::Db::open_or_create(&options) { + return Self { + db, + _dir: dir, + #[allow(clippy::disallowed_methods)] + lru: hashlink::LruCache::new(2 << 19), // ~80MiB for 1M entries + }; + } + } + } + panic!( + "failed to create parity db with a temporary directory after {MAX_ATTEMPTS} attempts" + ); + } +} + +impl CidHashSetLike for FileBackedCidHashSet { + fn insert(&mut self, cid: Cid) -> bool { + static EMPTY_VALUE: LazyLock = LazyLock::new(|| Bytes::from_static(&[])); + + let small = SmallCid::from(cid); + if self.lru.get(&small).is_some() { + return false; + } + + let (col, key) = match &small { + SmallCid::Inline(c) => (0, c.digest().to_vec()), + SmallCid::Indirect(u) => (1, u.inner().to_bytes()), + }; + if self.db.get(col, &key).ok().flatten().is_some() { + self.lru.insert(small, ()); + false + } else { + _ = self + .db + .commit_changes_bytes([(col, parity_db::Operation::Set(key, EMPTY_VALUE.clone()))]); + self.lru.insert(small, ()); + true + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + 
#[quickcheck_macros::quickcheck] + fn test_cid_hashset(mut cids: Vec) { + cids.dedup(); + let mut set = CidHashSet::default(); + for cid in cids.iter() { + all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted"); + } + for cid in cids.iter() { + all_asserts::assert_false!(set.insert(*cid), "expected CID to be present in the set"); + } + } + + #[quickcheck_macros::quickcheck] + fn test_file_backed_cid_hashset(mut cids: Vec) { + cids.dedup(); + let mut set = FileBackedCidHashSet::default(); + let dir = set._dir.path().to_path_buf(); + for cid in cids.iter() { + all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted"); + } + for cid in cids.iter() { + all_asserts::assert_false!(set.insert(*cid), "expected CID to be present in the set"); + } + drop(set); + all_asserts::assert_false!(dir.exists(), "expected temporary directory to be deleted"); + } +} diff --git a/src/cid_collections/mod.rs b/src/cid_collections/mod.rs index 299b46763011..1d936cc98136 100644 --- a/src/cid_collections/mod.rs +++ b/src/cid_collections/mod.rs @@ -4,9 +4,9 @@ pub mod hash_map; pub mod hash_set; mod small_cid_vec; pub use hash_map::CidHashMap; -pub use hash_set::CidHashSet; +pub use hash_set::{CidHashSet, FileBackedCidHashSet}; use imp::{CidV1DagCborBlake2b256, Uncompactable}; -pub use small_cid_vec::SmallCidNonEmptyVec; +pub use small_cid_vec::{SmallCid, SmallCidNonEmptyVec}; /// The core primitive for saving space in this module. 
/// @@ -53,6 +53,10 @@ mod imp { impl CidV1DagCborBlake2b256 { const WIDTH: usize = 32; + + pub fn digest(&self) -> &[u8; Self::WIDTH] { + &self.digest + } } #[cfg(test)] @@ -107,6 +111,12 @@ mod imp { inner: Cid, } + impl Uncompactable { + pub fn inner(&self) -> &Cid { + &self.inner + } + } + /// [`Uncompactable`] can only be created through [`MaybeCompactedCid`], since /// that type defines the canonical conversion impl From for Cid { diff --git a/src/dev/subcommands/export_state_tree_cmd.rs b/src/dev/subcommands/export_state_tree_cmd.rs index 7e8803781bb3..a66b1de2f2cc 100644 --- a/src/dev/subcommands/export_state_tree_cmd.rs +++ b/src/dev/subcommands/export_state_tree_cmd.rs @@ -3,12 +3,12 @@ use crate::{ chain::{ChainStore, index::ResolveNullTipset}, + cid_collections::FileBackedCidHashSet, cli_shared::{chain_path, read_config}, daemon::db_util::load_all_forest_cars, db::{ CAR_DB_DIR_NAME, - car::ManyCar, - car::forest::FOREST_CAR_FILE_EXTENSION, + car::{ManyCar, forest::FOREST_CAR_FILE_EXTENSION}, db_engine::{db_root, open_db}, }, genesis::read_genesis_header, @@ -109,7 +109,7 @@ impl ExportStateTreeCommand { ipld_roots.extend(receipts.into_iter().filter_map(|r| r.events_root())); } let roots = nunny::vec![ipld_roots.first().cloned().context("no ipld roots found")?]; - let stream = IpldStream::new(db, ipld_roots.clone()); + let stream = IpldStream::<_, FileBackedCidHashSet>::new(db, ipld_roots.clone()); let frames = crate::db::car::forest::Encoder::compress_stream_default(stream); let tmp = tempfile::NamedTempFile::new_in(output.parent().unwrap_or_else(|| Path::new(".")))? 
diff --git a/src/ipld/util.rs b/src/ipld/util.rs index f6de6f41a206..6fb04a4a2284 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -3,6 +3,7 @@ use crate::blocks::Tipset; use crate::cid_collections::CidHashSet; +use crate::cid_collections::hash_set::CidHashSetLike; use crate::ipld::Ipld; use crate::shim::clock::ChainEpoch; use crate::shim::executor::Receipt; @@ -145,11 +146,11 @@ enum Task { } pin_project! { - pub struct ChainStream { + pub struct ChainStream { tipset_iter: T, db: DB, dfs: VecDeque, // Depth-first work queue. - seen: CidHashSet, + seen: S, stateroot_limit_exclusive: ChainEpoch, fail_on_dead_links: bool, message_receipts: bool, @@ -159,8 +160,8 @@ pin_project! { } } -impl ChainStream { - pub fn with_seen(mut self, seen: CidHashSet) -> Self { +impl ChainStream { + pub fn with_seen(mut self, seen: S) -> Self { self.seen = seen; self } @@ -194,8 +195,7 @@ impl ChainStream { self } - #[allow(dead_code)] - pub fn into_seen(self) -> CidHashSet { + pub fn into_seen(self) -> S { self.seen } } @@ -211,16 +211,21 @@ impl ChainStream { /// * `stateroot_limit` - An epoch that signifies how far back (exclusive) we need to inspect tipsets, /// in-depth. This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth` /// is the number of `[`Tipset`]` that needs inspection. -pub fn stream_chain, ITER: Iterator + Unpin>( +pub fn stream_chain< + DB: Blockstore, + T: Borrow, + ITER: Iterator + Unpin, + S: CidHashSetLike + Default, +>( db: DB, tipset_iter: ITER, stateroot_limit_exclusive: ChainEpoch, -) -> ChainStream { +) -> ChainStream { ChainStream { tipset_iter, db, dfs: VecDeque::new(), - seen: CidHashSet::default(), + seen: Default::default(), stateroot_limit_exclusive, fail_on_dead_links: true, message_receipts: false, @@ -232,16 +237,21 @@ pub fn stream_chain, ITER: Iterator // Stream available graph in a depth-first search. All reachable nodes are touched and dead-links // are ignored. 
-pub fn stream_graph, ITER: Iterator + Unpin>( +pub fn stream_graph< + DB: Blockstore, + T: Borrow, + ITER: Iterator + Unpin, + S: CidHashSetLike + Default, +>( db: DB, tipset_iter: ITER, stateroot_limit_exclusive: ChainEpoch, -) -> ChainStream { +) -> ChainStream { stream_chain(db, tipset_iter, stateroot_limit_exclusive).fail_on_dead_links(false) } -impl, ITER: Iterator + Unpin> Stream - for ChainStream +impl, ITER: Iterator + Unpin, S: CidHashSetLike> Stream + for ChainStream { type Item = anyhow::Result; @@ -412,24 +422,24 @@ impl, ITER: Iterator + Unpin> Stream } pin_project! { - pub struct IpldStream { + pub struct IpldStream { db: DB, cid_vec: Vec, - seen: CidHashSet, + seen: S, } } -impl IpldStream { +impl IpldStream { pub fn new(db: DB, roots: Vec) -> Self { Self { db, cid_vec: roots, - seen: CidHashSet::default(), + seen: S::default(), } } } -impl Stream for IpldStream { +impl Stream for IpldStream { type Item = anyhow::Result; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index 51174e9190fa..368d0ea0c55c 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -32,6 +32,7 @@ use crate::chain::{ index::{ChainIndex, ResolveNullTipset}, }; use crate::cid_collections::CidHashSet; +use crate::cid_collections::FileBackedCidHashSet; use crate::cli_shared::{snapshot, snapshot::TrustedVendor}; use crate::daemon::bundle::load_actor_bundles; use crate::db::car::{AnyCar, ManyCar, forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL}; @@ -589,7 +590,7 @@ pub async fn do_export( .context("diff epoch must be smaller than target epoch")?; let diff_ts: &Tipset = &diff_ts; let diff_limit = diff_depth.map(|depth| diff_ts.epoch() - depth).unwrap_or(0); - let mut stream = stream_chain( + let mut stream = stream_chain::<_, _, _, FileBackedCidHashSet>( store.clone(), diff_ts.clone().chain_owned(store.clone()), diff_limit, @@ -597,7 +598,7 @@ 
pub async fn do_export( while stream.try_next().await?.is_some() {} stream.into_seen() } else { - CidHashSet::default() + Default::default() }; let output_path = build_output_path(network.to_string(), genesis.timestamp, epoch, output_path); @@ -696,7 +697,7 @@ async fn merge_snapshots( )?); // Stream all available blocks from heaviest_tipset to genesis. - let blocks = stream_graph(&store, heaviest_tipset.chain(&store), 0); + let blocks = stream_graph::<_, _, _, CidHashSet>(&store, heaviest_tipset.chain(&store), 0); // Encode Ipld key-value pairs in zstd frames let frames = forest::Encoder::compress_stream_default(blocks); diff --git a/src/tool/subcommands/benchmark_cmd.rs b/src/tool/subcommands/benchmark_cmd.rs index 8ad2f6a09013..8de6a4f15477 100644 --- a/src/tool/subcommands/benchmark_cmd.rs +++ b/src/tool/subcommands/benchmark_cmd.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::blocks::{Tipset, TipsetKey}; +use crate::cid_collections::CidHashSet; use crate::db::car::ManyCar; use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE; use crate::ipld::{stream_chain, stream_graph}; @@ -176,7 +177,7 @@ async fn benchmark_graph_traversal(input: Vec) -> anyhow::Result<()> { let mut sink = indicatif_sink("traversed"); - let mut s = stream_graph(&store, heaviest.chain(&store), 0); + let mut s = stream_graph::<_, _, _, CidHashSet>(&store, heaviest.chain(&store), 0); while let Some(block) = s.try_next().await? { sink.write_all(&block.data).await? 
} @@ -234,7 +235,7 @@ async fn benchmark_exporting( let mut dest = indicatif_sink("exported"); - let blocks = stream_chain( + let blocks = stream_chain::<_, _, _, CidHashSet>( Arc::clone(&store), ts.clone().chain_owned(Arc::clone(&store)), stateroot_lookup_limit, @@ -299,7 +300,7 @@ async fn benchmark_blockstore_traversal( let head = Tipset::load_required(bs, head_tsk)?; let mut sink = indicatif_sink("traversed"); let start = Instant::now(); - let mut s = stream_graph(bs, head.chain(bs), 0); + let mut s = stream_graph::<_, _, _, CidHashSet>(bs, head.chain(bs), 0); let mut n = 0; while let Some(block) = s.try_next().await? { sink.write_all(&block.data).await?; diff --git a/src/tool/subcommands/snapshot_cmd.rs b/src/tool/subcommands/snapshot_cmd.rs index b84b98545504..c7cf7366c7c6 100644 --- a/src/tool/subcommands/snapshot_cmd.rs +++ b/src/tool/subcommands/snapshot_cmd.rs @@ -4,6 +4,7 @@ use super::*; use crate::blocks::Tipset; use crate::chain::index::{ChainIndex, ResolveNullTipset}; +use crate::cid_collections::CidHashSet; use crate::cli_shared::snapshot; use crate::daemon::bundle::load_actor_bundles; use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE; @@ -347,7 +348,7 @@ where pb.set_message(format!("{height} remaining epochs (spine)")); } }); - let mut stream = stream_chain(&db, tipsets, epoch_limit); + let mut stream = stream_chain::<_, _, _, CidHashSet>(&db, tipsets, epoch_limit); while stream.try_next().await?.is_some() {} pb.finish_with_message("✅ verified!"); From 69d5b5df9e0b513031d297f36068517206b6031b Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 20 Apr 2026 20:51:14 +0800 Subject: [PATCH 2/5] resolve comments --- CHANGELOG.md | 2 + docs/docs/users/guides/gc.md | 4 +- src/chain/mod.rs | 49 ++++++----- src/chain/tests.rs | 20 ++++- src/cid_collections/hash_set.rs | 86 +++++++++---------- src/cid_collections/mod.rs | 2 +- src/daemon/context.rs | 4 + src/daemon/mod.rs | 2 + src/db/gc/snapshot.rs | 27 +++--- 
src/dev/subcommands/export_state_tree_cmd.rs | 2 +- src/ipld/util.rs | 24 +++--- src/rpc/methods/chain.rs | 19 ++-- src/rpc/methods/sync.rs | 1 + src/rpc/mod.rs | 1 + src/tool/offline_server/server.rs | 1 + .../api_cmd/generate_test_snapshot.rs | 1 + src/tool/subcommands/api_cmd/test_snapshot.rs | 1 + src/tool/subcommands/archive_cmd.rs | 18 ++-- src/tool/subcommands/benchmark_cmd.rs | 7 +- src/tool/subcommands/snapshot_cmd.rs | 2 +- src/utils/db/car_stream/tests.rs | 40 +++++++-- 21 files changed, 192 insertions(+), 121 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 145ac01a96a2..b96f5a390c83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ ### Changed +- [#6939](https://github.com/ChainSafe/forest/pull/6939): Refactored snapshot export and garbage collection logic to use disk-backed hash set for de-duplicating reachable blocks. This results in less RAM usage (~6-7GiB) and more disk usage (~7-8GiB on mainnet). + ### Removed ### Fixed diff --git a/docs/docs/users/guides/gc.md b/docs/docs/users/guides/gc.md index 2aa2599ea38f..8572389a7340 100644 --- a/docs/docs/users/guides/gc.md +++ b/docs/docs/users/guides/gc.md @@ -54,10 +54,10 @@ Always remember to enable GC when moving back to production or long-term testing During the GC process, Forest consumes extra RAM and disk space temporarily: -- While traversing reachable blocks, it uses 32 bytes of RAM per reachable block. +- While traversing reachable blocks, it uses ~80MiB of RAM and ~8GiB disk space on mainnet (and ~2GiB on calibnet) for de-duplicating reachable blocks. - While exporting a lite snapshot, it uses extra disk space before cleaning up parity-db and stale CAR snapshots. -For a typical ~80 GiB mainnet snapshot, this results in ~2.5 GiB of additional RAM and ~80 GiB disk space usage. +For a typical ~80 GiB mainnet snapshot, this results in ~80 MiB of additional RAM and ~90 GiB disk space usage.
### Syncing Pauses or Performance Overheads diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 63800fd3db5f..42bd8c3d03f5 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -11,7 +11,7 @@ mod weight; pub use self::{snapshot_format::*, store::*, weight::*}; use crate::blocks::{Tipset, TipsetKey}; -use crate::cid_collections::FileBackedCidHashSet; +use crate::cid_collections::CidHashSetLike; use crate::db::car::forest::{self, ForestCarFrame, finalize_frame}; use crate::db::{SettingsStore, SettingsStoreExt}; use crate::ipld::stream_chain; @@ -32,50 +32,61 @@ use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter}; -#[derive(Default)] -pub struct ExportOptions { +pub struct ExportOptions { pub skip_checksum: bool, pub include_receipts: bool, pub include_events: bool, pub include_tipset_keys: bool, - pub seen: FileBackedCidHashSet, + pub seen: S, } -pub async fn export_from_head( +impl Default for ExportOptions { + fn default() -> Self { + Self { + skip_checksum: Default::default(), + include_receipts: Default::default(), + include_events: Default::default(), + include_tipset_keys: Default::default(), + seen: Default::default(), + } + } +} + +pub async fn export_from_head( db: &Arc, lookup_depth: ChainEpochDelta, writer: impl AsyncWrite + Unpin, - options: Option, + options: ExportOptions, ) -> anyhow::Result<(Tipset, Option>)> { let head_key = SettingsStoreExt::read_obj::(db, crate::db::setting_keys::HEAD_KEY)? 
.context("chain head key not found")?; let head_ts = Tipset::load_required(&db, &head_key)?; - let digest = export::(db, &head_ts, lookup_depth, writer, options).await?; + let digest = export::(db, &head_ts, lookup_depth, writer, options).await?; Ok((head_ts, digest)) } /// Exports a Filecoin snapshot in v1 format /// See -pub async fn export( +pub async fn export( db: &Arc, tipset: &Tipset, lookup_depth: ChainEpochDelta, writer: impl AsyncWrite + Unpin, - options: Option, + options: ExportOptions, ) -> anyhow::Result>> { let roots = tipset.key().to_cids(); - export_to_forest_car::(roots, None, db, tipset, lookup_depth, writer, options).await + export_to_forest_car::(roots, None, db, tipset, lookup_depth, writer, options).await } /// Exports a Filecoin snapshot in v2 format /// See -pub async fn export_v2( +pub async fn export_v2( db: &Arc, mut f3: Option<(Cid, F)>, tipset: &Tipset, lookup_depth: ChainEpochDelta, writer: impl AsyncWrite + Unpin, - options: Option, + options: ExportOptions, ) -> anyhow::Result>> { // validate f3 data if let Some((f3_cid, f3_data)) = &mut f3 { @@ -121,7 +132,7 @@ pub async fn export_v2( }); } - export_to_forest_car::( + export_to_forest_car::( roots, Some(prefix_data_frames), db, @@ -134,23 +145,21 @@ pub async fn export_v2( } #[allow(clippy::too_many_arguments)] -async fn export_to_forest_car( +async fn export_to_forest_car( roots: NonEmpty, prefix_data_frames: Option>>, db: &Arc, tipset: &Tipset, lookup_depth: ChainEpochDelta, writer: impl AsyncWrite + Unpin, - options: Option, -) -> anyhow::Result>> { - let ExportOptions { + ExportOptions { skip_checksum, include_receipts, include_events, include_tipset_keys, seen, - } = options.unwrap_or_default(); - + }: ExportOptions, +) -> anyhow::Result>> { if include_events && !include_receipts { anyhow::bail!("message receipts must be included when events are included"); } @@ -171,8 +180,8 @@ async fn export_to_forest_car( db.shallow_clone(), 
tipset.shallow_clone().chain_owned(db.shallow_clone()), stateroot_lookup_limit, + seen, ) - .with_seen(seen) .with_message_receipts(include_receipts) .with_events(include_events) .with_tipset_keys(include_tipset_keys) diff --git a/src/chain/tests.rs b/src/chain/tests.rs index d3350e516e03..3bdcf051957b 100644 --- a/src/chain/tests.rs +++ b/src/chain/tests.rs @@ -4,6 +4,7 @@ use super::*; use crate::{ blocks::{CachingBlockHeader, Chain4U, Tipset, TipsetKey, chain4u}, + cid_collections::CidHashSet, db::{MemoryDB, car::ForestCar}, utils::db::CborStoreExt, }; @@ -60,10 +61,25 @@ async fn test_export_inner(version: FilecoinSnapshotVersion) -> anyhow::Result<( let checksum = match version { FilecoinSnapshotVersion::V1 => { - export::(&db, &head, 0, &mut car_bytes, None).await? + export::( + &db, + &head, + 0, + &mut car_bytes, + ExportOptions::::default(), + ) + .await? } FilecoinSnapshotVersion::V2 => { - export_v2::(&db, None, &head, 0, &mut car_bytes, None).await? + export_v2::( + &db, + None, + &head, + 0, + &mut car_bytes, + ExportOptions::::default(), + ) + .await? } }; diff --git a/src/cid_collections/hash_set.rs b/src/cid_collections/hash_set.rs index 97f085d4ea71..dad06d31edbf 100644 --- a/src/cid_collections/hash_set.rs +++ b/src/cid_collections/hash_set.rs @@ -2,12 +2,13 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::*; +use anyhow::Context as _; use bytes::Bytes; use cid::Cid; #[cfg(doc)] use std::collections::HashSet; -use std::sync::LazyLock; +use std::{path::Path, sync::LazyLock}; pub trait CidHashSetLike { /// Adds a value to the set. 
@@ -98,48 +99,45 @@ pub struct FileBackedCidHashSet { lru: hashlink::LruCache, } -impl Default for FileBackedCidHashSet { - fn default() -> Self { - const MAX_ATTEMPTS: usize = 10; - for _ in 0..MAX_ATTEMPTS { - // temp dir has limitation of 10GiB on some platforms, so we prefer current working directory - // and try multiple times to create one in case we hit that limit - if let Some(dir) = tempfile::tempdir_in(".") - .ok() - .or_else(|| tempfile::tempdir().ok()) - { - let options = parity_db::Options { - path: dir.path().to_path_buf(), - sync_wal: false, - sync_data: false, - stats: false, - salt: None, - columns: vec![ - parity_db::ColumnOptions { - uniform: true, - append_only: true, - ..Default::default() - }, - parity_db::ColumnOptions { - append_only: true, - ..Default::default() - }, - ], - compression_threshold: Default::default(), - }; - if let Ok(db) = parity_db::Db::open_or_create(&options) { - return Self { - db, - _dir: dir, - #[allow(clippy::disallowed_methods)] - lru: hashlink::LruCache::new(2 << 19), // ~80MiB for 1M entries - }; - } - } - } - panic!( - "failed to create parity db with a temporary directory after {MAX_ATTEMPTS} attempts" - ); +impl FileBackedCidHashSet { + pub fn new(temp_dir_root: impl AsRef) -> anyhow::Result { + let dir = tempfile::tempdir_in(temp_dir_root.as_ref()).with_context(|| { + format!( + "failed to create temp dir in {}", + temp_dir_root.as_ref().display(), + ) + })?; + let options = parity_db::Options { + path: dir.path().to_path_buf(), + sync_wal: false, + sync_data: false, + stats: false, + salt: None, + columns: vec![ + parity_db::ColumnOptions { + uniform: true, + append_only: true, + ..Default::default() + }, + parity_db::ColumnOptions { + append_only: true, + ..Default::default() + }, + ], + compression_threshold: Default::default(), + }; + let db = parity_db::Db::open_or_create(&options).with_context(|| { + format!( + "failed to create temp parity-db at {}", + options.path.display() + ) + })?; + Ok(Self { + db, 
+ _dir: dir, + #[allow(clippy::disallowed_methods)] + lru: hashlink::LruCache::new(2 << 19), // ~80MiB for 1M entries + }) } } @@ -188,7 +186,7 @@ mod tests { #[quickcheck_macros::quickcheck] fn test_file_backed_cid_hashset(mut cids: Vec) { cids.dedup(); - let mut set = FileBackedCidHashSet::default(); + let mut set = FileBackedCidHashSet::new(std::env::temp_dir()).unwrap(); let dir = set._dir.path().to_path_buf(); for cid in cids.iter() { all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted"); diff --git a/src/cid_collections/mod.rs b/src/cid_collections/mod.rs index d670cbb12a04..a7993d73f3f8 100644 --- a/src/cid_collections/mod.rs +++ b/src/cid_collections/mod.rs @@ -4,7 +4,7 @@ pub mod hash_map; pub mod hash_set; mod small_cid_vec; pub use hash_map::CidHashMap; -pub use hash_set::{CidHashSet, FileBackedCidHashSet}; +pub use hash_set::{CidHashSet, CidHashSetLike, FileBackedCidHashSet}; use imp::{CidV1DagCborBlake2b256, Uncompactable}; pub use small_cid_vec::{SmallCid, SmallCidNonEmptyVec}; diff --git a/src/daemon/context.rs b/src/daemon/context.rs index d3fbc9ac946d..345b046ae3fd 100644 --- a/src/daemon/context.rs +++ b/src/daemon/context.rs @@ -40,6 +40,7 @@ pub struct AppContext { pub keystore: Arc>, pub admin_jwt: String, pub snapshot_progress_tracker: SnapshotProgressTracker, + pub temp_dir: std::path::PathBuf, } impl AppContext { @@ -50,6 +51,8 @@ impl AppContext { let state_manager = create_state_manager(cfg, &db, &chain_cfg).await?; let (keystore, admin_jwt) = load_or_create_keystore_and_configure_jwt(opts, cfg).await?; let snapshot_progress_tracker = SnapshotProgressTracker::default(); + let temp_dir = chain_path(cfg).join("tmp"); + std::fs::create_dir_all(&temp_dir).context("Failed to create temporary directory")?; Ok(Self { net_keypair, p2p_peer_id, @@ -59,6 +62,7 @@ impl AppContext { keystore, admin_jwt, snapshot_progress_tracker, + temp_dir, }) } diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 
7d8f0ae6c1a4..3ccffecf9595 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -469,6 +469,7 @@ fn maybe_start_rpc_service( let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone(); let nonce_tracker = NonceTracker::new(); let mpool_locker = MpoolLocker::new(); + let temp_dir = Arc::new(ctx.temp_dir.clone()); async move { let rpc_listener = tokio::net::TcpListener::bind(rpc_address) .await @@ -491,6 +492,7 @@ fn maybe_start_rpc_service( snapshot_progress_tracker, mpool_locker, nonce_tracker, + temp_dir, }, rpc_listener, rpc_stop_handle, diff --git a/src/db/gc/snapshot.rs b/src/db/gc/snapshot.rs index bc14dc4fcf24..5b27b74242ef 100644 --- a/src/db/gc/snapshot.rs +++ b/src/db/gc/snapshot.rs @@ -18,14 +18,14 @@ //! ## Correctness //! The algorithm assumes that a Forest node can always be bootstrapped with the most recent standard lite snapshot. //! -//! ## Disk usage -//! The algorithm requires extra disk space of the size of a most recent standard lite -//! snapshot(`~72 GiB` as of writing at epoch 4937270 on mainnet). +//! ### RAM/Disk Usage Spikes //! -//! ## Memory usage -//! During the lite snapshot export stage, the algorithm at least `32 bytes` of memory for each reachable block -//! while traversing the reachable graph. For a typical mainnet snapshot of about 100 GiB that adds up to -//! roughly 2.5 GiB. +//! During the GC process, Forest consumes extra RAM and disk space temporarily: +//! +//! - While traversing reachable blocks, it uses ~80MiB of RAM and ~8GiB disk space on mainnet (and ~2GiB on calibnet) for de-duplicating reachable blocks. +//! - While exporting a lite snapshot, it uses extra disk space before cleaning up parity-db and stale CAR snapshots. +//! +//! For a typical ~80 GiB mainnet snapshot, this results in ~80 MiB of additional RAM and ~90 GiB disk space usage. //! //! ## Scheduling //! When automatic GC is enabled, it by default runs every 7 days (20160 epochs). 
@@ -39,6 +39,7 @@ use crate::blocks::{Tipset, TipsetKey}; use crate::chain::{ChainStore, ExportOptions}; use crate::chain_sync::ChainFollower; +use crate::cid_collections::FileBackedCidHashSet; use crate::cli_shared::chain_path; use crate::db::{ BlockstoreWriteOpsSubscribable, CAR_DB_DIR_NAME, HeaviestTipsetKeyProvider, SettingsStore, @@ -65,6 +66,7 @@ use std::{ use tokio::task::JoinSet; pub struct SnapshotGarbageCollector { + chain_tmp_root: PathBuf, car_db_dir: PathBuf, recent_state_roots: i64, running: AtomicBool, @@ -96,6 +98,8 @@ where config: &crate::Config, ) -> anyhow::Result { let chain_data_path = chain_path(config); + let chain_tmp_root = chain_data_path.join("tmp"); + std::fs::create_dir_all(&chain_tmp_root)?; let db_root_dir = db_root(&chain_data_path)?; let car_db_dir = db_root_dir.join(CAR_DB_DIR_NAME); let recent_state_roots = std::env::var("FOREST_SNAPSHOT_GC_KEEP_STATE_TREE_EPOCHS") @@ -114,6 +118,7 @@ where .unwrap_or(config.sync.recent_state_roots); let (trigger_tx, trigger_rx) = flume::bounded(1); Ok(Self { + chain_tmp_root, car_db_dir, recent_state_roots, running: AtomicBool::new(false), @@ -248,17 +253,17 @@ where map }); let start = Instant::now(); - let (head_ts, _) = crate::chain::export_from_head::( + let (head_ts, _) = crate::chain::export_from_head::( db, self.recent_state_roots, file, - Some(ExportOptions { + ExportOptions { skip_checksum: true, include_receipts: true, include_events: true, include_tipset_keys: true, - seen: Default::default(), - }), + seen: FileBackedCidHashSet::new(&self.chain_tmp_root)?, + }, ) .await?; let target_path = self.car_db_dir.join(format!( diff --git a/src/dev/subcommands/export_state_tree_cmd.rs b/src/dev/subcommands/export_state_tree_cmd.rs index a66b1de2f2cc..9262200d214c 100644 --- a/src/dev/subcommands/export_state_tree_cmd.rs +++ b/src/dev/subcommands/export_state_tree_cmd.rs @@ -109,7 +109,7 @@ impl ExportStateTreeCommand { ipld_roots.extend(receipts.into_iter().filter_map(|r| r.events_root())); } 
let roots = nunny::vec![ipld_roots.first().cloned().context("no ipld roots found")?]; - let stream = IpldStream::<_, FileBackedCidHashSet>::new(db, ipld_roots.clone()); + let stream = IpldStream::new(db, ipld_roots.clone(), FileBackedCidHashSet::new(".")?); let frames = crate::db::car::forest::Encoder::compress_stream_default(stream); let tmp = tempfile::NamedTempFile::new_in(output.parent().unwrap_or_else(|| Path::new(".")))? diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 6fb04a4a2284..4b804f642f5e 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -2,8 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::blocks::Tipset; -use crate::cid_collections::CidHashSet; -use crate::cid_collections::hash_set::CidHashSetLike; +use crate::cid_collections::{CidHashSet, CidHashSetLike}; use crate::ipld::Ipld; use crate::shim::clock::ChainEpoch; use crate::shim::executor::Receipt; @@ -161,11 +160,6 @@ pin_project! { } impl ChainStream { - pub fn with_seen(mut self, seen: S) -> Self { - self.seen = seen; - self - } - pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self { self.fail_on_dead_links = fail_on_dead_links; self @@ -215,17 +209,18 @@ pub fn stream_chain< DB: Blockstore, T: Borrow, ITER: Iterator + Unpin, - S: CidHashSetLike + Default, + S: CidHashSetLike, >( db: DB, tipset_iter: ITER, stateroot_limit_exclusive: ChainEpoch, + seen: S, ) -> ChainStream { ChainStream { tipset_iter, db, dfs: VecDeque::new(), - seen: Default::default(), + seen, stateroot_limit_exclusive, fail_on_dead_links: true, message_receipts: false, @@ -241,13 +236,14 @@ pub fn stream_graph< DB: Blockstore, T: Borrow, ITER: Iterator + Unpin, - S: CidHashSetLike + Default, + S: CidHashSetLike, >( db: DB, tipset_iter: ITER, stateroot_limit_exclusive: ChainEpoch, + seen: S, ) -> ChainStream { - stream_chain(db, tipset_iter, stateroot_limit_exclusive).fail_on_dead_links(false) + stream_chain(db, tipset_iter, stateroot_limit_exclusive, seen).fail_on_dead_links(false) } 
impl, ITER: Iterator + Unpin, S: CidHashSetLike> Stream @@ -429,12 +425,12 @@ pin_project! { } } -impl IpldStream { - pub fn new(db: DB, roots: Vec) -> Self { +impl IpldStream { + pub fn new(db: DB, roots: Vec, seen: S) -> Self { Self { db, cid_vec: roots, - seen: S::default(), + seen, } } } diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 1fe0979ab57c..c5d52a3d1c91 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -10,7 +10,7 @@ use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey}; use crate::chain::index::ResolveNullTipset; use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange}; use crate::chain_sync::{get_full_tipset, load_full_tipset}; -use crate::cid_collections::CidHashSet; +use crate::cid_collections::{CidHashSet, FileBackedCidHashSet}; use crate::ipld::DfsIter; use crate::ipld::{CHAIN_EXPORT_STATUS, cancel_export, end_export, start_export}; use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; @@ -398,13 +398,13 @@ impl RpcMethod<1> for ForestChainExport { ctx.chain_index() .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?; - let options = Some(ExportOptions { + let options = ExportOptions { skip_checksum, include_receipts, include_events, include_tipset_keys, - seen: Default::default(), - }); + seen: FileBackedCidHashSet::new(ctx.temp_dir.as_path())?, + }; let writer = if dry_run { tokio_util::either::Either::Left(VoidAsyncWriter) } else { @@ -414,8 +414,13 @@ impl RpcMethod<1> for ForestChainExport { FilecoinSnapshotVersion::V1 => { let db = ctx.store_owned(); - let chain_export = - crate::chain::export::(&db, &start_ts, recent_roots, writer, options); + let chain_export = crate::chain::export::( + &db, + &start_ts, + recent_roots, + writer, + options, + ); tokio::select! 
{ result = chain_export => { @@ -453,7 +458,7 @@ impl RpcMethod<1> for ForestChainExport { } }; - let chain_export = crate::chain::export_v2::( + let chain_export = crate::chain::export_v2::( &db, f3_snap, &start_ts, diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 1c3473a90dba..218e863d7067 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -246,6 +246,7 @@ mod tests { snapshot_progress_tracker: Default::default(), mpool_locker: MpoolLocker::new(), nonce_tracker, + temp_dir: Arc::new(std::env::temp_dir()), }); (state, network_rx) } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 5534bada4e13..1eaff397b9cb 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -485,6 +485,7 @@ pub struct RPCState { pub shutdown: mpsc::Sender<()>, pub mpool_locker: crate::message_pool::MpoolLocker, pub nonce_tracker: crate::message_pool::NonceTracker, + pub temp_dir: Arc, } impl RPCState { diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index a9ff58bab57f..4d99a6c6eba0 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -130,6 +130,7 @@ where snapshot_progress_tracker: Default::default(), mpool_locker: MpoolLocker::new(), nonce_tracker, + temp_dir: Arc::new(std::env::temp_dir()), }, shutdown_recv, )) diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index ca5dd59e8f92..88bb46a9d255 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -158,6 +158,7 @@ async fn ctx( snapshot_progress_tracker: Default::default(), mpool_locker: MpoolLocker::new(), nonce_tracker, + temp_dir: Arc::new(std::env::temp_dir()), }); Ok((rpc_state, network_rx, shutdown_recv)) } diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index af9a25b663fe..5d745f2d2af7 100644 --- 
a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -176,6 +176,7 @@ async fn ctx( snapshot_progress_tracker: Default::default(), mpool_locker: MpoolLocker::new(), nonce_tracker, + temp_dir: Arc::new(std::env::temp_dir()), }); Ok((rpc_state, network_rx, shutdown_recv)) } diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index 368d0ea0c55c..a33d0bdc453e 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -590,15 +590,16 @@ pub async fn do_export( .context("diff epoch must be smaller than target epoch")?; let diff_ts: &Tipset = &diff_ts; let diff_limit = diff_depth.map(|depth| diff_ts.epoch() - depth).unwrap_or(0); - let mut stream = stream_chain::<_, _, _, FileBackedCidHashSet>( + let mut stream = stream_chain( store.clone(), diff_ts.clone().chain_owned(store.clone()), diff_limit, + FileBackedCidHashSet::new(".")?, ); while stream.try_next().await?.is_some() {} stream.into_seen() } else { - Default::default() + FileBackedCidHashSet::new(".")? }; let output_path = build_output_path(network.to_string(), genesis.timestamp, epoch, output_path); @@ -641,18 +642,18 @@ pub async fn do_export( pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1)); let writer = pb.wrap_async_write(writer); - crate::chain::export::( + crate::chain::export::( store, &ts, depth, writer, - Some(ExportOptions { + ExportOptions { skip_checksum: true, include_receipts: false, include_events: false, include_tipset_keys: false, seen, - }), + }, ) .await?; @@ -697,7 +698,12 @@ async fn merge_snapshots( )?); // Stream all available blocks from heaviest_tipset to genesis. 
- let blocks = stream_graph::<_, _, _, CidHashSet>(&store, heaviest_tipset.chain(&store), 0); + let blocks = stream_graph( + &store, + heaviest_tipset.chain(&store), + 0, + CidHashSet::default(), + ); // Encode Ipld key-value pairs in zstd frames let frames = forest::Encoder::compress_stream_default(blocks); diff --git a/src/tool/subcommands/benchmark_cmd.rs b/src/tool/subcommands/benchmark_cmd.rs index 8de6a4f15477..57bf6ed2624a 100644 --- a/src/tool/subcommands/benchmark_cmd.rs +++ b/src/tool/subcommands/benchmark_cmd.rs @@ -177,7 +177,7 @@ async fn benchmark_graph_traversal(input: Vec) -> anyhow::Result<()> { let mut sink = indicatif_sink("traversed"); - let mut s = stream_graph::<_, _, _, CidHashSet>(&store, heaviest.chain(&store), 0); + let mut s = stream_graph(&store, heaviest.chain(&store), 0, CidHashSet::default()); while let Some(block) = s.try_next().await? { sink.write_all(&block.data).await? } @@ -235,10 +235,11 @@ async fn benchmark_exporting( let mut dest = indicatif_sink("exported"); - let blocks = stream_chain::<_, _, _, CidHashSet>( + let blocks = stream_chain( Arc::clone(&store), ts.clone().chain_owned(Arc::clone(&store)), stateroot_lookup_limit, + CidHashSet::default(), ); let frames = crate::db::car::forest::Encoder::compress_stream( @@ -300,7 +301,7 @@ async fn benchmark_blockstore_traversal( let head = Tipset::load_required(bs, head_tsk)?; let mut sink = indicatif_sink("traversed"); let start = Instant::now(); - let mut s = stream_graph::<_, _, _, CidHashSet>(bs, head.chain(bs), 0); + let mut s = stream_graph(bs, head.chain(bs), 0, CidHashSet::default()); let mut n = 0; while let Some(block) = s.try_next().await? 
{ sink.write_all(&block.data).await?; diff --git a/src/tool/subcommands/snapshot_cmd.rs b/src/tool/subcommands/snapshot_cmd.rs index c7cf7366c7c6..8a0815f79170 100644 --- a/src/tool/subcommands/snapshot_cmd.rs +++ b/src/tool/subcommands/snapshot_cmd.rs @@ -348,7 +348,7 @@ where pb.set_message(format!("{height} remaining epochs (spine)")); } }); - let mut stream = stream_chain::<_, _, _, CidHashSet>(&db, tipsets, epoch_limit); + let mut stream = stream_chain(&db, tipsets, epoch_limit, CidHashSet::default()); while stream.try_next().await?.is_some() {} pb.finish_with_message("✅ verified!"); diff --git a/src/utils/db/car_stream/tests.rs b/src/utils/db/car_stream/tests.rs index 56aa0395de49..a51634af7450 100644 --- a/src/utils/db/car_stream/tests.rs +++ b/src/utils/db/car_stream/tests.rs @@ -4,6 +4,8 @@ use super::*; use crate::{ blocks::*, + chain::ExportOptions, + cid_collections::CidHashSet, db::MemoryDB, networks::{calibnet, mainnet}, utils::rand::forest_rng, @@ -100,9 +102,15 @@ async fn stream_snapshot_parity() { let stream_v1 = { let mut snap_bytes: Vec = vec![]; - crate::chain::export::(&db, &head, 0, &mut snap_bytes, None) - .await - .unwrap(); + crate::chain::export::( + &db, + &head, + 0, + &mut snap_bytes, + ExportOptions::::default(), + ) + .await + .unwrap(); CarStream::new_with_header_v2(Cursor::new(snap_bytes), None) .await .unwrap() @@ -114,9 +122,16 @@ async fn stream_snapshot_parity() { let stream_v2 = { let mut snap_bytes: Vec = vec![]; - crate::chain::export_v2::(&db, None, &head, 0, &mut snap_bytes, None) - .await - .unwrap(); + crate::chain::export_v2::( + &db, + None, + &head, + 0, + &mut snap_bytes, + ExportOptions::::default(), + ) + .await + .unwrap(); CarStream::new_with_header_v2(Cursor::new(snap_bytes), None) .await .unwrap() @@ -132,9 +147,16 @@ async fn stream_snapshot_parity() { let cid = crate::f3::snapshot::get_f3_snapshot_cid(&mut data.as_slice()).unwrap(); Some((cid, Cursor::new(data))) }; - crate::chain::export_v2::(&db, f3, &head, 
0, &mut snap_bytes, None) - .await - .unwrap(); + crate::chain::export_v2::( + &db, + f3, + &head, + 0, + &mut snap_bytes, + ExportOptions::::default(), + ) + .await + .unwrap(); CarStream::new_with_header_v2(Cursor::new(snap_bytes), None) .await .unwrap() From d0709310f3b6e8207a1bd986884483642b8d5c97 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 21 Apr 2026 10:26:07 +0800 Subject: [PATCH 3/5] resolve AI comments --- src/cid_collections/hash_set.rs | 38 +++++++++++++++++++-------------- src/ipld/util.rs | 6 +++--- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/src/cid_collections/hash_set.rs b/src/cid_collections/hash_set.rs index dad06d31edbf..aaf450489e8d 100644 --- a/src/cid_collections/hash_set.rs +++ b/src/cid_collections/hash_set.rs @@ -14,7 +14,7 @@ pub trait CidHashSetLike { /// Adds a value to the set. /// /// Returns whether the value was newly inserted. - fn insert(&mut self, cid: Cid) -> bool; + fn insert(&mut self, cid: Cid) -> anyhow::Result; } /// A hash set implemented as a `HashMap` where the value is `()`. 
@@ -67,8 +67,8 @@ impl CidHashSet { } impl CidHashSetLike for CidHashSet { - fn insert(&mut self, cid: Cid) -> bool { - self.insert(cid) + fn insert(&mut self, cid: Cid) -> anyhow::Result { + Ok(self.insert(cid)) } } @@ -142,12 +142,12 @@ impl FileBackedCidHashSet { } impl CidHashSetLike for FileBackedCidHashSet { - fn insert(&mut self, cid: Cid) -> bool { + fn insert(&mut self, cid: Cid) -> anyhow::Result { static EMPTY_VALUE: LazyLock = LazyLock::new(|| Bytes::from_static(&[])); let small = SmallCid::from(cid); if self.lru.get(&small).is_some() { - return false; + return Ok(false); } let (col, key) = match &small { @@ -156,13 +156,14 @@ impl CidHashSetLike for FileBackedCidHashSet { }; if self.db.get(col, &key).ok().flatten().is_some() { self.lru.insert(small, ()); - false + Ok(false) } else { - _ = self - .db - .commit_changes_bytes([(col, parity_db::Operation::Set(key, EMPTY_VALUE.clone()))]); + self.db.commit_changes_bytes([( + col, + parity_db::Operation::Set(key, EMPTY_VALUE.clone()), + )])?; self.lru.insert(small, ()); - true + Ok(true) } } } @@ -170,10 +171,10 @@ impl CidHashSetLike for FileBackedCidHashSet { #[cfg(test)] mod tests { use super::*; + use ahash::HashSet; #[quickcheck_macros::quickcheck] - fn test_cid_hashset(mut cids: Vec) { - cids.dedup(); + fn test_cid_hashset(cids: HashSet) { let mut set = CidHashSet::default(); for cid in cids.iter() { all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted"); @@ -184,15 +185,20 @@ mod tests { } #[quickcheck_macros::quickcheck] - fn test_file_backed_cid_hashset(mut cids: Vec) { - cids.dedup(); + fn test_file_backed_cid_hashset(cids: HashSet) { let mut set = FileBackedCidHashSet::new(std::env::temp_dir()).unwrap(); let dir = set._dir.path().to_path_buf(); for cid in cids.iter() { - all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted"); + all_asserts::assert_true!( + set.insert(*cid).unwrap(), + "expected CID to be newly inserted" + ); } for cid in 
cids.iter() { - all_asserts::assert_false!(set.insert(*cid), "expected CID to be present in the set"); + all_asserts::assert_false!( + set.insert(*cid).unwrap(), + "expected CID to be present in the set" + ); } drop(set); all_asserts::assert_false!(dir.exists(), "expected temporary directory to be deleted"); diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 4b804f642f5e..b7ce9ce37ba4 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -285,7 +285,7 @@ impl, ITER: Iterator + Unpin, S: Cid // 2. IPLD_RAW: WASM blocks, for example. Need to be loaded, but not traversed. // 3. _: ignore all other links // Don't revisit what's already been visited. - if should_save_block_to_snapshot(cid) && this.seen.insert(cid) { + if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? { if let Some(data) = this.db.get(&cid)? { if cid.codec() == fvm_ipld_encoding::DAG_CBOR { let new_values = extract_cids(&data)?; @@ -339,7 +339,7 @@ impl, ITER: Iterator + Unpin, S: Cid for block in tipset.borrow().block_headers() { let (cid, data) = block.car_block()?; - if this.seen.insert(cid) { + if this.seen.insert(cid)? { if *this.track_progress { update_epoch(block.epoch); } @@ -441,7 +441,7 @@ impl Stream for IpldStream { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { let this = self.project(); while let Some(cid) = this.cid_vec.pop() { - if should_save_block_to_snapshot(cid) && this.seen.insert(cid) { + if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? { if let Some(data) = this.db.get(&cid)? 
{ if cid.codec() == fvm_ipld_encoding::DAG_CBOR { let new_cids = extract_cids(&data)?; From 042301a92420ed5e7bc63a72c697605d94546891 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 21 Apr 2026 10:29:24 +0800 Subject: [PATCH 4/5] impl Default for FileBackedCidHashSet for test target --- src/cid_collections/hash_set.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/cid_collections/hash_set.rs b/src/cid_collections/hash_set.rs index aaf450489e8d..a0dcb67f3244 100644 --- a/src/cid_collections/hash_set.rs +++ b/src/cid_collections/hash_set.rs @@ -168,6 +168,13 @@ impl CidHashSetLike for FileBackedCidHashSet { } } +#[cfg(test)] +impl Default for FileBackedCidHashSet { + fn default() -> Self { + Self::new(std::env::temp_dir()).expect("failed to create FileBackedCidHashSet") + } +} + #[cfg(test)] mod tests { use super::*; @@ -186,7 +193,7 @@ mod tests { #[quickcheck_macros::quickcheck] fn test_file_backed_cid_hashset(cids: HashSet) { - let mut set = FileBackedCidHashSet::new(std::env::temp_dir()).unwrap(); + let mut set = FileBackedCidHashSet::default(); let dir = set._dir.path().to_path_buf(); for cid in cids.iter() { all_asserts::assert_true!( From d438415a4d855848bdf990e09f172491faf48408 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Tue, 21 Apr 2026 10:41:26 +0800 Subject: [PATCH 5/5] resolve AI comment --- src/cid_collections/hash_set.rs | 6 +++++- src/dev/subcommands/export_state_tree_cmd.rs | 6 +++++- src/tool/subcommands/archive_cmd.rs | 4 ++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/cid_collections/hash_set.rs b/src/cid_collections/hash_set.rs index a0dcb67f3244..5154c30b08f9 100644 --- a/src/cid_collections/hash_set.rs +++ b/src/cid_collections/hash_set.rs @@ -139,6 +139,10 @@ impl FileBackedCidHashSet { lru: hashlink::LruCache::new(2 << 19), // ~80MiB for 1M entries }) } + + pub fn new_in_temp_dir() -> anyhow::Result { + Self::new(std::env::temp_dir()) + } } impl CidHashSetLike for 
FileBackedCidHashSet { @@ -171,7 +175,7 @@ impl CidHashSetLike for FileBackedCidHashSet { #[cfg(test)] impl Default for FileBackedCidHashSet { fn default() -> Self { - Self::new(std::env::temp_dir()).expect("failed to create FileBackedCidHashSet") + Self::new_in_temp_dir().expect("failed to create FileBackedCidHashSet") } } diff --git a/src/dev/subcommands/export_state_tree_cmd.rs b/src/dev/subcommands/export_state_tree_cmd.rs index 9262200d214c..366edc075762 100644 --- a/src/dev/subcommands/export_state_tree_cmd.rs +++ b/src/dev/subcommands/export_state_tree_cmd.rs @@ -109,7 +109,11 @@ impl ExportStateTreeCommand { ipld_roots.extend(receipts.into_iter().filter_map(|r| r.events_root())); } let roots = nunny::vec![ipld_roots.first().cloned().context("no ipld roots found")?]; - let stream = IpldStream::new(db, ipld_roots.clone(), FileBackedCidHashSet::new(".")?); + let stream = IpldStream::new( + db, + ipld_roots.clone(), + FileBackedCidHashSet::new_in_temp_dir()?, + ); let frames = crate::db::car::forest::Encoder::compress_stream_default(stream); let tmp = tempfile::NamedTempFile::new_in(output.parent().unwrap_or_else(|| Path::new(".")))? diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index a33d0bdc453e..dca8f981e72f 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -594,12 +594,12 @@ pub async fn do_export( store.clone(), diff_ts.clone().chain_owned(store.clone()), diff_limit, - FileBackedCidHashSet::new(".")?, + FileBackedCidHashSet::new_in_temp_dir()?, ); while stream.try_next().await?.is_some() {} stream.into_seen() } else { - FileBackedCidHashSet::new(".")? + FileBackedCidHashSet::new_in_temp_dir()? }; let output_path = build_output_path(network.to_string(), genesis.timestamp, epoch, output_path);