2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -33,6 +33,8 @@

### Changed

- [#6939](https://github.com/ChainSafe/forest/pull/6939): Refactored snapshot export and garbage collection logic to use a disk-backed hash set for de-duplicating reachable blocks. This reduces RAM usage (~6-7 GiB less) in exchange for more disk usage (~7-8 GiB on mainnet).

### Removed

### Fixed
1 change: 1 addition & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -169,7 +169,7 @@ num-traits = "0.2"
num_cpus = "1"
nunny = { version = "0.2", features = ["serde", "quickcheck", "schemars1"] }
openrpc-types = "0.5"
parity-db = { version = "0.5" }
parity-db = { version = "0.5", features = ["bytes"] }
parking_lot = { version = "0.12", features = ["deadlock_detection"] }
pastey = "0.2"
pathfinding = "4"
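(A guess at the motivation: the new `bytes` feature presumably enables the `Db::commit_changes_bytes` API with `bytes::Bytes` values that the new `FileBackedCidHashSet` uses below; this is inferred from the code in this diff, not stated in the PR.)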
4 changes: 2 additions & 2 deletions docs/docs/users/guides/gc.md
@@ -54,10 +54,10 @@ Always remember to enable GC when moving back to production or long-term testing

During the GC process, Forest consumes extra RAM and disk space temporarily:

- While traversing reachable blocks, it uses 32 bytes of RAM per reachable block.
- While traversing reachable blocks, it uses ~80MiB of RAM and ~8GiB disk space on mainnet (and ~2GiB on calibnet) for de-duplicating reachable blocks.
- While exporting a lite snapshot, it uses extra disk space before cleaning up parity-db and stale CAR snapshots.

For a typical ~80 GiB mainnet snapshot, this results in ~2.5 GiB of additional RAM and ~80 GiB disk space usage.
For a typical ~80 GiB mainnet snapshot, this results in ~80 MiB of additional RAM and ~90 GiB disk space usage.

### Syncing Pauses or Performance Overheads

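For reference, the new ~80 MiB RAM figure lines up with the LRU cache sizing introduced in this PR (`LruCache::new(2 << 19)` in `src/cid_collections/hash_set.rs`). A rough sanity check; the ~80 bytes per entry is an assumed estimate, not a measured value:

```rust
// Back-of-the-envelope check of the ~80 MiB figure.
// The per-entry cost (SmallCid key + LRU bookkeeping) is an assumption.
fn main() {
    let entries: usize = 2 << 19; // 1,048,576 entries, matching LruCache::new(2 << 19)
    let approx_bytes_per_entry: usize = 80; // assumed estimate
    let approx_mib = entries * approx_bytes_per_entry / (1024 * 1024);
    println!("~{approx_mib} MiB"); // prints "~80 MiB"
}
```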
49 changes: 29 additions & 20 deletions src/chain/mod.rs
@@ -11,7 +11,7 @@ mod weight;
pub use self::{snapshot_format::*, store::*, weight::*};

use crate::blocks::{Tipset, TipsetKey};
use crate::cid_collections::CidHashSet;
use crate::cid_collections::CidHashSetLike;
use crate::db::car::forest::{self, ForestCarFrame, finalize_frame};
use crate::db::{SettingsStore, SettingsStoreExt};
use crate::ipld::stream_chain;
@@ -32,50 +32,61 @@ use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

#[derive(Debug, Clone, Default)]
pub struct ExportOptions {
pub struct ExportOptions<S> {
pub skip_checksum: bool,
pub include_receipts: bool,
pub include_events: bool,
pub include_tipset_keys: bool,
pub seen: CidHashSet,
pub seen: S,
}

pub async fn export_from_head<D: Digest>(
impl<S: Default> Default for ExportOptions<S> {
fn default() -> Self {
Self {
skip_checksum: Default::default(),
include_receipts: Default::default(),
include_events: Default::default(),
include_tipset_keys: Default::default(),
seen: Default::default(),
}
}
}

pub async fn export_from_head<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
db: &Arc<impl Blockstore + SettingsStore + Send + Sync + 'static>,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
options: ExportOptions<S>,
) -> anyhow::Result<(Tipset, Option<digest::Output<D>>)> {
let head_key = SettingsStoreExt::read_obj::<TipsetKey>(db, crate::db::setting_keys::HEAD_KEY)?
.context("chain head key not found")?;
let head_ts = Tipset::load_required(&db, &head_key)?;
let digest = export::<D>(db, &head_ts, lookup_depth, writer, options).await?;
let digest = export::<D, S>(db, &head_ts, lookup_depth, writer, options).await?;
Ok((head_ts, digest))
}

/// Exports a Filecoin snapshot in v1 format
/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v1-specification>
pub async fn export<D: Digest>(
pub async fn export<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
db: &Arc<impl Blockstore + Send + Sync + 'static>,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
options: ExportOptions<S>,
) -> anyhow::Result<Option<digest::Output<D>>> {
let roots = tipset.key().to_cids();
export_to_forest_car::<D>(roots, None, db, tipset, lookup_depth, writer, options).await
export_to_forest_car::<D, S>(roots, None, db, tipset, lookup_depth, writer, options).await
}

/// Exports a Filecoin snapshot in v2 format
/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v2-specification>
pub async fn export_v2<D: Digest, F: Seek + Read>(
pub async fn export_v2<D: Digest, F: Seek + Read, S: CidHashSetLike + Send + Sync + 'static>(
db: &Arc<impl Blockstore + Send + Sync + 'static>,
mut f3: Option<(Cid, F)>,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
options: ExportOptions<S>,
) -> anyhow::Result<Option<digest::Output<D>>> {
// validate f3 data
if let Some((f3_cid, f3_data)) = &mut f3 {
@@ -121,7 +132,7 @@ pub async fn export_v2<D: Digest, F: Seek + Read>(
});
}

export_to_forest_car::<D>(
export_to_forest_car::<D, S>(
roots,
Some(prefix_data_frames),
db,
@@ -134,23 +145,21 @@
}

#[allow(clippy::too_many_arguments)]
async fn export_to_forest_car<D: Digest>(
async fn export_to_forest_car<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
roots: NonEmpty<Cid>,
prefix_data_frames: Option<Vec<anyhow::Result<ForestCarFrame>>>,
db: &Arc<impl Blockstore + Send + Sync + 'static>,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
) -> anyhow::Result<Option<digest::Output<D>>> {
let ExportOptions {
ExportOptions {
skip_checksum,
include_receipts,
include_events,
include_tipset_keys,
seen,
} = options.unwrap_or_default();

}: ExportOptions<S>,
) -> anyhow::Result<Option<digest::Output<D>>> {
if include_events && !include_receipts {
anyhow::bail!("message receipts must be included when events are included");
}
@@ -171,8 +180,8 @@ async fn export_to_forest_car<D: Digest>(
db.shallow_clone(),
tipset.shallow_clone().chain_owned(db.shallow_clone()),
stateroot_lookup_limit,
seen,
)
.with_seen(seen)
.with_message_receipts(include_receipts)
.with_events(include_events)
.with_tipset_keys(include_tipset_keys)
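With these changes, the seen-set implementation is chosen at the call site via the `S` type parameter. A hypothetical sketch of exporting with the new disk-backed set follows; the `db`/`tipset`/`writer` setup, the lookup depth of 900, and the helper name are illustrative assumptions, not code from this PR (imports as in `src/chain/mod.rs`, plus `FileBackedCidHashSet` from `cid_collections`):

```rust
use sha2::Sha256; // assumed digest choice, matching the tests

// Hypothetical call site (not from this PR): the caller supplies the
// blockstore, the export head tipset, and an AsyncWrite sink.
async fn export_with_disk_backed_seen(
    db: &Arc<impl Blockstore + Send + Sync + 'static>,
    tipset: &Tipset,
    writer: impl AsyncWrite + Unpin,
) -> anyhow::Result<()> {
    let options = ExportOptions {
        skip_checksum: false,
        include_receipts: true,
        include_events: true, // export bails if events are requested without receipts
        include_tipset_keys: false,
        // De-duplication state spills to a temporary parity-db instead of RAM.
        seen: FileBackedCidHashSet::new_in_temp_dir()?,
    };
    // 900 epochs is an assumed state-root lookup depth; pick per deployment.
    let _digest = export::<Sha256, FileBackedCidHashSet>(db, tipset, 900, writer, options).await?;
    Ok(())
}
```

For tests or small exports, the in-memory `ExportOptions::<CidHashSet>::default()` remains available, as the updated tests below show.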
20 changes: 18 additions & 2 deletions src/chain/tests.rs
@@ -4,6 +4,7 @@
use super::*;
use crate::{
blocks::{CachingBlockHeader, Chain4U, Tipset, TipsetKey, chain4u},
cid_collections::CidHashSet,
db::{MemoryDB, car::ForestCar},
utils::db::CborStoreExt,
};
@@ -60,10 +61,25 @@ async fn test_export_inner(version: FilecoinSnapshotVersion) -> anyhow::Result<(

let checksum = match version {
FilecoinSnapshotVersion::V1 => {
export::<Sha256>(&db, &head, 0, &mut car_bytes, None).await?
export::<Sha256, _>(
&db,
&head,
0,
&mut car_bytes,
ExportOptions::<CidHashSet>::default(),
)
.await?
}
FilecoinSnapshotVersion::V2 => {
export_v2::<Sha256, File>(&db, None, &head, 0, &mut car_bytes, None).await?
export_v2::<Sha256, File, _>(
&db,
None,
&head,
0,
&mut car_bytes,
ExportOptions::<CidHashSet>::default(),
)
.await?
}
};

142 changes: 142 additions & 0 deletions src/cid_collections/hash_set.rs
@@ -2,10 +2,20 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use super::*;
use anyhow::Context as _;
use bytes::Bytes;
use cid::Cid;

#[cfg(doc)]
use std::collections::HashSet;
use std::{path::Path, sync::LazyLock};

pub trait CidHashSetLike {
/// Adds a value to the set.
///
/// Returns whether the value was newly inserted.
fn insert(&mut self, cid: Cid) -> anyhow::Result<bool>;
}

/// A hash set implemented as a `HashMap` where the value is `()`.
///
@@ -56,6 +66,12 @@ impl CidHashSet {
}
}

impl CidHashSetLike for CidHashSet {
fn insert(&mut self, cid: Cid) -> anyhow::Result<bool> {
Ok(self.insert(cid))
}
}

////////////////////
// Collection Ops //
////////////////////
@@ -73,3 +89,129 @@ impl FromIterator<Cid> for CidHashSet {
this
}
}

/// A file-backed CID hash set.
/// This is intended to be used for large sets of CIDs that may not fit in memory, such as when tracking seen CIDs during a chain export.
pub struct FileBackedCidHashSet {
db: parity_db::Db,
// for dropping the temporary directory when the set is dropped
_dir: tempfile::TempDir,
lru: hashlink::LruCache<SmallCid, ()>,
}

impl FileBackedCidHashSet {
pub fn new(temp_dir_root: impl AsRef<Path>) -> anyhow::Result<Self> {
let dir = tempfile::tempdir_in(temp_dir_root.as_ref()).with_context(|| {
format!(
"failed to create temp dir in {}",
temp_dir_root.as_ref().display(),
)
})?;
let options = parity_db::Options {
path: dir.path().to_path_buf(),
sync_wal: false,
sync_data: false,
stats: false,
salt: None,
columns: vec![
parity_db::ColumnOptions {
uniform: true,
append_only: true,
..Default::default()
},
parity_db::ColumnOptions {
append_only: true,
..Default::default()
},
],
compression_threshold: Default::default(),
};
let db = parity_db::Db::open_or_create(&options).with_context(|| {
format!(
"failed to create temp parity-db at {}",
options.path.display()
)
})?;
Ok(Self {
db,
_dir: dir,
#[allow(clippy::disallowed_methods)]
lru: hashlink::LruCache::new(2 << 19), // ~80MiB for 1M entries
})
}

pub fn new_in_temp_dir() -> anyhow::Result<Self> {
Self::new(std::env::temp_dir())
}
}

impl CidHashSetLike for FileBackedCidHashSet {
fn insert(&mut self, cid: Cid) -> anyhow::Result<bool> {
static EMPTY_VALUE: LazyLock<Bytes> = LazyLock::new(|| Bytes::from_static(&[]));

let small = SmallCid::from(cid);
if self.lru.get(&small).is_some() {
return Ok(false);
}

let (col, key) = match &small {
SmallCid::Inline(c) => (0, c.digest().to_vec()),
SmallCid::Indirect(u) => (1, u.inner().to_bytes()),
};
if self.db.get(col, &key).ok().flatten().is_some() {
self.lru.insert(small, ());
Ok(false)
} else {
self.db.commit_changes_bytes([(
col,
parity_db::Operation::Set(key, EMPTY_VALUE.clone()),
)])?;
self.lru.insert(small, ());
Ok(true)
}
}
}

#[cfg(test)]
impl Default for FileBackedCidHashSet {
fn default() -> Self {
Self::new_in_temp_dir().expect("failed to create FileBackedCidHashSet")
}
}

#[cfg(test)]
mod tests {
use super::*;
use ahash::HashSet;

#[quickcheck_macros::quickcheck]
fn test_cid_hashset(cids: HashSet<Cid>) {
let mut set = CidHashSet::default();
for cid in cids.iter() {
all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted");
}
for cid in cids.iter() {
all_asserts::assert_false!(set.insert(*cid), "expected CID to be present in the set");
}
}

#[quickcheck_macros::quickcheck]
fn test_file_backed_cid_hashset(cids: HashSet<Cid>) {
let mut set = FileBackedCidHashSet::default();
let dir = set._dir.path().to_path_buf();
for cid in cids.iter() {
all_asserts::assert_true!(
set.insert(*cid).unwrap(),
"expected CID to be newly inserted"
);
}
for cid in cids.iter() {
all_asserts::assert_false!(
set.insert(*cid).unwrap(),
"expected CID to be present in the set"
);
}
drop(set);
all_asserts::assert_false!(dir.exists(), "expected temporary directory to be deleted");
}
}
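Outside of tests, a minimal standalone usage sketch of the new set might look like the following; the CID string is just a well-known public IPFS CID used as sample input, and the whole snippet is illustrative rather than code from this PR:

```rust
use cid::Cid;

fn main() -> anyhow::Result<()> {
    // Backs the set with a parity-db in a fresh temp dir; the dir is
    // removed when the set is dropped, as the test above verifies.
    let mut seen = FileBackedCidHashSet::new_in_temp_dir()?;
    let cid: Cid = "QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG".parse()?; // sample CID
    assert!(seen.insert(cid)?); // first insert: newly added
    assert!(!seen.insert(cid)?); // second insert: found via the LRU or parity-db
    Ok(())
}
```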