Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

### Changed

- [#6939](https://github.com/ChainSafe/forest/pull/6939): Refactored snapshot export and garbage collection logic to use disk-backed hash set for de-de-duplicating reachable blocks. This results in less RAM usage (~6-7GiB) and more disk usage (~7-8GiB on mainnet).

### Removed

### Fixed
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ num-traits = "0.2"
num_cpus = "1"
nunny = { version = "0.2", features = ["serde", "quickcheck", "schemars1"] }
openrpc-types = "0.5"
parity-db = { version = "0.5" }
parity-db = { version = "0.5", features = ["bytes"] }
parking_lot = { version = "0.12", features = ["deadlock_detection"] }
pastey = "0.2"
pathfinding = "4"
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/users/guides/gc.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ Always remember to enable GC when moving back to production or long-term testing

During the GC process, Forest consumes extra RAM and disk space temporarily:

- While traversing reachable blocks, it uses 32 bytes of RAM per reachable block.
- While traversing reachable blocks, it uses ~80MiB of RAM and ~8GiB disk space on mainnet (and ~2GiB on calibnet) for de-duplicating reachable blocks.
- While exporting a lite snapshot, it uses extra disk space before cleaning up parity-db and stale CAR snapshots.

For a typical ~80 GiB mainnet snapshot, this results in ~2.5 GiB of additional RAM and ~80 GiB disk space usage.
For a typical ~80 GiB mainnet snapshot, this results in ~80 MiB of additional RAM and ~90 GiB disk space usage.

### Syncing Pauses or Performance Overheads

Expand Down
49 changes: 29 additions & 20 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod weight;
pub use self::{snapshot_format::*, store::*, weight::*};

use crate::blocks::{Tipset, TipsetKey};
use crate::cid_collections::CidHashSet;
use crate::cid_collections::CidHashSetLike;
use crate::db::car::forest::{self, ForestCarFrame, finalize_frame};
use crate::db::{SettingsStore, SettingsStoreExt};
use crate::ipld::stream_chain;
Expand All @@ -32,50 +32,61 @@ use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

#[derive(Debug, Clone, Default)]
pub struct ExportOptions {
pub struct ExportOptions<S> {
pub skip_checksum: bool,
pub include_receipts: bool,
pub include_events: bool,
pub include_tipset_keys: bool,
pub seen: CidHashSet,
pub seen: S,
}

pub async fn export_from_head<D: Digest>(
impl<S: Default> Default for ExportOptions<S> {
Comment thread
hanabi1224 marked this conversation as resolved.
fn default() -> Self {
Self {
skip_checksum: Default::default(),
include_receipts: Default::default(),
include_events: Default::default(),
include_tipset_keys: Default::default(),
seen: Default::default(),
}
}
}

pub async fn export_from_head<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
db: &Arc<impl Blockstore + SettingsStore + Send + Sync + 'static>,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
options: ExportOptions<S>,
) -> anyhow::Result<(Tipset, Option<digest::Output<D>>)> {
let head_key = SettingsStoreExt::read_obj::<TipsetKey>(db, crate::db::setting_keys::HEAD_KEY)?
.context("chain head key not found")?;
let head_ts = Tipset::load_required(&db, &head_key)?;
let digest = export::<D>(db, &head_ts, lookup_depth, writer, options).await?;
let digest = export::<D, S>(db, &head_ts, lookup_depth, writer, options).await?;
Ok((head_ts, digest))
}

/// Exports a Filecoin snapshot in v1 format
/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v1-specification>
pub async fn export<D: Digest>(
pub async fn export<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
db: &Arc<impl Blockstore + Send + Sync + 'static>,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
options: ExportOptions<S>,
) -> anyhow::Result<Option<digest::Output<D>>> {
let roots = tipset.key().to_cids();
export_to_forest_car::<D>(roots, None, db, tipset, lookup_depth, writer, options).await
export_to_forest_car::<D, S>(roots, None, db, tipset, lookup_depth, writer, options).await
}

/// Exports a Filecoin snapshot in v2 format
/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v2-specification>
pub async fn export_v2<D: Digest, F: Seek + Read>(
pub async fn export_v2<D: Digest, F: Seek + Read, S: CidHashSetLike + Send + Sync + 'static>(
db: &Arc<impl Blockstore + Send + Sync + 'static>,
mut f3: Option<(Cid, F)>,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
options: ExportOptions<S>,
) -> anyhow::Result<Option<digest::Output<D>>> {
// validate f3 data
if let Some((f3_cid, f3_data)) = &mut f3 {
Expand Down Expand Up @@ -121,7 +132,7 @@ pub async fn export_v2<D: Digest, F: Seek + Read>(
});
}

export_to_forest_car::<D>(
export_to_forest_car::<D, S>(
roots,
Some(prefix_data_frames),
db,
Expand All @@ -134,23 +145,21 @@ pub async fn export_v2<D: Digest, F: Seek + Read>(
}

#[allow(clippy::too_many_arguments)]
async fn export_to_forest_car<D: Digest>(
async fn export_to_forest_car<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
roots: NonEmpty<Cid>,
prefix_data_frames: Option<Vec<anyhow::Result<ForestCarFrame>>>,
db: &Arc<impl Blockstore + Send + Sync + 'static>,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
) -> anyhow::Result<Option<digest::Output<D>>> {
let ExportOptions {
ExportOptions {
skip_checksum,
include_receipts,
include_events,
include_tipset_keys,
seen,
} = options.unwrap_or_default();

}: ExportOptions<S>,
) -> anyhow::Result<Option<digest::Output<D>>> {
if include_events && !include_receipts {
anyhow::bail!("message receipts must be included when events are included");
}
Expand All @@ -171,8 +180,8 @@ async fn export_to_forest_car<D: Digest>(
db.shallow_clone(),
tipset.shallow_clone().chain_owned(db.shallow_clone()),
stateroot_lookup_limit,
seen,
)
.with_seen(seen)
.with_message_receipts(include_receipts)
.with_events(include_events)
.with_tipset_keys(include_tipset_keys)
Expand Down
20 changes: 18 additions & 2 deletions src/chain/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use super::*;
use crate::{
blocks::{CachingBlockHeader, Chain4U, Tipset, TipsetKey, chain4u},
cid_collections::CidHashSet,
db::{MemoryDB, car::ForestCar},
utils::db::CborStoreExt,
};
Expand Down Expand Up @@ -60,10 +61,25 @@ async fn test_export_inner(version: FilecoinSnapshotVersion) -> anyhow::Result<(

let checksum = match version {
FilecoinSnapshotVersion::V1 => {
export::<Sha256>(&db, &head, 0, &mut car_bytes, None).await?
export::<Sha256, _>(
&db,
&head,
0,
&mut car_bytes,
ExportOptions::<CidHashSet>::default(),
)
.await?
}
FilecoinSnapshotVersion::V2 => {
export_v2::<Sha256, File>(&db, None, &head, 0, &mut car_bytes, None).await?
export_v2::<Sha256, File, _>(
&db,
None,
&head,
0,
&mut car_bytes,
ExportOptions::<CidHashSet>::default(),
)
.await?
}
};

Expand Down
125 changes: 125 additions & 0 deletions src/cid_collections/hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use super::*;
use anyhow::Context as _;
use bytes::Bytes;
use cid::Cid;

#[cfg(doc)]
use std::collections::HashSet;
use std::{path::Path, sync::LazyLock};

pub trait CidHashSetLike {
/// Adds a value to the set.
///
/// Returns whether the value was newly inserted.
fn insert(&mut self, cid: Cid) -> bool;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

/// A hash set implemented as a `HashMap` where the value is `()`.
///
Expand Down Expand Up @@ -56,6 +66,12 @@ impl CidHashSet {
}
}

impl CidHashSetLike for CidHashSet {
fn insert(&mut self, cid: Cid) -> bool {
self.insert(cid)
}
}

////////////////////
// Collection Ops //
////////////////////
Expand All @@ -73,3 +89,112 @@ impl FromIterator<Cid> for CidHashSet {
this
}
}

/// A file-backed CID hash set.
/// This is intended to be used for large sets of CIDs that may not fit in memory, such as when tracking seen CIDs during a chain export.
pub struct FileBackedCidHashSet {
db: parity_db::Db,
// for dropping the temporary directory when the set is dropped
_dir: tempfile::TempDir,
lru: hashlink::LruCache<SmallCid, ()>,
Comment thread
hanabi1224 marked this conversation as resolved.
}

impl FileBackedCidHashSet {
pub fn new(temp_dir_root: impl AsRef<Path>) -> anyhow::Result<Self> {
let dir = tempfile::tempdir_in(temp_dir_root.as_ref()).with_context(|| {
format!(
"failed to create temp dir in {}",
temp_dir_root.as_ref().display(),
)
})?;
let options = parity_db::Options {
path: dir.path().to_path_buf(),
sync_wal: false,
sync_data: false,
stats: false,
salt: None,
columns: vec![
parity_db::ColumnOptions {
uniform: true,
append_only: true,
..Default::default()
},
parity_db::ColumnOptions {
append_only: true,
..Default::default()
},
],
compression_threshold: Default::default(),
};
let db = parity_db::Db::open_or_create(&options).with_context(|| {
format!(
"failed to create temp parity-db at {}",
options.path.display()
)
})?;
Ok(Self {
db,
_dir: dir,
#[allow(clippy::disallowed_methods)]
lru: hashlink::LruCache::new(2 << 19), // ~80MiB for 1M entries
})
}
}

impl CidHashSetLike for FileBackedCidHashSet {
fn insert(&mut self, cid: Cid) -> bool {
static EMPTY_VALUE: LazyLock<Bytes> = LazyLock::new(|| Bytes::from_static(&[]));

let small = SmallCid::from(cid);
if self.lru.get(&small).is_some() {
return false;
}

let (col, key) = match &small {
SmallCid::Inline(c) => (0, c.digest().to_vec()),
SmallCid::Indirect(u) => (1, u.inner().to_bytes()),
};
if self.db.get(col, &key).ok().flatten().is_some() {
self.lru.insert(small, ());
false
} else {
_ = self
.db
.commit_changes_bytes([(col, parity_db::Operation::Set(key, EMPTY_VALUE.clone()))]);
self.lru.insert(small, ());
true
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[quickcheck_macros::quickcheck]
fn test_cid_hashset(mut cids: Vec<Cid>) {
cids.dedup();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
let mut set = CidHashSet::default();
for cid in cids.iter() {
all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted");
}
for cid in cids.iter() {
all_asserts::assert_false!(set.insert(*cid), "expected CID to be present in the set");
}
}

#[quickcheck_macros::quickcheck]
fn test_file_backed_cid_hashset(mut cids: Vec<Cid>) {
cids.dedup();
let mut set = FileBackedCidHashSet::new(std::env::temp_dir()).unwrap();
let dir = set._dir.path().to_path_buf();
for cid in cids.iter() {
all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted");
}
for cid in cids.iter() {
all_asserts::assert_false!(set.insert(*cid), "expected CID to be present in the set");
}
drop(set);
all_asserts::assert_false!(dir.exists(), "expected temporary directory to be deleted");
}
}
14 changes: 12 additions & 2 deletions src/cid_collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ pub mod hash_map;
pub mod hash_set;
mod small_cid_vec;
pub use hash_map::CidHashMap;
pub use hash_set::CidHashSet;
pub use hash_set::{CidHashSet, CidHashSetLike, FileBackedCidHashSet};
use imp::{CidV1DagCborBlake2b256, Uncompactable};
pub use small_cid_vec::SmallCidNonEmptyVec;
pub use small_cid_vec::{SmallCid, SmallCidNonEmptyVec};

/// The core primitive for saving space in this module.
///
Expand Down Expand Up @@ -53,6 +53,10 @@ mod imp {

impl CidV1DagCborBlake2b256 {
const WIDTH: usize = 32;

pub fn digest(&self) -> &[u8; Self::WIDTH] {
&self.digest
}
}

#[cfg(test)]
Expand Down Expand Up @@ -107,6 +111,12 @@ mod imp {
inner: Cid,
}

impl Uncompactable {
pub fn inner(&self) -> &Cid {
&self.inner
}
}

/// [`Uncompactable`] can only be created through [`MaybeCompactedCid`], since
/// that type defines the canonical conversion
impl From<Uncompactable> for Cid {
Expand Down
Loading
Loading