Skip to content
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ num-traits = "0.2"
num_cpus = "1"
nunny = { version = "0.2", features = ["serde", "quickcheck", "schemars1"] }
openrpc-types = "0.5"
parity-db = { version = "0.5" }
parity-db = { version = "0.5", features = ["bytes"] }
parking_lot = { version = "0.12", features = ["deadlock_detection"] }
pastey = "0.2"
pathfinding = "4"
Expand Down
6 changes: 3 additions & 3 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod weight;
pub use self::{snapshot_format::*, store::*, weight::*};

use crate::blocks::{Tipset, TipsetKey};
use crate::cid_collections::CidHashSet;
use crate::cid_collections::FileBackedCidHashSet;
use crate::db::car::forest::{self, ForestCarFrame, finalize_frame};
use crate::db::{SettingsStore, SettingsStoreExt};
use crate::ipld::stream_chain;
Expand All @@ -32,13 +32,13 @@ use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

#[derive(Debug, Clone, Default)]
#[derive(Default)]
pub struct ExportOptions {
pub skip_checksum: bool,
pub include_receipts: bool,
pub include_events: bool,
pub include_tipset_keys: bool,
pub seen: CidHashSet,
pub seen: FileBackedCidHashSet,
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

pub async fn export_from_head<D: Digest>(
Expand Down
127 changes: 127 additions & 0 deletions src/cid_collections/hash_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use super::*;
use bytes::Bytes;
use cid::Cid;

#[cfg(doc)]
use std::collections::HashSet;
use std::sync::LazyLock;

pub trait CidHashSetLike {
/// Adds a value to the set.
///
/// Returns whether the value was newly inserted.
fn insert(&mut self, cid: Cid) -> bool;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

/// A hash set implemented as a `HashMap` where the value is `()`.
///
Expand Down Expand Up @@ -56,6 +65,12 @@ impl CidHashSet {
}
}

impl CidHashSetLike for CidHashSet {
fn insert(&mut self, cid: Cid) -> bool {
self.insert(cid)
}
}

////////////////////
// Collection Ops //
////////////////////
Expand All @@ -73,3 +88,115 @@ impl FromIterator<Cid> for CidHashSet {
this
}
}

/// A file-backed CID hash set.
/// This is intended to be used for large sets of CIDs that may not fit in memory, such as when tracking seen CIDs during a chain export.
pub struct FileBackedCidHashSet {
db: parity_db::Db,
// for dropping the temporary directory when the set is dropped
_dir: tempfile::TempDir,
lru: hashlink::LruCache<SmallCid, ()>,
Comment thread
hanabi1224 marked this conversation as resolved.
}

impl Default for FileBackedCidHashSet {
fn default() -> Self {
const MAX_ATTEMPTS: usize = 10;
for _ in 0..MAX_ATTEMPTS {
// temp dir has limitation of 10GiB on some platforms, so we prefer current working directory
// and try multiple times to create one in case we hit that limit
if let Some(dir) = tempfile::tempdir_in(".")
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
.ok()
.or_else(|| tempfile::tempdir().ok())
{
let options = parity_db::Options {
path: dir.path().to_path_buf(),
sync_wal: false,
sync_data: false,
stats: false,
salt: None,
columns: vec![
parity_db::ColumnOptions {
uniform: true,
append_only: true,
..Default::default()
},
parity_db::ColumnOptions {
append_only: true,
..Default::default()
},
],
compression_threshold: Default::default(),
};
if let Ok(db) = parity_db::Db::open_or_create(&options) {
return Self {
db,
_dir: dir,
#[allow(clippy::disallowed_methods)]
lru: hashlink::LruCache::new(2 << 19), // ~80MiB for 1M entries
};
}
}
}
panic!(
"failed to create parity db with a temporary directory after {MAX_ATTEMPTS} attempts"
);
}
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
}

impl CidHashSetLike for FileBackedCidHashSet {
fn insert(&mut self, cid: Cid) -> bool {
static EMPTY_VALUE: LazyLock<Bytes> = LazyLock::new(|| Bytes::from_static(&[]));

let small = SmallCid::from(cid);
if self.lru.get(&small).is_some() {
return false;
}

let (col, key) = match &small {
SmallCid::Inline(c) => (0, c.digest().to_vec()),
SmallCid::Indirect(u) => (1, u.inner().to_bytes()),
};
if self.db.get(col, &key).ok().flatten().is_some() {
self.lru.insert(small, ());
false
} else {
_ = self
.db
.commit_changes_bytes([(col, parity_db::Operation::Set(key, EMPTY_VALUE.clone()))]);
self.lru.insert(small, ());
true
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[quickcheck_macros::quickcheck]
fn test_cid_hashset(mut cids: Vec<Cid>) {
cids.dedup();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
let mut set = CidHashSet::default();
for cid in cids.iter() {
all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted");
}
for cid in cids.iter() {
all_asserts::assert_false!(set.insert(*cid), "expected CID to be present in the set");
}
}

#[quickcheck_macros::quickcheck]
fn test_file_backed_cid_hashset(mut cids: Vec<Cid>) {
cids.dedup();
let mut set = FileBackedCidHashSet::default();
let dir = set._dir.path().to_path_buf();
for cid in cids.iter() {
all_asserts::assert_true!(set.insert(*cid), "expected CID to be newly inserted");
}
for cid in cids.iter() {
all_asserts::assert_false!(set.insert(*cid), "expected CID to be present in the set");
}
drop(set);
all_asserts::assert_false!(dir.exists(), "expected temporary directory to be deleted");
}
}
14 changes: 12 additions & 2 deletions src/cid_collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ pub mod hash_map;
pub mod hash_set;
mod small_cid_vec;
pub use hash_map::CidHashMap;
pub use hash_set::CidHashSet;
pub use hash_set::{CidHashSet, FileBackedCidHashSet};
use imp::{CidV1DagCborBlake2b256, Uncompactable};
pub use small_cid_vec::SmallCidNonEmptyVec;
pub use small_cid_vec::{SmallCid, SmallCidNonEmptyVec};

/// The core primitive for saving space in this module.
///
Expand Down Expand Up @@ -53,6 +53,10 @@ mod imp {

impl CidV1DagCborBlake2b256 {
const WIDTH: usize = 32;

pub fn digest(&self) -> &[u8; Self::WIDTH] {
&self.digest
}
}

#[cfg(test)]
Expand Down Expand Up @@ -107,6 +111,12 @@ mod imp {
inner: Cid,
}

impl Uncompactable {
pub fn inner(&self) -> &Cid {
&self.inner
}
}

/// [`Uncompactable`] can only be created through [`MaybeCompactedCid`], since
/// that type defines the canonical conversion
impl From<Uncompactable> for Cid {
Expand Down
6 changes: 3 additions & 3 deletions src/dev/subcommands/export_state_tree_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

use crate::{
chain::{ChainStore, index::ResolveNullTipset},
cid_collections::FileBackedCidHashSet,
cli_shared::{chain_path, read_config},
daemon::db_util::load_all_forest_cars,
db::{
CAR_DB_DIR_NAME,
car::ManyCar,
car::forest::FOREST_CAR_FILE_EXTENSION,
car::{ManyCar, forest::FOREST_CAR_FILE_EXTENSION},
db_engine::{db_root, open_db},
},
genesis::read_genesis_header,
Expand Down Expand Up @@ -109,7 +109,7 @@ impl ExportStateTreeCommand {
ipld_roots.extend(receipts.into_iter().filter_map(|r| r.events_root()));
}
let roots = nunny::vec![ipld_roots.first().cloned().context("no ipld roots found")?];
let stream = IpldStream::new(db, ipld_roots.clone());
let stream = IpldStream::<_, FileBackedCidHashSet>::new(db, ipld_roots.clone());
let frames = crate::db::car::forest::Encoder::compress_stream_default(stream);
let tmp =
tempfile::NamedTempFile::new_in(output.parent().unwrap_or_else(|| Path::new(".")))?
Expand Down
46 changes: 28 additions & 18 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::blocks::Tipset;
use crate::cid_collections::CidHashSet;
use crate::cid_collections::hash_set::CidHashSetLike;
use crate::ipld::Ipld;
use crate::shim::clock::ChainEpoch;
use crate::shim::executor::Receipt;
Expand Down Expand Up @@ -145,11 +146,11 @@ enum Task {
}

pin_project! {
pub struct ChainStream<DB, T> {
pub struct ChainStream<DB, T, S = CidHashSet> {
tipset_iter: T,
db: DB,
dfs: VecDeque<Task>, // Depth-first work queue.
seen: CidHashSet,
seen: S,
stateroot_limit_exclusive: ChainEpoch,
fail_on_dead_links: bool,
message_receipts: bool,
Expand All @@ -159,8 +160,8 @@ pin_project! {
}
}

impl<DB, T> ChainStream<DB, T> {
pub fn with_seen(mut self, seen: CidHashSet) -> Self {
impl<DB, T, S> ChainStream<DB, T, S> {
pub fn with_seen(mut self, seen: S) -> Self {
self.seen = seen;
self
}
Expand Down Expand Up @@ -194,8 +195,7 @@ impl<DB, T> ChainStream<DB, T> {
self
}

#[allow(dead_code)]
pub fn into_seen(self) -> CidHashSet {
pub fn into_seen(self) -> S {
self.seen
}
}
Expand All @@ -211,16 +211,21 @@ impl<DB, T> ChainStream<DB, T> {
/// * `stateroot_limit` - An epoch that signifies how far back (exclusive) we need to inspect tipsets,
/// in-depth. This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth`
/// is the number of `[`Tipset`]` that needs inspection.
pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
pub fn stream_chain<
DB: Blockstore,
T: Borrow<Tipset>,
ITER: Iterator<Item = T> + Unpin,
S: CidHashSetLike + Default,
>(
db: DB,
tipset_iter: ITER,
stateroot_limit_exclusive: ChainEpoch,
) -> ChainStream<DB, ITER> {
) -> ChainStream<DB, ITER, S> {
ChainStream {
tipset_iter,
db,
dfs: VecDeque::new(),
seen: CidHashSet::default(),
seen: Default::default(),
stateroot_limit_exclusive,
fail_on_dead_links: true,
message_receipts: false,
Expand All @@ -232,16 +237,21 @@ pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T>

// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links
// are ignored.
pub fn stream_graph<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
pub fn stream_graph<
DB: Blockstore,
T: Borrow<Tipset>,
ITER: Iterator<Item = T> + Unpin,
S: CidHashSetLike + Default,
>(
db: DB,
tipset_iter: ITER,
stateroot_limit_exclusive: ChainEpoch,
) -> ChainStream<DB, ITER> {
) -> ChainStream<DB, ITER, S> {
stream_chain(db, tipset_iter, stateroot_limit_exclusive).fail_on_dead_links(false)
}

impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
for ChainStream<DB, ITER>
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin, S: CidHashSetLike> Stream
for ChainStream<DB, ITER, S>
{
type Item = anyhow::Result<CarBlock>;

Expand Down Expand Up @@ -412,24 +422,24 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
}

pin_project! {
pub struct IpldStream<DB> {
pub struct IpldStream<DB, S> {
db: DB,
cid_vec: Vec<Cid>,
seen: CidHashSet,
seen: S,
}
}

impl<DB> IpldStream<DB> {
impl<DB, S: Default> IpldStream<DB, S> {
pub fn new(db: DB, roots: Vec<Cid>) -> Self {
Self {
db,
cid_vec: roots,
seen: CidHashSet::default(),
seen: S::default(),
}
}
}

impl<DB: Blockstore> Stream for IpldStream<DB> {
impl<DB: Blockstore, S: CidHashSetLike> Stream for IpldStream<DB, S> {
type Item = anyhow::Result<CarBlock>;

fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
Loading
Loading