Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

### Changed

- [#6655](https://github.com/ChainSafe/forest/issues/6655): Updated garbage collector to keep message receipts and events.

### Removed

### Fixed
Expand Down
6 changes: 6 additions & 0 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
#[derive(Debug, Clone, Default)]
pub struct ExportOptions {
pub skip_checksum: bool,
pub include_receipts: bool,
pub include_events: bool,
pub seen: CidHashSet,
}

Expand Down Expand Up @@ -140,6 +142,8 @@ async fn export_to_forest_car<D: Digest>(
) -> anyhow::Result<Option<digest::Output<D>>> {
let ExportOptions {
skip_checksum,
message_receipts,
events,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
seen,
} = options.unwrap_or_default();

Expand All @@ -161,6 +165,8 @@ async fn export_to_forest_car<D: Digest>(
stateroot_lookup_limit,
)
.with_seen(seen)
.with_message_receipts(message_receipts)
.with_events(events)
.track_progress(true),
);

Expand Down
2 changes: 2 additions & 0 deletions src/cli/subcommands/snapshot_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ impl SnapshotCommands {
recent_roots: depth,
output_path: temp_path.to_path_buf(),
tipset_keys: tipset.key().clone().into(),
message_receipts: false,
events: false,
skip_checksum,
dry_run,
};
Expand Down
4 changes: 3 additions & 1 deletion src/db/gc/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ where
file,
Some(ExportOptions {
Comment thread
LesnyRumcajs marked this conversation as resolved.
skip_checksum: true,
..Default::default()
message_receipts: true,
events: true,
seen: Default::default(),
}),
)
.await?;
Expand Down
68 changes: 68 additions & 0 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::blocks::Tipset;
use crate::cid_collections::CidHashSet;
use crate::ipld::Ipld;
use crate::shim::clock::ChainEpoch;
use crate::shim::executor::Receipt;
use crate::utils::db::car_stream::CarBlock;
use crate::utils::encoding::extract_cids;
use crate::utils::multihash::prelude::*;
Expand Down Expand Up @@ -137,7 +138,9 @@ impl Iterator for DfsIter {

enum IterateType {
Message(Cid),
MessageReceipts(Cid),
StateRoot(Cid),
EventsRoot(Cid),
}

enum Task {
Expand All @@ -155,6 +158,8 @@ pin_project! {
seen: CidHashSet,
stateroot_limit_exclusive: ChainEpoch,
fail_on_dead_links: bool,
message_receipts: bool,
events: bool,
track_progress: bool,
}
}
Expand All @@ -175,6 +180,19 @@ impl<DB, T> ChainStream<DB, T> {
self
}

/// Enable traversal of message receipt roots during chain export.
pub fn with_message_receipts(mut self, message_receipts: bool) -> Self {
self.message_receipts = message_receipts;
self
}

/// Enable traversal of events roots during chain export.
/// Requires message receipts to be enabled as well.
pub fn with_events(mut self, events: bool) -> Self {
self.events = events;
self
}
Comment thread
hanabi1224 marked this conversation as resolved.

#[allow(dead_code)]
pub fn into_seen(self) -> CidHashSet {
self.seen
Expand Down Expand Up @@ -204,6 +222,8 @@ pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T>
seen: CidHashSet::default(),
stateroot_limit_exclusive,
fail_on_dead_links: true,
message_receipts: false,
events: false,
track_progress: false,
}
}
Expand All @@ -226,6 +246,12 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use Task::*;

if self.events && !self.message_receipts {
return Poll::Ready(Some(Err(anyhow::anyhow!(
"message receipts must be included when events are included"
))));
}
Comment thread
LesnyRumcajs marked this conversation as resolved.
Outdated

