diff --git a/.github/workflows/forest.yml b/.github/workflows/forest.yml index 5c5ed1013e88..fb8f4a18a6e6 100644 --- a/.github/workflows/forest.yml +++ b/.github/workflows/forest.yml @@ -287,7 +287,7 @@ jobs: calibnet-export-check-v2: needs: - build-ubuntu - name: Snapshot export checks v2 + name: Snapshot export checks v2 with F3 data runs-on: ubuntu-24.04 steps: - run: lscpu @@ -304,7 +304,7 @@ jobs: run: | chmod +x ~/.cargo/bin/forest* - name: Snapshot export check v2 - run: ./scripts/tests/calibnet_export_check.sh v2 + run: ./scripts/tests/calibnet_export_f3_check.sh timeout-minutes: ${{ fromJSON(env.SCRIPT_TIMEOUT_MINUTES) }} calibnet-no-discovery-checks: needs: @@ -576,6 +576,7 @@ jobs: - state-migrations-check - calibnet-wallet-check - calibnet-export-check + - calibnet-export-check-v2 - calibnet-no-discovery-checks - calibnet-kademlia-checks - calibnet-eth-mapping-check diff --git a/CHANGELOG.md b/CHANGELOG.md index 277cfbe61615..12500a3ebc06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,10 +35,14 @@ - [#5859](https://github.com/ChainSafe/forest/pull/5859) Added size metrics for zstd frame cache and made max size configurable via `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` environment variable. +- [#5886](https://github.com/ChainSafe/forest/issues/5886) Add `forest-tool archive merge-f3` subcommand for merging a v1 Filecoin snapshot and an F3 snapshot into a v2 Filecoin snapshot. + - [#4976](https://github.com/ChainSafe/forest/issues/4976) Add support for the `Filecoin.EthSubscribe` and `Filecoin.EthUnsubscribe` API methods to enable subscriptions to Ethereum event types: `heads` and `logs`. ### Changed +- [#5886](https://github.com/ChainSafe/forest/issues/5886) Updated `forest-tool archive metadata` to print F3 snapshot header info when applicable. + - [#5869](https://github.com/ChainSafe/forest/pull/5869) Updated `forest-cli snapshot export` to print average speed. ### Removed diff --git a/docs/docs/users/reference/cli.sh b/docs/docs/users/reference/cli.sh index 4baa6c63b3e1..5da6f06810c6 100755 --- a/docs/docs/users/reference/cli.sh +++ b/docs/docs/users/reference/cli.sh @@ -119,7 +119,9 @@ generate_markdown_section "forest-tool" "archive" generate_markdown_section "forest-tool" "archive info" generate_markdown_section "forest-tool" "archive export" generate_markdown_section "forest-tool" "archive checkpoints" +generate_markdown_section "forest-tool" "archive metadata" generate_markdown_section "forest-tool" "archive merge" +generate_markdown_section "forest-tool" "archive merge-f3" generate_markdown_section "forest-tool" "archive diff" generate_markdown_section "forest-tool" "archive sync-bucket" diff --git a/f3-sidecar/api.go b/f3-sidecar/api.go index 36c63fdcb0c6..e11ac816e12a 100644 --- a/f3-sidecar/api.go +++ b/f3-sidecar/api.go @@ -1,13 +1,17 @@ package main import ( + "bufio" "context" + "errors" + "os" "github.com/filecoin-project/go-f3" "github.com/filecoin-project/go-f3/certs" "github.com/filecoin-project/go-f3/gpbft" "github.com/filecoin-project/go-f3/manifest" "github.com/filecoin-project/go-state-types/crypto" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" ) @@ -51,6 +55,35 @@ func (h *F3ServerHandler) F3GetF3PowerTable(ctx context.Context, tsk []byte) (gp return h.f3.GetPowerTable(ctx, tsk) } +func (h *F3ServerHandler) F3ExportLatestSnapshot(ctx context.Context, path string) (_ *cid.Cid, err error) { + cs, err := h.f3.GetCertStore() + if err != nil { + return nil, err + } + + f, err := os.Create(path) + if err != nil { + return nil, err + } + defer func() { + if closeErr := f.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + + writer := bufio.NewWriter(f) + defer func() { + if flushErr := writer.Flush(); flushErr != nil { + err = errors.Join(err, flushErr) + } + }() + cid, _, err := cs.ExportLatestSnapshot(ctx, writer) + if err != nil { + return nil, err + } + return &cid, nil +} + // F3GetF3PowerTableByInstance retrieves the power table for a specific consensus instance. // It returns the power entries associated with the given instance number. // diff --git a/f3-sidecar/ffi_gen.go b/f3-sidecar/ffi_gen.go index edc7f5941af3..aea26d966715 100644 --- a/f3-sidecar/ffi_gen.go +++ b/f3-sidecar/ffi_gen.go @@ -30,6 +30,7 @@ var GoF3NodeImpl GoF3Node type GoF3Node interface { run(rpc_endpoint *string, jwt *string, f3_rpc_endpoint *string, initial_power_table *string, bootstrap_epoch *int64, finality *int64, f3_root *string) bool + import_snap(f3_rpc_endpoint *string, f3_root *string, snapshot_path *string) string } //export CGoF3Node_run @@ -49,6 +50,19 @@ func CGoF3Node_run(rpc_endpoint C.StringRef, jwt C.StringRef, f3_rpc_endpoint C. runtime.KeepAlive(buffer) } +//export CGoF3Node_import_snap +func CGoF3Node_import_snap(f3_rpc_endpoint C.StringRef, f3_root C.StringRef, snapshot_path C.StringRef, slot *C.void, cb *C.void) { + _new_f3_rpc_endpoint := newString(f3_rpc_endpoint) + _new_f3_root := newString(f3_root) + _new_snapshot_path := newString(snapshot_path) + resp := GoF3NodeImpl.import_snap(&_new_f3_rpc_endpoint, &_new_f3_root, &_new_snapshot_path) + resp_ref, buffer := cvt_ref(cntString, refString)(&resp) + asmcall.CallFuncG0P2(unsafe.Pointer(cb), unsafe.Pointer(&resp_ref), unsafe.Pointer(slot)) + runtime.KeepAlive(resp_ref) + runtime.KeepAlive(resp) + runtime.KeepAlive(buffer) +} + func newString(s_ref C.StringRef) string { return unsafe.String((*byte)(unsafe.Pointer(s_ref.ptr)), s_ref.len) } diff --git a/f3-sidecar/ffi_impl.go b/f3-sidecar/ffi_impl.go index ca4356449a01..6bfb7f472e2f 100644 --- a/f3-sidecar/ffi_impl.go +++ b/f3-sidecar/ffi_impl.go @@ -48,6 +48,13 @@ func (f3 *f3Impl) run(rpc_endpoint *string, jwt *string, f3_rpc_endpoint *string return err == nil } +func (f3 *f3Impl) import_snap(f3_rpc_endpoint *string, f3_root *string, snapshot_path *string) string { + if err := importSnap(f3.ctx, *f3_rpc_endpoint, *f3_root, *snapshot_path); err != nil { + return err.Error() + } + return "" +} + func checkError(err error) { if err != nil { panic(err) diff --git a/f3-sidecar/go.mod b/f3-sidecar/go.mod index 1150e3cec7ea..24180c972c5b 100644 --- a/f3-sidecar/go.mod +++ b/f3-sidecar/go.mod @@ -3,11 +3,12 @@ module f3-sidecar/v2 go 1.24.5 require ( - github.com/filecoin-project/go-f3 v0.8.9 + github.com/filecoin-project/go-f3 v0.8.10-0.20250801124500-9288fba86c47 github.com/filecoin-project/go-jsonrpc v0.8.0 github.com/filecoin-project/go-state-types v0.16.0 github.com/ihciah/rust2go v0.0.0-20250726175549-557d7a3a4e27 github.com/ipfs/go-cid v0.5.0 + github.com/ipfs/go-datastore v0.8.2 github.com/ipfs/go-ds-leveldb v0.5.2 github.com/ipfs/go-log/v2 v2.8.0 github.com/libp2p/go-libp2p v0.42.1 @@ -44,7 +45,6 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/boxo v0.33.0 // indirect - github.com/ipfs/go-datastore v0.8.2 // indirect github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect diff --git a/f3-sidecar/go.sum b/f3-sidecar/go.sum index e55c92e08444..6ce3301c4579 100644 --- a/f3-sidecar/go.sum +++ b/f3-sidecar/go.sum @@ -45,8 +45,8 @@ github.com/filecoin-project/go-bitfield v0.2.4 h1:uZ7MeE+XfM5lqrHJZ93OnhQKc/rveW github.com/filecoin-project/go-bitfield v0.2.4/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM= github.com/filecoin-project/go-clock v0.1.0 h1:SFbYIM75M8NnFm1yMHhN9Ahy3W5bEZV9gd6MPfXbKVU= github.com/filecoin-project/go-clock v0.1.0/go.mod h1:4uB/O4PvOjlx1VCMdZ9MyDZXRm//gkj1ELEbxfI1AZs= -github.com/filecoin-project/go-f3 v0.8.9 h1:0SHqwWmcVAL02Or7uE4P7qG1feopyVBSlgrUxkHkQBM= -github.com/filecoin-project/go-f3 v0.8.9/go.mod h1:hFvb2CMxHDmlJAVzfiIL/V8zCtNMQqfSnhP5TyM6CHI= +github.com/filecoin-project/go-f3 v0.8.10-0.20250801124500-9288fba86c47 h1:qf5j5kHyTfwivaXjXlySjS5Nax6wufrUiomE4elcW3s= +github.com/filecoin-project/go-f3 v0.8.10-0.20250801124500-9288fba86c47/go.mod h1:hFvb2CMxHDmlJAVzfiIL/V8zCtNMQqfSnhP5TyM6CHI= github.com/filecoin-project/go-jsonrpc v0.8.0 h1:2yqlN3Vd8Gx5UtA3fib7tQu2aW1cSOJt253LEBWExo4= github.com/filecoin-project/go-jsonrpc v0.8.0/go.mod h1:p8WGOwQGYbFugSdK7qKIGhhb1VVcQ2rtBLdEiik1QWI= github.com/filecoin-project/go-state-types v0.16.0 h1:ajIREDzTGfq71ofIQ29iZR1WXxmkvd2nQNc6ApcP1wI= diff --git a/f3-sidecar/import.go b/f3-sidecar/import.go new file mode 100644 index 000000000000..7acbbe0fdbb0 --- /dev/null +++ b/f3-sidecar/import.go @@ -0,0 +1,59 @@ +package main + +import ( + "bufio" + "context" + "errors" + "os" + + "github.com/filecoin-project/go-f3/certstore" + "github.com/filecoin-project/go-f3/manifest" + "github.com/filecoin-project/go-jsonrpc" + "github.com/ipfs/go-datastore/namespace" +) + +func importSnap(ctx context.Context, rpcEndpoint string, f3Root string, snapshotPath string) (err error) { + logger.Infof("importing F3 snapshot at %s", snapshotPath) + + f3api := F3Api{} + closer, err := jsonrpc.NewClient(ctx, rpcEndpoint, "F3", &f3api, nil) + if err != nil { + return err + } + defer closer() + rawNetworkName := waitRawNetworkName(ctx, &f3api) + networkName := getNetworkName(rawNetworkName) + m := Network2PredefinedManifestMappings[networkName] + if m == nil { + m2 := manifest.LocalDevnetManifest() + m = &m2 + m.NetworkName = networkName + } + + ds, err := getDatastore(f3Root) + if err != nil { + return err + } + defer func() { + if closeErr := ds.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + dsWrapper := namespace.Wrap(ds, m.DatastorePrefix()) + defer func() { + if closeErr := dsWrapper.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + + f, err := os.Open(snapshotPath) + if err != nil { + return err + } + defer func() { + if closeErr := f.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + return certstore.ImportSnapshotToDatastore(ctx, bufio.NewReader(f), dsWrapper) +} diff --git a/f3-sidecar/main.go b/f3-sidecar/main.go index 3fa559e69221..d94f71d50e70 100644 --- a/f3-sidecar/main.go +++ b/f3-sidecar/main.go @@ -39,14 +39,22 @@ func main() { flag.Int64Var(&bootstrapEpoch, "bootstrap", -1, "F3 bootstrap epoch") var finality int64 flag.Int64Var(&finality, "finality", 900, "chain finality epochs") - var root string - flag.StringVar(&root, "root", "f3-data", "path to the f3 data directory") + var f3Root string + flag.StringVar(&f3Root, "root", "f3-data", "path to the f3 data directory") + var snapshotPath string + flag.StringVar(&snapshotPath, "snapshot", "", "path to the f3 snapshot file") flag.Parse() ctx := context.Background() - err := run(ctx, rpcEndpoint, jwt, f3RpcEndpoint, initialPowerTable, bootstrapEpoch, finality, root) + if len(snapshotPath) > 0 { + if err := importSnap(ctx, rpcEndpoint, f3Root, snapshotPath); err != nil { + panic(err) + } + } + + err := run(ctx, rpcEndpoint, jwt, f3RpcEndpoint, initialPowerTable, bootstrapEpoch, finality, f3Root) if err != nil { panic(err) } diff --git a/f3-sidecar/run.go b/f3-sidecar/run.go index 64e6236da201..af7c8dae7f58 100644 --- a/f3-sidecar/run.go +++ b/f3-sidecar/run.go @@ -6,7 +6,6 @@ import ( "fmt" "net" "net/http" - "path/filepath" "time" "github.com/filecoin-project/go-f3" @@ -15,10 +14,9 @@ import ( "github.com/filecoin-project/go-f3/manifest" "github.com/filecoin-project/go-jsonrpc" "github.com/ipfs/go-cid" - leveldb "github.com/ipfs/go-ds-leveldb" ) -func run(ctx context.Context, rpcEndpoint string, jwt string, f3RpcEndpoint string, initialPowerTable string, bootstrapEpoch int64, finality int64, f3Root string) error { +func run(ctx context.Context, rpcEndpoint string, jwt string, f3RpcEndpoint string, initialPowerTable string, bootstrapEpoch int64, finality int64, f3Root string) (err error) { api := FilecoinApi{} isJwtProvided := len(jwt) > 0 closer, err := jsonrpc.NewClient(ctx, rpcEndpoint, "Filecoin", &api, nil) @@ -32,17 +30,7 @@ func run(ctx context.Context, rpcEndpoint string, jwt string, f3RpcEndpoint stri return err } - var rawNetwork string - for { - rawNetwork, err = ec.f3api.GetRawNetworkName(ctx) - if err == nil { - logger.Infoln("Forest RPC server is online") - break - } else { - logger.Warnln("waiting for Forest RPC server") - time.Sleep(5 * time.Second) - } - } + rawNetwork := waitRawNetworkName(ctx, &ec.f3api) listenAddrs, err := api.NetAddrsListen(ctx) if err != nil { return err @@ -60,17 +48,17 @@ func run(ctx context.Context, rpcEndpoint string, jwt string, f3RpcEndpoint stri if err != nil { return err } - ds, err := leveldb.NewDatastore(filepath.Join(f3Root, "db"), nil) + ds, err := getDatastore(f3Root) if err != nil { return err } + defer func() { + if closeErr := ds.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() verif := blssig.VerifierWithKeyOnG1() - networkName := gpbft.NetworkName(rawNetwork) - // Use "filecoin" as the network name on mainnet, otherwise use the network name. Yes, - // mainnet is called testnetnet in state. - if networkName == "testnetnet" { - networkName = "filecoin" - } + networkName := getNetworkName(rawNetwork) m := Network2PredefinedManifestMappings[networkName] if m == nil { m2 := manifest.LocalDevnetManifest() diff --git a/f3-sidecar/utils.go b/f3-sidecar/utils.go index 1309b75ff962..7d89126bd134 100644 --- a/f3-sidecar/utils.go +++ b/f3-sidecar/utils.go @@ -1,9 +1,46 @@ package main -import "github.com/ipfs/go-cid" +import ( + "context" + "path/filepath" + "time" + + "github.com/filecoin-project/go-f3/gpbft" + "github.com/ipfs/go-cid" + leveldb "github.com/ipfs/go-ds-leveldb" +) var CID_UNDEF_RUST = cid.MustParse("baeaaaaa") func isCidDefined(c cid.Cid) bool { return c.Defined() && c != CID_UNDEF_RUST } + +func getDatastore(f3Root string) (*leveldb.Datastore, error) { + return leveldb.NewDatastore(filepath.Join(f3Root, "db"), nil) +} + +func waitRawNetworkName(ctx context.Context, f3api *F3Api) string { + for { + rawNetwork, err := f3api.GetRawNetworkName(ctx) + if err != nil { + logger.Warnln("waiting for Forest RPC server") + time.Sleep(5 * time.Second) + continue + } + + logger.Infoln("Forest RPC server is online") + return rawNetwork + } +} + +func getNetworkName(rawNetworkName string) gpbft.NetworkName { + networkName := gpbft.NetworkName(rawNetworkName) + // See + // Use "filecoin" as the network name on mainnet, otherwise use the network name. Yes, + // mainnet is called testnetnet in state. + if networkName == "testnetnet" { + networkName = "filecoin" + } + return networkName +} diff --git a/scripts/tests/calibnet_eth_mapping_check.sh b/scripts/tests/calibnet_eth_mapping_check.sh index 7a3e7168bd7b..8803edd8c35f 100755 --- a/scripts/tests/calibnet_eth_mapping_check.sh +++ b/scripts/tests/calibnet_eth_mapping_check.sh @@ -46,8 +46,7 @@ done echo "Done" -echo "Waiting to be ready for serving" -$FOREST_CLI_PATH healthcheck ready --wait +forest_wait_for_healthcheck_ready ERROR=0 echo "Testing Ethereum mappings" diff --git a/scripts/tests/calibnet_export_f3_check.sh b/scripts/tests/calibnet_export_f3_check.sh new file mode 100755 index 000000000000..5e174eece28a --- /dev/null +++ b/scripts/tests/calibnet_export_f3_check.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +# This script is checking the correctness of +# the snapshot export feature. +# It requires both the `forest` and `forest-cli` binaries to be in the PATH. + +set -eu + +source "$(dirname "$0")/harness.sh" + +forest_init_with_f3 + +echo "Cleaning up the initial snapshot" +rm --force --verbose ./*.{car,car.zst,sha256sum} + +echo "Exporting zstd compressed snapshot in v2 format" +$FOREST_CLI_PATH snapshot export --format v2 + +$FOREST_CLI_PATH shutdown --force + +for f in *.car.zst; do + echo "Inspecting archive info $f" + $FOREST_TOOL_PATH archive info "$f" + echo "Inspecting archive metadata $f" + $FOREST_TOOL_PATH archive metadata "$f" +done + +echo "Testing snapshot validity" +zstd --test ./*.car.zst + +echo "Verifying snapshot checksum" +sha256sum --check ./*.sha256sum + +for f in *.car.zst; do + echo "Validating CAR file $f" + $FOREST_TOOL_PATH snapshot validate "$f" +done diff --git a/scripts/tests/harness.sh b/scripts/tests/harness.sh index dc153f27a54f..f861daecfc89 100644 --- a/scripts/tests/harness.sh +++ b/scripts/tests/harness.sh @@ -27,6 +27,21 @@ function forest_download_and_import_snapshot { $FOREST_PATH --chain calibnet --encrypt-keystore false --halt-after-import --height=-200 --auto-download-snapshot } +function forest_download_and_import_snapshot_with_f3 { + echo "Downloading v1 snapshot" + aria2c -x5 https://forest-archive.chainsafe.dev/latest/calibnet/ -o v1.forest.car.zst + echo "Downloading F3 snapshot" + aria2c -x5 https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/f3/f3_snap_calibnet_552628.bin -o f3.bin + echo "Generating v2 snapshot" + $FOREST_TOOL_PATH archive merge-f3 --v1 v1.forest.car.zst --f3 f3.bin --output v2.forest.car.zst + echo "Inspecting archive info" + $FOREST_TOOL_PATH archive info v2.forest.car.zst + echo "Inspecting archive metadata" + $FOREST_TOOL_PATH archive metadata v2.forest.car.zst + echo "Importing the v2 snapshot" + $FOREST_PATH --chain calibnet --encrypt-keystore false --halt-after-import --height=-200 --import-snapshot v2.forest.car.zst +} + function get_epoch_from_car_db { DB_PATH=$($FOREST_TOOL_PATH db stats --chain calibnet | grep "Database path:" | cut -d':' -f2- | xargs) SNAPSHOT=$(ls "$DB_PATH/car_db"/*.car.zst) @@ -97,6 +112,11 @@ function forest_wait_for_sync { timeout 30m $FOREST_CLI_PATH sync wait } +function forest_wait_for_healthcheck_ready { + echo "Waiting for healthcheck ready" + timeout 30m $FOREST_CLI_PATH healthcheck ready --wait +} + function forest_init { forest_download_and_import_snapshot @@ -117,6 +137,25 @@ function forest_init { forest_check_db_stats } +function forest_init_with_f3 { + forest_download_and_import_snapshot_with_f3 + + forest_check_db_stats + forest_run_node_detached + + forest_wait_api + + forest_wait_for_sync + forest_check_db_stats + + forest_wait_for_healthcheck_ready + + echo "Print the latest F3 certificate" + $FOREST_CLI_PATH f3 c get + echo "ensure F3 certificate at instance 550000 has been imported" + $FOREST_CLI_PATH f3 c get 550000 +} + function forest_init_stateless { forest_run_node_stateless_detached forest_wait_api diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 2cb48288f197..9e5868b1e58f 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -23,7 +23,7 @@ use cid::Cid; use digest::Digest; use futures::StreamExt as _; use fvm_ipld_blockstore::Blockstore; -use fvm_ipld_encoding::{DAG_CBOR, IPLD_RAW}; +use fvm_ipld_encoding::DAG_CBOR; use multihash_derive::MultihashDigest as _; use nunny::Vec as NonEmpty; use std::fs::File; @@ -83,10 +83,7 @@ pub async fn export_v2( // validate f3 data if let Some((f3_cid, f3_data)) = &mut f3 { f3_data.seek(SeekFrom::Start(0))?; - let expected_cid = Cid::new_v1( - IPLD_RAW, - MultihashCode::Blake2b256.digest_byte_stream(f3_data)?, - ); + let expected_cid = crate::f3::snapshot::get_f3_snapshot_cid(f3_data)?; anyhow::ensure!( f3_cid == &expected_cid, "f3 snapshot integrity check failed, actual cid: {f3_cid}, expected cid: {expected_cid}" @@ -115,10 +112,11 @@ pub async fn export_v2( }]; if let Some((f3_cid, mut f3_data)) = f3 { + let f3_data_len = f3_data.seek(SeekFrom::End(0))?; f3_data.seek(SeekFrom::Start(0))?; prefix_data_frames.push({ let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?; - encoder.write_car_block(f3_cid, f3_data.metadata()?.len() as _, &mut f3_data)?; + encoder.write_car_block(f3_cid, f3_data_len as _, &mut f3_data)?; anyhow::Ok(( vec![f3_cid], finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?, diff --git a/src/chain/snapshot_format.rs b/src/chain/snapshot_format.rs index e70fd3d9e4d3..b59c6aa54eff 100644 --- a/src/chain/snapshot_format.rs +++ b/src/chain/snapshot_format.rs @@ -74,16 +74,16 @@ impl FilecoinSnapshotMetadata { impl std::fmt::Display for FilecoinSnapshotMetadata { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - writeln!(f, "Snapshot version: {}", self.version as u64)?; + writeln!(f, "Snapshot version: {}", self.version as u64)?; let head_tipset_key_string = self .head_tipset_key .iter() .map(Cid::to_string) - .join("\n "); - writeln!(f, "Head Tipset: {head_tipset_key_string}")?; + .join("\n "); + writeln!(f, "Head Tipset: {head_tipset_key_string}")?; write!( f, - "F3 data: {}", + "F3 data: {}", self.f3_data .map(|c| c.to_string()) .unwrap_or_else(|| "not found".into()) diff --git a/src/cli/subcommands/f3_cmd.rs b/src/cli/subcommands/f3_cmd.rs index edf0dd91ae7b..c501e4a0c57c 100644 --- a/src/cli/subcommands/f3_cmd.rs +++ b/src/cli/subcommands/f3_cmd.rs @@ -476,6 +476,7 @@ pub struct F3PowerTableCliJson { #[serde(rename = "CID")] #[serde_as(as = "DisplayFromStr")] cid: Cid, + #[serde(with = "crate::lotus_json")] entries: Vec, #[serde(with = "crate::lotus_json::stringify")] total: num::BigInt, diff --git a/src/cli_shared/cli/client.rs b/src/cli_shared/cli/client.rs index 6c3c2f8beb79..a00f50287396 100644 --- a/src/cli_shared/cli/client.rs +++ b/src/cli_shared/cli/client.rs @@ -102,4 +102,10 @@ impl Client { pub fn default_rpc_token_path(&self) -> PathBuf { self.data_dir.join("token") } + + pub fn rpc_v1_endpoint(&self) -> Result { + format!("http://{}/rpc/v1", self.rpc_address) + .as_str() + .parse() + } } diff --git a/src/daemon/context.rs b/src/daemon/context.rs index b3458d6bea8a..494c68f2efa1 100644 --- a/src/daemon/context.rs +++ b/src/daemon/context.rs @@ -61,6 +61,10 @@ impl AppContext { snapshot_progress_tracker, }) } + + pub fn chain_config(&self) -> &Arc { + self.state_manager.chain_config() + } } fn get_chain_config_and_set_network(config: &Config) -> Arc { diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index c29374ac1a78..2a8553d1fc98 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -7,6 +7,7 @@ use crate::db::car::forest::{ }; use crate::db::car::{ForestCar, ManyCar}; use crate::interpreter::VMTrace; +use crate::networks::ChainConfig; use crate::rpc::sync::SnapshotProgressTracker; use crate::shim::clock::ChainEpoch; use crate::state_manager::{NO_CALLBACK, StateManager}; @@ -16,8 +17,8 @@ use crate::utils::net::{DownloadFileOption, download_to}; use anyhow::{Context, bail}; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; -use std::ffi::OsStr; use std::{ + ffi::OsStr, fs, path::{Path, PathBuf}, sync::Arc, @@ -130,6 +131,9 @@ pub async fn import_chain_as_forest_car( from_path: &Path, forest_car_db_dir: &Path, import_mode: ImportMode, + rpc_endpoint: Url, + f3_root: &Path, + chain_config: &ChainConfig, snapshot_progress_tracker: &SnapshotProgressTracker, ) -> anyhow::Result<(PathBuf, Tipset)> { info!("Importing chain from snapshot at: {}", from_path.display()); @@ -229,7 +233,32 @@ pub async fn import_chain_as_forest_car( } }; - let ts = ForestCar::try_from(forest_car_db_path.as_path())?.heaviest_tipset()?; + let forest_car = ForestCar::try_from(forest_car_db_path.as_path())?; + + if let Some(f3_cid) = forest_car.metadata().as_ref().and_then(|m| m.f3_data) { + let mut f3_data = forest_car + .get_reader(f3_cid)? + .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?; + let mut temp_f3_snap = tempfile::Builder::new() + .suffix(".f3snap.bin") + .tempfile_in(forest_car_db_dir)?; + { + let f = temp_f3_snap.as_file_mut(); + std::io::copy(&mut f3_data, f)?; + f.sync_all()?; + } + if let Err(e) = crate::f3::import_f3_snapshot( + chain_config, + rpc_endpoint.to_string(), + f3_root.display().to_string(), + temp_f3_snap.path().display().to_string(), + ) { + // Do not make it a hard error if anything is wrong with F3 snapshot + tracing::error!("Failed to import F3 snapshot: {e}"); + } + } + + let ts = forest_car.heaviest_tipset()?; info!( "Imported snapshot in: {}s, heaviest tipset epoch: {}, key: {}", stopwatch.elapsed().as_secs(), @@ -456,6 +485,9 @@ mod test { file_path, temp_db_dir.path(), import_mode, + "http://127.0.0.1:2345/rpc/v1".parse().unwrap(), + Path::new("test"), + &ChainConfig::devnet(), &SnapshotProgressTracker::default(), ) .await?; diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 26b3bbe9e9d4..7d279dc400a3 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -140,6 +140,9 @@ async fn maybe_import_snapshot( path, &ctx.db_meta_data.get_forest_car_db_dir(), config.client.import_mode, + config.client.rpc_v1_endpoint()?, + &crate::f3::get_f3_root(config), + ctx.chain_config(), &snapshot_tracker, ) .await?; @@ -392,21 +395,21 @@ fn maybe_start_rpc_service( Ok(()) } -fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) { +fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> { // already running if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() { - return; + return Ok(()); } if !config.client.enable_rpc { if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) { tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ") } - return; + return Ok(()); } if !opts.halt_after_import && !opts.stateless { - let rpc_address = config.client.rpc_address; + let rpc_endpoint = config.client.rpc_v1_endpoint()?; let state_manager = &ctx.state_manager; let p2p_peer_id = ctx.p2p_peer_id; let admin_jwt = ctx.admin_jwt.clone(); @@ -418,10 +421,7 @@ fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) { )) .expect("F3 lease manager should not have been initialized before"); let chain_config = state_manager.chain_config().clone(); - let default_f3_root = config - .client - .data_dir - .join(format!("f3/{}", config.chain())); + let f3_root = crate::f3::get_f3_root(config); let crate::f3::F3Options { chain_finality, bootstrap_epoch, @@ -430,7 +430,7 @@ fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) { move || { crate::f3::run_f3_sidecar_if_enabled( &chain_config, - format!("http://{rpc_address}/rpc/v1"), + rpc_endpoint.to_string(), admin_jwt, crate::rpc::f3::get_f3_rpc_endpoint().to_string(), initial_power_table @@ -438,12 +438,13 @@ fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) { .unwrap_or_default(), bootstrap_epoch, chain_finality, - std::env::var("FOREST_F3_ROOT") - .unwrap_or(default_f3_root.display().to_string()), + f3_root.display().to_string(), ); } }); } + + Ok(()) } fn maybe_start_indexer_service( @@ -577,7 +578,7 @@ pub(super) async fn start_services( on_app_context_and_db_initialized(&ctx); ctx.state_manager.populate_cache(); maybe_start_metrics_service(&mut services, &config, &ctx).await?; - maybe_start_f3_service(opts, &config, &ctx); + maybe_start_f3_service(opts, &config, &ctx)?; maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx) .await?; maybe_start_indexer_service(&mut services, opts, &config, &ctx); diff --git a/src/db/car/any.rs b/src/db/car/any.rs index 29371c35c598..f11bf40e8c68 100644 --- a/src/db/car/any.rs +++ b/src/db/car/any.rs @@ -14,9 +14,10 @@ use crate::chain::FilecoinSnapshotMetadata; use crate::utils::io::EitherMmapOrRandomAccessFile; use cid::Cid; use fvm_ipld_blockstore::Blockstore; +use itertools::Either; use positioned_io::ReadAt; use std::borrow::Cow; -use std::io::{Error, ErrorKind, Result}; +use std::io::{Error, ErrorKind, Read, Result}; use std::path::Path; use std::sync::Arc; @@ -112,6 +113,15 @@ impl AnyCar { _ => None, } } + + /// Gets a reader of the block data by its `Cid` + pub fn get_reader(&self, k: Cid) -> anyhow::Result> { + match self { + Self::Forest(car) => Ok(car.get_reader(k)?.map(Either::Left)), + Self::Plain(car) => Ok(car.get_reader(k).map(|r| Either::Right(Either::Left(r)))), + Self::Memory(car) => Ok(car.get_reader(k).map(|r| Either::Right(Either::Right(r)))), + } + } } impl TryFrom<&'static [u8]> for AnyCar<&'static [u8]> { diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs index e29c25411db2..7603bed01b6b 100644 --- a/src/db/car/forest.rs +++ b/src/db/car/forest.rs @@ -61,6 +61,7 @@ use cid::Cid; use futures::{Stream, TryStreamExt as _}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::CborStore as _; +use integer_encoding::VarIntReader; use nunny::Vec as NonEmpty; use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor}; use std::io::{Seek, SeekFrom}; @@ -212,6 +213,28 @@ impl ForestCar { ..self } } + + /// Gets a reader of the block data by its `Cid` + pub fn get_reader(&self, k: Cid) -> anyhow::Result> { + for position in self.indexed.get(k)? { + // escape the positioned_io::Slice + let entire_file = self.indexed.reader().get_ref(); + // `position` is the frame start offset. + let cursor = Cursor::new_pos(entire_file, position); + let mut decoder = zstd::Decoder::new(cursor)?.single_frame(); + while let Ok(car_block_len) = decoder.read_varint::() { + let cid = Cid::read_bytes(&mut decoder)?; + let data_len = car_block_len.saturating_sub(cid.encoded_len()) as u64; + if cid == k { + // return the reader instead of decoding the entire data block into memory + return Ok(Some(decoder.take(data_len))); + } + // Discard data bytes + io::copy(&mut decoder.by_ref().take(data_len), &mut io::sink())?; + } + } + Ok(None) + } } impl TryFrom<&Path> for ForestCar { @@ -488,7 +511,15 @@ mod tests { ForestCar::new(mk_encoded_car(1024 * 4, 3, roots.clone(), blocks.clone())).unwrap(); assert_eq!(forest_car.head_tipset_key(), &roots); for block in blocks { - assert_eq!(forest_car.get(&block.cid).unwrap(), Some(block.data)); + assert_eq!(forest_car.get(&block.cid).unwrap().unwrap(), block.data); + let mut buf = vec![]; + forest_car + .get_reader(block.cid) + .unwrap() + .unwrap() + .read_to_end(&mut buf) + .unwrap(); + assert_eq!(buf, block.data); } } diff --git a/src/db/car/plain.rs b/src/db/car/plain.rs index 5b8fb78414e1..f69f5586ae2d 100644 --- a/src/db/car/plain.rs +++ b/src/db/car/plain.rs @@ -224,6 +224,16 @@ impl PlainCar { metadata: self.metadata, } } + + /// Gets a reader of the block data by its `Cid` + pub fn get_reader(&self, k: Cid) -> Option { + self.index + .read() + .get(&k) + .map(|UncompressedBlockDataLocation { offset, length }| { + positioned_io::Cursor::new_pos(&self.reader, *offset).take(*length as u64) + }) + } } impl TryFrom<&'static [u8]> for PlainCar<&'static [u8]> { @@ -456,7 +466,7 @@ where #[cfg(test)] mod tests { - use super::PlainCar; + use super::*; use crate::utils::db::{ car_stream::{CarStream, CarV1Header}, car_util::load_car, @@ -483,10 +493,17 @@ mod tests { let expected = reference_car.get(&cid).unwrap().unwrap(); let expected2 = reference_car_zst.get(&cid).unwrap().unwrap(); let expected3 = reference_car_zst_unsafe.get(&cid).unwrap().unwrap(); + let mut expected4 = vec![]; + car_backed + .get_reader(cid) + .unwrap() + .read_to_end(&mut expected4) + .unwrap(); let actual = car_backed.get(&cid).unwrap().unwrap(); assert_eq!(expected, actual); assert_eq!(expected2, actual); assert_eq!(expected3, actual); + assert_eq!(expected4, actual); } } diff --git a/src/f3/go_ffi.rs b/src/f3/go_ffi.rs index 7048a3838f4f..144e162816e8 100644 --- a/src/f3/go_ffi.rs +++ b/src/f3/go_ffi.rs @@ -18,4 +18,6 @@ pub trait GoF3Node { finality: i64, f3_root: String, ) -> bool; + + fn import_snap(f3_rpc_endpoint: String, f3_root: String, snapshot_path: String) -> String; } diff --git a/src/f3/mod.rs b/src/f3/mod.rs index 1775723c72f2..003fbb21016d 100644 --- a/src/f3/mod.rs +++ b/src/f3/mod.rs @@ -5,9 +5,13 @@ #[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))] mod go_ffi; +use std::path::{Path, PathBuf}; + #[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))] use go_ffi::*; +pub mod snapshot; + use cid::Cid; use crate::{networks::ChainConfig, utils::misc::env::is_env_set_and_truthy}; @@ -19,6 +23,17 @@ pub struct F3Options { pub initial_power_table: Option, } +pub fn get_f3_root(config: &crate::Config) -> PathBuf { + std::env::var("FOREST_F3_ROOT") + .map(|p| Path::new(&p).to_path_buf()) + .unwrap_or_else(|_| { + config + .client + .data_dir + .join(format!("f3/{}", config.chain())) + }) +} + pub fn get_f3_sidecar_params(chain_config: &ChainConfig) -> F3Options { let chain_finality = std::env::var("FOREST_F3_FINALITY") .ok() @@ -73,31 +88,59 @@ pub fn get_f3_sidecar_params(chain_config: &ChainConfig) -> F3Options { } } +#[allow(unused_variables)] pub fn run_f3_sidecar_if_enabled( chain_config: &ChainConfig, - _rpc_endpoint: String, - _jwt: String, - _f3_rpc_endpoint: String, - _initial_power_table: String, - _bootstrap_epoch: i64, - _finality: i64, - _f3_root: String, + rpc_endpoint: String, + jwt: String, + f3_rpc_endpoint: String, + initial_power_table: String, + bootstrap_epoch: i64, + finality: i64, + f3_root: String, ) { if is_sidecar_ffi_enabled(chain_config) { #[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))] { tracing::info!("Starting F3 sidecar service ..."); GoF3NodeImpl::run( - _rpc_endpoint, - _jwt, - _f3_rpc_endpoint, - _initial_power_table, - _bootstrap_epoch, - _finality, - _f3_root, + rpc_endpoint, + jwt, + f3_rpc_endpoint, + initial_power_table, + bootstrap_epoch, + finality, + f3_root, + ); + } + } +} + +#[allow(unused_variables)] +pub fn import_f3_snapshot( + chain_config: &ChainConfig, + rpc_endpoint: String, + f3_root: String, + snapshot: String, +) -> anyhow::Result<()> { + if is_sidecar_ffi_enabled(chain_config) { + #[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))] + { + let sw = std::time::Instant::now(); + tracing::info!("Importing F3 snapshot ..."); + let err = GoF3NodeImpl::import_snap(rpc_endpoint, f3_root, snapshot); + if !err.is_empty() { + anyhow::bail!("{err}"); + } + tracing::info!( + "Imported F3 snapshot, took {}", + humantime::format_duration(sw.elapsed()) ); } + } else { + tracing::warn!("F3 sidecar is disabled, skip importing the F3 snapshot"); } + Ok(()) } /// Whether F3 sidecar via FFI is enabled. diff --git a/src/f3/snapshot.rs b/src/f3/snapshot.rs new file mode 100644 index 000000000000..2b8a6d597500 --- /dev/null +++ b/src/f3/snapshot.rs @@ -0,0 +1,51 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +#[cfg(test)] +mod tests; + +use crate::{rpc::f3::F3PowerEntry, utils::multihash::MultihashCode}; +use cid::Cid; +use fvm_ipld_encoding::{IPLD_RAW, tuple::*}; +use integer_encoding::VarIntReader as _; +use std::io::Read; + +pub fn get_f3_snapshot_cid(f3_data: &mut impl Read) -> anyhow::Result { + Ok(Cid::new_v1( + IPLD_RAW, + MultihashCode::Blake2b256.digest_byte_stream(f3_data)?, + )) +} + +/// Defined in +#[derive(Debug, Clone, Eq, PartialEq, Serialize_tuple, Deserialize_tuple)] +pub struct F3SnapshotHeader { + pub version: u64, + pub first_instance: u64, + pub latest_instance: u64, + pub initial_power_table: Vec, +} + +impl F3SnapshotHeader { + pub fn decode_from_snapshot(f3_snapshot: &mut impl Read) -> anyhow::Result { + // Reasonable upper bound for snapshot header size (100MiB) + const MAX_HEADER_SIZE: usize = 100 * 1024 * 1024; + + let data_len = f3_snapshot.read_varint::()?; + anyhow::ensure!( + data_len <= MAX_HEADER_SIZE, + "F3 snapshot header size {data_len} exceeds maximum allowed size {MAX_HEADER_SIZE}" + ); + let mut data_bytes = vec![0; data_len]; + f3_snapshot.read_exact(&mut data_bytes)?; + Ok(fvm_ipld_encoding::from_slice(&data_bytes)?) + } +} + +impl std::fmt::Display for F3SnapshotHeader { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!(f, "F3 snapshot version: {}", self.version)?; + writeln!(f, "F3 snapshot first instance: {}", self.first_instance)?; + write!(f, "F3 snapshot last instance: {}", self.latest_instance) + } +} diff --git a/src/f3/snapshot/tests.rs b/src/f3/snapshot/tests.rs new file mode 100644 index 000000000000..996acf1f05b6 --- /dev/null +++ b/src/f3/snapshot/tests.rs @@ -0,0 +1,21 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; + +const ENCODED_F3_SNAP_HEADER_HEX: &str = "8401001a00086e7d94831a00021b71480002fbe0000000005830aedb9e1f2f4e20a6a6327b9136b8472b16f1f56cec2b6d9341a12a97dc90a39e93f026baefeea09b7507e3bf1fc70981831a00022eff4800010ae80000000058308cc2fc55933c0413e763b63b10b131b3e53f57882e0c79231de0ada3227e01ed817d689e3150874170a4a2f6b590cd93831943eb48000103dcdf0000005830b2158a380b942e099ee4c7522921d638598ebb487f97c5d5ed06a29ba47f620513316f7a596b0a4d5aff7343275c212f83190fc84700faa80000000058308a9b00505fedc14edde52278ea229f4fe91e80890f5f8951200663e41f8034986f56ffd593f4e6a9a282a81ef8dfdc1c831a0002114a4700b2180000000058308c39f779f522b8de5836f7c421427f40556f934376e493a6e0fc0c72f8e35228fcf14e116fa73146ae4d88710c668e4083190ec64700948000000000583095748c4cd115988755c49578725996df98eb96c0447f2b4d5064d6562876745950d64ffb410b6db141b3a43e49a0d6908319ea7847003a90000000005830b5a56233aa1dcbffa5a2326e9b224826b3d540998e8dffea40ee238a7c7ed67e6cc9e21fad0e3e4af9dd00329063c9cd831945b047001fd0ec10000058308cddcf1d23cf25a361cd9a451eeed1d391aae84adc2220d3b2d6d9b6c929d487581ddbde3c53b6770b1b1b3dfe21e8bc831a0001c5b3470001980000000058308d1768b8b3c8fe31025ce52323dc1112bd2e3d1b39923cab98aa821283100fe3635eee56938b63eb60eef4e117c55013831a0001e00a470001600000000058308d1768b8b3c8fe31025ce52323dc1112bd2e3d1b39923cab98aa821283100fe3635eee56938b63eb60eef4e117c55013831a0001c2ad47000138000000005830a64817823ab8728463b34ee36b44761ffdab722762d675f76f631dcfc02affc95e9f54cd765b7bc98c1619991dfbe2f2831a0002307b4600b8000000005830962b2e0e92443cec526cac167a7f80f42bd907b68c17f2e93baff6232b8f4d396b43458810dd95aea41163b7cc047222831903f54600b0000000005830a37c98356d8fa32a1a8fcbd6a009395c4a34177aae9cd28c1c652963fbe445e95513e9f0c8281047ebcbf97cdd0aed0f831a00021051460090009000005830a57b3347936bc303bdf8fc232e701270db3817507562f0cd591114adeb5596c665f828510f52f678e213ad31f0eb6b3383190e7a460078000000005830b234c6533b5b40b7241345a9e9a06eea45877b510170ae9d221bfcdd577d482ead9eb1f74992f667e96459ff43b0e5218319049b460068000000005830a6c8a62aec6bff5b185c220aebe671af9cb24560fce4913c7edffd4368ba7af67a292ab8bfde1b326d6885b27976d59b831a0001bf50460040000000005830ad31d1d68bb36fc0e830ad5d452c7e676ff3277d3ad86a45bacbdfe15b5d89119434fa003031dd2f82907717aba10f9b8319048f46002800000000583097f447a28d7a7a3a489a1683d5a6eb7f0a96bac3b67d6dfeee43bb8ed3a8b223887d9defc6436b73e8b75b2ee2cf9b4f8319066b4600280000000058309561056d88ceb20291cb4770aab1613502bbf8bd226bb0b1ba132cf770bbd00120cc6c09ebf4f4c6ae0fab38d83c833d831a0002286b4600220000000058308018ef30920f5ef4420164b13a806958563fabd74a356537bd38796d0baf070d8c10f3ba57e34ee1ccbea1517bcff9c2"; + +#[test] +pub fn test_f3_snap_header_serde() { + let encoded_block_bytes = hex::decode(ENCODED_F3_SNAP_HEADER_HEX).unwrap(); + let F3SnapshotHeader { + version, + first_instance, + latest_instance, + initial_power_table, + } = fvm_ipld_encoding::from_slice(&encoded_block_bytes).unwrap(); + assert_eq!(version, 1); + assert_eq!(first_instance, 0); + assert_eq!(latest_instance, 552573); + assert_eq!(initial_power_table.len(), 20); +} diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index f848880b861f..48558b345ab3 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -17,6 +17,7 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::message::{ChainMessage, SignedMessage}; use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec}; +use crate::rpc::f3::F3ExportLatestSnapshot; use crate::rpc::types::{ApiTipsetKey, Event}; use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError}; use crate::shim::clock::ChainEpoch; @@ -37,6 +38,7 @@ use num::BigInt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use sha2::Sha256; +use std::fs::File; use std::{ collections::VecDeque, path::PathBuf, @@ -361,9 +363,30 @@ impl RpcMethod<1> for ForestChainExport { .await } FilecoinSnapshotVersion::V2 => { + let f3_snap_tmp_path = { + let mut f3_snap_dir = output_path.clone(); + let mut builder = tempfile::Builder::new(); + let with_suffix = builder.suffix(".f3snap.bin"); + if f3_snap_dir.pop() { + with_suffix.tempfile_in(&f3_snap_dir) + } else { + with_suffix.tempfile_in(".") + }? + .into_temp_path() + }; + let f3_snap = { + match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()).await + { + Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)), + Err(e) => { + tracing::error!("Failed to export F3 snapshot: {e}"); + None + } + } + }; crate::chain::export_v2::( &ctx.store_owned(), - None, + f3_snap, &start_ts, recent_roots, writer, diff --git a/src/rpc/methods/f3.rs b/src/rpc/methods/f3.rs index c2c408a590c9..9e06896ea9bc 100644 --- a/src/rpc/methods/f3.rs +++ b/src/rpc/methods/f3.rs @@ -32,7 +32,7 @@ use crate::{ LruBlockstoreReadCache, }, libp2p::{NetRPCMethods, NetworkMessage}, - lotus_json::HasLotusJson as _, + lotus_json::{HasLotusJson as _, LotusJson}, rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError, types::ApiTipsetKey}, shim::{ address::{Address, Protocol}, @@ -43,6 +43,7 @@ use crate::{ }; use ahash::{HashMap, HashSet}; use anyhow::Context as _; +use cid::Cid; use enumflags2::BitFlags; use fvm_ipld_blockstore::Blockstore; use jsonrpsee::core::{client::ClientT as _, params::ArrayParams}; @@ -623,6 +624,39 @@ impl RpcMethod<2> for SignMessage { } } +pub enum F3ExportLatestSnapshot {} + +impl F3ExportLatestSnapshot { + pub async fn run(path: String) -> anyhow::Result { + let client = get_rpc_http_client()?; + let mut params = ArrayParams::new(); + params.insert(path)?; + let LotusJson(cid): LotusJson = client + .request("Filecoin.F3ExportLatestSnapshot", params) + .await?; + Ok(cid) + } +} + +impl RpcMethod<1> for F3ExportLatestSnapshot { + const NAME: &'static str = "F3.ExportLatestSnapshot"; + const PARAM_NAMES: [&'static str; 1] = ["path"]; + const API_PATHS: BitFlags = ApiPaths::all(); + const PERMISSION: Permission = Permission::Read; + const DESCRIPTION: Option<&'static str> = + Some("Exports the latest F3 snapshot to the specified path and returns its CID"); + + type Params = (String,); + type Ok = Cid; + + async fn handle( + _ctx: Ctx, + (path,): Self::Params, + ) -> Result { + Ok(Self::run(path).await?) + } +} + /// returns a finality certificate at given instance number pub enum F3GetCertificate {} impl RpcMethod<1> for F3GetCertificate { @@ -641,8 +675,8 @@ impl RpcMethod<1> for F3GetCertificate { let client = get_rpc_http_client()?; let mut params = ArrayParams::new(); params.insert(instance)?; - let response = client.request(Self::NAME, params).await?; - Ok(response) + let response: LotusJson = client.request(Self::NAME, params).await?; + Ok(response.into_inner()) } } @@ -659,8 +693,8 @@ impl RpcMethod<0> for F3GetLatestCertificate { async fn handle(_: Ctx, _: Self::Params) -> Result { let client = get_rpc_http_client()?; - let response = client.request(Self::NAME, ArrayParams::new()).await?; - Ok(response) + let response: LotusJson = client.request(Self::NAME, ArrayParams::new()).await?; + Ok(response.into_inner()) } } @@ -703,8 +737,8 @@ impl RpcMethod<1> for F3GetF3PowerTable { let client = get_rpc_http_client()?; let mut params = ArrayParams::new(); params.insert(tsk.into_lotus_json())?; - let response = client.request(Self::NAME, params).await?; - Ok(response) + let response: LotusJson = client.request(Self::NAME, params).await?; + Ok(response.into_inner()) } } @@ -727,8 +761,8 @@ impl RpcMethod<1> for F3GetF3PowerTableByInstance { let client = get_rpc_http_client()?; let mut params = ArrayParams::new(); params.insert(instance)?; - let response = client.request(Self::NAME, params).await?; - Ok(response) + let response: LotusJson = client.request(Self::NAME, params).await?; + Ok(response.into_inner()) } } @@ -762,8 +796,9 @@ pub enum F3GetProgress {} impl F3GetProgress { async fn run() -> anyhow::Result { let client = get_rpc_http_client()?; - let response = client.request(Self::NAME, ArrayParams::new()).await?; - Ok(response) + let response: LotusJson = + client.request(Self::NAME, ArrayParams::new()).await?; + Ok(response.into_inner()) } } @@ -787,8 +822,9 @@ pub enum F3GetManifest {} impl F3GetManifest { async fn run() -> anyhow::Result { let client = get_rpc_http_client()?; - let response = client.request(Self::NAME, ArrayParams::new()).await?; - Ok(response) + let response: LotusJson = + client.request(Self::NAME, ArrayParams::new()).await?; + Ok(response.into_inner()) } } diff --git a/src/rpc/methods/f3/types.rs b/src/rpc/methods/f3/types.rs index e225a487bed0..268916e6a766 100644 --- a/src/rpc/methods/f3/types.rs +++ b/src/rpc/methods/f3/types.rs @@ -6,8 +6,8 @@ use crate::{ blocks::{Tipset, TipsetKey}, lotus_json::{HasLotusJson, LotusJson, base64_standard, lotus_json_with_self}, networks::NetworkChain, - shim::executor::Receipt, - utils::multihash::prelude::*, + shim::{executor::Receipt, fvm_shared_latest::bigint::bigint_ser}, + utils::{encoding::serde_byte_array, multihash::prelude::*}, }; use byteorder::ByteOrder as _; use cid::Cid; @@ -134,9 +134,18 @@ pub struct ECTipSet { lotus_json_with_self!(ECTipSet); /// PowerEntry represents a single entry in the PowerTable, including ActorID and its StoragePower and PubKey. +#[derive(Debug, Clone, Serialize_tuple, Deserialize_tuple, Eq, PartialEq)] +pub struct F3PowerEntry { + pub id: ActorID, + #[serde(with = "bigint_ser")] + pub power: num::BigInt, + #[serde(with = "serde_byte_array")] + pub pub_key: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Eq, PartialEq)] #[serde(rename_all = "PascalCase")] -pub struct F3PowerEntry { +pub struct F3PowerEntryLotusJson { #[serde(rename = "ID")] pub id: ActorID, #[schemars(with = "String")] @@ -146,7 +155,39 @@ pub struct F3PowerEntry { #[serde(with = "base64_standard")] pub pub_key: Vec, } -lotus_json_with_self!(F3PowerEntry); + +impl HasLotusJson for F3PowerEntry { + type LotusJson = F3PowerEntryLotusJson; + + #[cfg(test)] + fn snapshots() -> Vec<(serde_json::Value, Self)> { + use base64::Engine; + use serde_json::json; + vec![( + json!({ + "ID": 143103, + "Power": "1233789485318144", + "PubKey": "jML8VZM8BBPnY7Y7ELExs+U/V4guDHkjHeCtoyJ+Ae2BfWieMVCHQXCkova1kM2T" + }), + Self { + id: 143103, + power: num::BigInt::from_str("1233789485318144").unwrap(), + pub_key: base64::prelude::BASE64_STANDARD + .decode("jML8VZM8BBPnY7Y7ELExs+U/V4guDHkjHeCtoyJ+Ae2BfWieMVCHQXCkova1kM2T") + .unwrap(), + }, + )] + } + + fn into_lotus_json(self) -> Self::LotusJson { + let Self { id, power, pub_key } = self; + F3PowerEntryLotusJson { id, power, pub_key } + } + + fn from_lotus_json(F3PowerEntryLotusJson { id, power, pub_key }: Self::LotusJson) -> Self { + Self { id, power, pub_key } + } +} /// Entries are sorted descending order of their power, where entries with equal power are /// sorted by ascending order of their ID. diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 9236da6474e2..b175b0427cd7 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -284,6 +284,7 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::f3::F3GetLatestCertificate); $callback!($crate::rpc::f3::F3GetOrRenewParticipationTicket); $callback!($crate::rpc::f3::F3Participate); + $callback!($crate::rpc::f3::F3ExportLatestSnapshot); $callback!($crate::rpc::f3::GetHead); $callback!($crate::rpc::f3::GetParent); $callback!($crate::rpc::f3::GetParticipatingMinerIDs); diff --git a/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt b/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt index 95130b2800da..12c24dbf6207 100644 --- a/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt +++ b/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt @@ -1,3 +1,4 @@ +F3.ExportLatestSnapshot F3.Finalize F3.GetHead F3.GetParent diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index 98e27bd5f9fd..0da18f47cf95 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -27,14 +27,15 @@ //! Additional reading: [`crate::db::car::plain`] use crate::blocks::Tipset; -use crate::chain::FilecoinSnapshotVersion; use crate::chain::{ ChainEpochDelta, index::{ChainIndex, ResolveNullTipset}, }; +use crate::chain::{FilecoinSnapshotMetadata, FilecoinSnapshotVersion}; use crate::cid_collections::CidHashSet; use crate::cli_shared::{snapshot, snapshot::TrustedVendor}; -use crate::db::car::{AnyCar, ManyCar}; +use crate::db::car::{AnyCar, ManyCar, forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL}; +use crate::f3::snapshot::F3SnapshotHeader; use crate::interpreter::VMTrace; use crate::ipld::{stream_graph, unordered_stream_graph}; use crate::networks::{ChainConfig, NetworkChain, butterflynet, calibnet, mainnet}; @@ -43,16 +44,22 @@ use crate::shim::clock::{ChainEpoch, EPOCH_DURATION_SECONDS, EPOCHS_IN_DAY}; use crate::shim::fvm_shared_latest::address::Network; use crate::shim::machine::GLOBAL_MULTI_ENGINE; use crate::state_manager::{NO_CALLBACK, StateOutput, apply_block_messages}; +use crate::utils::db::car_stream::{CarBlock, CarBlockWrite as _, CarStream}; +use crate::utils::multihash::MultihashCode; use anyhow::{Context as _, bail}; use chrono::DateTime; use cid::Cid; use clap::{Subcommand, ValueEnum}; use dialoguer::{Confirm, theme::ColorfulTheme}; -use futures::TryStreamExt; +use futures::{StreamExt as _, TryStreamExt as _}; use fvm_ipld_blockstore::Blockstore; -use indicatif::ProgressIterator; +use fvm_ipld_encoding::DAG_CBOR; +use indicatif::{ProgressBar, ProgressIterator, ProgressStyle}; use itertools::Itertools; +use multihash_derive::MultihashDigest as _; use sha2::Sha256; +use std::fs::File; +use std::io::{Seek as _, SeekFrom}; use std::ops::Range; use std::path::PathBuf; use std::sync::Arc; @@ -138,6 +145,18 @@ pub enum ArchiveCommands { #[arg(long, default_value_t = false)] force: bool, }, + /// Merge a v1 Filecoin snapshot with an F3 snapshot into a v2 Filecoin snapshot in `.forest.car.zst` format + MergeF3 { + /// Path to the v1 Filecoin snapshot + #[arg(long = "v1")] + filecoin_v1: PathBuf, + /// Path to the F3 snapshot + #[arg(long)] + f3: PathBuf, + /// Path to the snapshot output file in `.forest.car.zst` format + #[arg(long)] + output: PathBuf, + }, /// Show the difference between the canonical and computed state of a /// tipset. Diff { @@ -198,6 +217,13 @@ impl ArchiveCommands { let store = AnyCar::try_from(snapshot.as_path())?; if let Some(metadata) = store.metadata() { println!("{metadata}"); + if let Some(f3_cid) = metadata.f3_data { + let mut f3_data = store + .get_reader(f3_cid)? + .with_context(|| format!("f3 data not found, cid: {f3_cid}"))?; + let f3_snap_header = F3SnapshotHeader::decode_from_snapshot(&mut f3_data)?; + println!("{f3_snap_header}"); + } } else { println!( "No metadata found (required by v2 snapshot) - this appears to be a v1 snapshot" @@ -236,6 +262,11 @@ impl ArchiveCommands { output_path, force, } => merge_snapshots(snapshot_files, output_path, force).await, + Self::MergeF3 { + filecoin_v1, + f3, + output, + } => merge_f3_snapshot(filecoin_v1, f3, output).await, Self::Diff { snapshot_files, epoch, @@ -541,8 +572,8 @@ async fn do_export( output_path.to_str().unwrap_or_default() ); - let pb = indicatif::ProgressBar::new_spinner().with_style( - indicatif::ProgressStyle::with_template( + let pb = ProgressBar::new_spinner().with_style( + ProgressStyle::with_template( "{spinner} exported {total_bytes} with {binary_bytes_per_sec} in {elapsed}", ) .expect("indicatif template must be valid"), @@ -607,6 +638,99 @@ async fn merge_snapshots( Ok(()) } +async fn merge_f3_snapshot(filecoin: PathBuf, f3: PathBuf, output: PathBuf) -> anyhow::Result<()> { + { + let store = AnyCar::try_from(filecoin.as_path())?; + anyhow::ensure!( + store.metadata().is_none(), + "The filecoin snapshot is not in v1 format" + ); + } + + let mut f3_data = File::open(f3)?; + let f3_cid = crate::f3::snapshot::get_f3_snapshot_cid(&mut f3_data)?; + + let car_stream = CarStream::new(tokio::io::BufReader::new( + tokio::fs::File::open(&filecoin).await?, + )) + .await?; + + let chain_head = car_stream.header_v1.roots.clone(); + + println!("f3 snapshot cid: {f3_cid}"); + println!( + "chain head: [{}]", + chain_head.iter().map(|c| c.to_string()).join(", ") + ); + + let snap_meta = FilecoinSnapshotMetadata::new_v2(chain_head, Some(f3_cid)); + let snap_meta_cbor_encoded = fvm_ipld_encoding::to_vec(&snap_meta)?; + let snap_meta_block = CarBlock { + cid: Cid::new_v1( + DAG_CBOR, + MultihashCode::Blake2b256.digest(&snap_meta_cbor_encoded), + ), + data: snap_meta_cbor_encoded, + }; + + let roots = nunny::vec![snap_meta_block.cid]; + let snap_meta_frame = { + let mut encoder = + crate::db::car::forest::new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?; + snap_meta_block.write(&mut encoder)?; + anyhow::Ok(( + vec![snap_meta_block.cid], + crate::db::car::forest::finalize_frame( + DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, + &mut encoder, + )?, + )) + }; + let f3_frame = { + let mut encoder = + crate::db::car::forest::new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?; + let f3_data_len = f3_data.seek(SeekFrom::End(0))?; + f3_data.seek(SeekFrom::Start(0))?; + encoder.write_car_block(f3_cid, f3_data_len as _, &mut f3_data)?; + anyhow::Ok(( + vec![f3_cid], + crate::db::car::forest::finalize_frame( + DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, + &mut encoder, + )?, + )) + }; + + let block_frames = crate::db::car::forest::Encoder::compress_stream_default( + car_stream.map_err(anyhow::Error::from), + ); + let frames = futures::stream::iter([snap_meta_frame, f3_frame]).chain(block_frames); + + let temp_output = { + let mut dir = output.clone(); + if dir.pop() { + tempfile::NamedTempFile::new_in(dir)? + } else { + tempfile::NamedTempFile::new_in(".")? + } + }; + let writer = tokio::io::BufWriter::new(tokio::fs::File::create(&temp_output).await?); + let pb = ProgressBar::new_spinner().with_style( + ProgressStyle::with_template( + "{spinner} {msg} {binary_total_bytes} written in {elapsed} ({binary_bytes_per_sec})", + ) + .expect("indicatif template must be valid"), + ).with_message(format!("Merging into {} ...", output.display())); + pb.enable_steady_tick(std::time::Duration::from_secs(1)); + let mut writer = pb.wrap_async_write(writer); + crate::db::car::forest::Encoder::write(&mut writer, roots, frames).await?; + writer.shutdown().await?; + temp_output.persist(&output)?; + pb.finish(); + + Ok(()) +} + /// Compute the tree of actor states for a given epoch and compare it to the /// expected result (as encoded in the blockchain). Differences are printed /// using the diff format (red for the blockchain state, green for the computed