Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/docs/users/reference/cli.sh
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,11 @@ generate_markdown_section "forest-dev" "state compute"
# Each call passes a binary name ("forest-dev") and a subcommand path.
# NOTE(review): `generate_markdown_section` is defined outside this hunk; presumably it
# emits a Markdown reference section for that subcommand — confirm against its definition.
generate_markdown_section "forest-dev" "state replay-compute"
generate_markdown_section "forest-dev" "state validate"
generate_markdown_section "forest-dev" "state replay-validate"

generate_markdown_section "forest-dev" "update-checkpoints"

generate_markdown_section "forest-dev" "archive-missing"

generate_markdown_section "forest-dev" "export-tipset-lookup"

generate_markdown_section "forest-dev" "export-state-tree"
124 changes: 124 additions & 0 deletions src/dev/subcommands/export_state_tree_cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::{
chain::{ChainStore, index::ResolveNullTipset},
cli_shared::{chain_path, read_config},
daemon::db_util::load_all_forest_cars,
db::{
CAR_DB_DIR_NAME,
car::ManyCar,
car::forest::FOREST_CAR_FILE_EXTENSION,
db_engine::{db_root, open_db},
},
genesis::read_genesis_header,
ipld::IpldStream,
networks::{ChainConfig, NetworkChain},
shim::{clock::ChainEpoch, executor::Receipt},
};
use anyhow::Context as _;
use clap::Args;
use itertools::Itertools;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::io::AsyncWriteExt as _;

// NOTE: the `///` comments below are picked up by the `clap::Args` derive as help text,
// so they are user-facing and are kept verbatim.
/// Exports N consecutive parent state trees(together with messages, message receipts and events) of the tipset at the given epoch
#[derive(Debug, Args)]
pub struct ExportStateTreeCommand {
    /// Filecoin network chain (e.g., calibnet, mainnet)
    #[arg(long, required = true)]
    chain: NetworkChain,
    // When omitted, `run` derives the database root from the configuration read for `chain`.
    /// Optional path to the database folder
    #[arg(long)]
    db: Option<PathBuf>,
    // Upper bound of the exported range; `run` starts walking from the tipset resolved at
    // this epoch and only exports data for tipsets below it.
    /// The maximum tipset epoch to export state tree from (Exclusive)
    #[arg(long)]
    from: ChainEpoch,
    // Lower bound of the exported range; must be smaller than `from` for anything to be exported.
    /// The minimum tipset epoch to export state tree from (Inclusive)
    #[arg(long)]
    to: ChainEpoch,
    // When omitted, `run` writes to `statetree_{chain}_{to}_{from}` followed by
    // `FOREST_CAR_FILE_EXTENSION`, relative to the current directory.
    /// The path to the output `ForestCAR` file
    #[arg(short, long)]
    output: Option<PathBuf>,
}

impl ExportStateTreeCommand {
pub async fn run(self) -> anyhow::Result<()> {
let Self {
chain,
db,
from,
to,
output,
} = self;
let output = output.unwrap_or_else(|| {
Path::new(&format!(
"statetree_{chain}_{to}_{from}{FOREST_CAR_FILE_EXTENSION}"
))
.to_owned()
});
let db_root_path = if let Some(db) = db {
db
} else {
let (_, config) = read_config(None, Some(chain.clone()))?;
db_root(&chain_path(&config))?
};
let forest_car_db_dir = db_root_path.join(CAR_DB_DIR_NAME);
let db: Arc<ManyCar<crate::db::parity_db::ParityDb>> =
Arc::new(ManyCar::new(open_db(db_root_path, &Default::default())?));
load_all_forest_cars(&db, &forest_car_db_dir)?;

let chain_config = Arc::new(ChainConfig::from_chain(&chain));
let genesis_header =
read_genesis_header(None, chain_config.genesis_bytes(&db).await?.as_deref(), &db)
.await?;
let chain_store = Arc::new(ChainStore::new(
db.clone(),
db.clone(),
db.clone(),
chain_config,
genesis_header,
)?);

let start_ts = chain_store.chain_index().tipset_by_height(
from,
chain_store.heaviest_tipset(),
ResolveNullTipset::TakeNewer,
)?;

let mut ipld_roots = vec![];
for (child, ts) in start_ts
.chain(&db)
.tuple_windows()
.take_while(|(_, parent)| parent.epoch() >= to)
{
ipld_roots.extend([*child.parent_state(), *child.parent_message_receipts()]);
ipld_roots.extend(ts.block_headers().iter().map(|h| h.messages));
let receipts = Receipt::get_receipts(&db, *child.parent_message_receipts())
.with_context(|| {
format!(
"failed to get receipts, root: {}, epoch: {}, tipset key: {}",
*child.parent_message_receipts(),
ts.epoch(),
ts.key(),
)
})?;
ipld_roots.extend(receipts.into_iter().filter_map(|r| r.events_root()));
}
let roots = nunny::vec![ipld_roots.first().cloned().context("no ipld roots found")?];
let stream = IpldStream::new(db, ipld_roots.clone());
let frames = crate::db::car::forest::Encoder::compress_stream_default(stream);
let tmp =
tempfile::NamedTempFile::new_in(output.parent().unwrap_or_else(|| Path::new(".")))?
.into_temp_path();
let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(&tmp).await?);
crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?;
writer.flush().await?;
tmp.persist(output)?;

Ok(())
}
}
3 changes: 3 additions & 0 deletions src/dev/subcommands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0, MIT