let fail_on_dead_links = self.fail_on_dead_links;
let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
let this = self.project();
Expand Down Expand Up @@ -276,6 +302,20 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
IterateType::StateRoot(c) => {
format!("state root {c}")
}
IterateType::MessageReceipts(c) => {
// Forgive message receipts
tracing::trace!(
"[Iterate] missing key: {cid} from message receipts {c} in block {block_cid} at epoch {epoch}"
);
continue;
}
IterateType::EventsRoot(c) => {
// Forgive events
tracing::trace!(
"[Iterate] missing key: {cid} from events root {c} in block {block_cid} at epoch {epoch}"
);
continue;
}
};
return Poll::Ready(Some(Err(anyhow::anyhow!(
"[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
Expand Down Expand Up @@ -318,6 +358,34 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
.filter_map(ipld_to_cid)
.collect(),
));
if *this.message_receipts {
this.dfs.push_back(Iterate(
block.epoch,
*block.cid(),
IterateType::MessageReceipts(block.message_receipts),
DfsIter::from(block.message_receipts)
.filter_map(ipld_to_cid)
.collect(),
));
}
// ignore failure as receipts are not required by a lite snapshot
if *this.events
&& let Ok(receipts) =
Receipt::get_receipts(this.db, block.message_receipts)
{
Comment thread
hanabi1224 marked this conversation as resolved.
for receipt in receipts {
if let Some(events_root) = receipt.events_root() {
this.dfs.push_back(Iterate(
block.epoch,
*block.cid(),
IterateType::EventsRoot(events_root),
DfsIter::from(events_root)
.filter_map(ipld_to_cid)
.collect(),
));
}
}
}
}

// Visit the block if it's within required depth. And a special case for `0`
Expand Down
10 changes: 10 additions & 0 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ impl RpcMethod<1> for ForestChainExport {
recent_roots,
output_path,
tipset_keys: ApiTipsetKey(tsk),
message_receipts,
events,
skip_checksum,
dry_run,
} = params;
Expand All @@ -448,6 +450,8 @@ impl RpcMethod<1> for ForestChainExport {

let options = Some(ExportOptions {
skip_checksum,
message_receipts,
events,
seen: Default::default(),
});
let writer = if dry_run {
Expand Down Expand Up @@ -682,6 +686,8 @@ impl RpcMethod<1> for ChainExport {
recent_roots,
output_path,
tipset_keys,
message_receipts: false,
events: false,
skip_checksum,
dry_run,
},),
Expand Down Expand Up @@ -1468,6 +1474,10 @@ pub struct ForestChainExportParams {
#[schemars(with = "LotusJson<ApiTipsetKey>")]
#[serde(with = "crate::lotus_json")]
pub tipset_keys: ApiTipsetKey,
#[serde(default)]
pub message_receipts: bool,
#[serde(default)]
pub events: bool,
pub skip_checksum: bool,
pub dry_run: bool,
}
Expand Down
38 changes: 30 additions & 8 deletions src/tool/subcommands/archive_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::ipld::{stream_chain, stream_graph};
use crate::networks::{ChainConfig, NetworkChain, butterflynet, calibnet, mainnet};
use crate::shim::address::CurrentNetwork;
use crate::shim::clock::{ChainEpoch, EPOCH_DURATION_SECONDS, EPOCHS_IN_DAY};
use crate::shim::executor::{Receipt, StampedEvent};
use crate::shim::fvm_shared_latest::address::Network;
use crate::shim::machine::GLOBAL_MULTI_ENGINE;
use crate::state_manager::{NO_CALLBACK, StateOutput, apply_block_messages};
Expand Down Expand Up @@ -308,6 +309,8 @@ pub struct ArchiveInfo {
epoch: ChainEpoch,
tipsets: ChainEpoch,
messages: ChainEpoch,
message_receipts: usize,
events: usize,
head: Tipset,
snapshot_version: FilecoinSnapshotVersion,
index_size_bytes: Option<u32>,
Expand All @@ -321,6 +324,8 @@ impl std::fmt::Display for ArchiveInfo {
writeln!(f, "Epoch: {}", self.epoch)?;
writeln!(f, "State-roots: {}", self.epoch - self.tipsets + 1)?;
writeln!(f, "Messages sets: {}", self.epoch - self.messages + 1)?;
writeln!(f, "Message receipts: {}", self.message_receipts)?;
writeln!(f, "Events: {}", self.events)?;
let head_tipset_key_string = self
.head
.cids()
Expand Down Expand Up @@ -381,13 +386,25 @@ impl ArchiveInfo {
let mut network: String = "unknown".into();
let mut lowest_stateroot_epoch = root_epoch;
let mut lowest_message_epoch = root_epoch;
let mut message_receipts_count = 0;
let mut events_count = 0;

let iter = if progress {
itertools::Either::Left(windowed.progress_count(root_epoch as u64))
} else {
itertools::Either::Right(windowed)
};

let mut update_network_name = |block_cid: &Cid| {
if block_cid == &*calibnet::GENESIS_CID {
network = calibnet::NETWORK_COMMON_NAME.into();
} else if block_cid == &*mainnet::GENESIS_CID {
network = mainnet::NETWORK_COMMON_NAME.into();
} else if block_cid == &*butterflynet::GENESIS_CID {
network = butterflynet::NETWORK_COMMON_NAME.into();
}
};

for (parent, tipset) in iter {
if tipset.epoch() >= parent.epoch() && parent.epoch() != root_epoch {
bail!("Broken invariant: non-sequential epochs");
Expand All @@ -409,15 +426,16 @@ impl ArchiveInfo {
lowest_message_epoch = tipset.epoch();
}

let mut update_network_name = |block_cid: &Cid| {
if block_cid == &*calibnet::GENESIS_CID {
network = calibnet::NETWORK_COMMON_NAME.into();
} else if block_cid == &*mainnet::GENESIS_CID {
network = mainnet::NETWORK_COMMON_NAME.into();
} else if block_cid == &*butterflynet::GENESIS_CID {
network = butterflynet::NETWORK_COMMON_NAME.into();
if let Ok(receipts) = Receipt::get_receipts(store, *tipset.parent_message_receipts()) {
message_receipts += 1;
for receipt in receipts {
if let Some(events_root) = receipt.events_root()
&& let Ok(e) = StampedEvent::get_events(store, &events_root)
{
events += e.len();
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
};
}

if tipset.epoch() == 0 {
let block_cid = tipset.min_ticket_block().cid();
Expand All @@ -442,6 +460,8 @@ impl ArchiveInfo {
epoch: root_epoch,
tipsets: lowest_stateroot_epoch,
messages: lowest_message_epoch,
message_receipts,
events,
head,
snapshot_version,
index_size_bytes,
Expand Down Expand Up @@ -607,6 +627,8 @@ pub async fn do_export(
writer,
Some(ExportOptions {
skip_checksum: true,
message_receipts: false,
events: false,
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
seen,
}),
)
Expand Down
Loading