mod archive_missing_cmd;
mod export_state_tree_cmd;
mod export_tipset_lookup_cmd;
mod state_cmd;
mod update_checkpoints_cmd;
Expand Down Expand Up @@ -51,6 +52,7 @@ pub enum Subcommand {
/// Find missing archival snapshots on the Forest Archive for a given epoch range
ArchiveMissing(archive_missing_cmd::ArchiveMissingCommand),
ExportTipsetLookup(export_tipset_lookup_cmd::ExportTipsetLookupCommand),
ExportStateTree(export_state_tree_cmd::ExportStateTreeCommand),
}

impl Subcommand {
Expand All @@ -61,6 +63,7 @@ impl Subcommand {
Self::UpdateCheckpoints(cmd) => cmd.run().await,
Self::ArchiveMissing(cmd) => cmd.run().await,
Self::ExportTipsetLookup(cmd) => cmd.run().await,
Self::ExportStateTree(cmd) => cmd.run().await,
}
}
}
Expand Down
43 changes: 43 additions & 0 deletions src/ipld/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,49 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
}
}

pin_project! {
    /// Stream of [`CarBlock`]s obtained by walking the IPLD links reachable from a set of
    /// root CIDs, reading each block from `db` and yielding every CID at most once.
    pub struct IpldStream<DB> {
        // Store the block data is read from.
        db: DB,
        // Pending CIDs, used as a stack: `poll_next` pops from the end and pushes the links
        // extracted from DAG-CBOR blocks back onto it.
        cid_vec: Vec<Cid>,
        // CIDs already handled, so a block reachable through several paths is emitted once.
        seen: CidHashSet,
    }
}

impl<DB> IpldStream<DB> {
    /// Creates a stream that reads blocks from `db`, starting from `roots`.
    ///
    /// `roots` becomes the initial list of pending CIDs; the set of already-visited CIDs
    /// starts out empty.
    pub fn new(db: DB, roots: Vec<Cid>) -> Self {
        let seen = CidHashSet::default();
        let cid_vec = roots;
        Self { db, cid_vec, seen }
    }
}

impl<DB: Blockstore> Stream for IpldStream<DB> {
    type Item = anyhow::Result<CarBlock>;

    /// Yields the next block of the traversal.
    ///
    /// Pops pending CIDs until one passes `should_save_block_to_snapshot` and has not been
    /// seen before, loads its data from the store and returns it. For DAG-CBOR blocks the
    /// links found in the data are queued for later visits. The stream ends once no CIDs
    /// are pending.
    ///
    /// Every path returns `Poll::Ready`; the store is read synchronously inside the poll
    /// and the task context is never used.
    ///
    /// Yields an error item when the store lookup fails, when link extraction fails, or
    /// when a CID is absent from the store (`missing key: ...`).
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        while let Some(cid) = this.cid_vec.pop() {
            // `insert` is only evaluated for CIDs that should be saved, and returns `false`
            // for CIDs that were already emitted.
            if !should_save_block_to_snapshot(cid) || !this.seen.insert(cid) {
                continue;
            }
            let data = match this.db.get(&cid)? {
                Some(data) => data,
                None => return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}")))),
            };
            // Only DAG-CBOR blocks are scanned for further links.
            if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                this.cid_vec.extend(extract_cids(&data)?);
            }
            return Poll::Ready(Some(Ok(CarBlock { cid, data })));
        }
        // That's it, nothing else to do. End of stream.
        Poll::Ready(None)
    }
}

fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
if let Ipld::Link(cid) = ipld {
Some(cid)
Expand Down
Loading