diff --git a/CHANGELOG.md b/CHANGELOG.md index 331d5a26d8d..946cbbcc255 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ - `lotus state active-sectors` now outputs CSV format and supports an optional `--show-partitions` to list active sector deadlines and partitions. ([filecoin-project/lotus#13152](https://github.com/filecoin-project/lotus/pull/13152)) - chore: generate v0 JSON RPC specification ([filecoin-project/lotus#13155](https://github.com/filecoin-project/lotus/pull/13155)) - deps(f3): bump go-f3 version to 0.8.6, move power table cache to go-f3 from lotus ([filecoin-project/lotus#13144](https://github.com/filecoin-project/lotus/pull/13144)) +- feat(f3): move go-f3 datastore to separate leveldb instance ([filecoin-project/lotus#13174](https://github.com/filecoin-project/lotus/pull/13174)) # Node v1.33.0 / 2025-05-08 The Lotus v1.33.0 release introduces experimental v2 APIs with F3 awareness, featuring a new TipSet selection mechanism that significantly enhances how applications interact with the Filecoin blockchain. This release candidate also adds F3-aware Ethereum APIs via the /v2 endpoint. All of the /v2 APIs implement intelligent fallback mechanisms between F3 and Expected Consensus and are exposed through the Lotus Gateway. diff --git a/chain/lf3/f3.go b/chain/lf3/f3.go index fdc0ae66196..d085803c99e 100644 --- a/chain/lf3/f3.go +++ b/chain/lf3/f3.go @@ -1,13 +1,17 @@ package lf3 import ( + "bytes" "context" "errors" "fmt" "path/filepath" + "time" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/autobatch" "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" @@ -56,32 +60,122 @@ type F3 struct { type F3Params struct { fx.In - PubSub *pubsub.PubSub - Host host.Host - ChainStore *store.ChainStore - Syncer *chain.Syncer - StateManager *stmgr.StateManager - Datastore dtypes.MetadataDS - Wallet api.Wallet - Config *Config - LockedRepo repo.LockedRepo + PubSub *pubsub.PubSub + Host host.Host + ChainStore *store.ChainStore + Syncer *chain.Syncer + StateManager *stmgr.StateManager + MetaDatastore dtypes.MetadataDS + F3Datastore dtypes.F3DS + Wallet api.Wallet + Config *Config + LockedRepo repo.LockedRepo Net api.Net } var log = logging.Logger("f3") +var migrationKey = datastore.NewKey("/f3-migration/1") + +func checkMigrationComplete(ctx context.Context, source datastore.Batching, target datastore.Batching) (bool, error) { + valSource, err := source.Get(ctx, migrationKey) + if errors.Is(err, datastore.ErrNotFound) { + log.Debug("migration not complete, no migration flag in source datastore") + return false, nil + } + if err != nil { + return false, err + } + valTarget, err := target.Get(ctx, migrationKey) + if errors.Is(err, datastore.ErrNotFound) { + log.Debug("migration not complete, no migration flag in target datastore") + return false, nil + } + if err != nil { + return false, err + } + log.Debugw("migration flags", "source", string(valSource), "target", string(valTarget)) + + // if the values are equal, the migration is complete + return bytes.Equal(valSource, valTarget), nil +} + +// migrateDatastore can be removed once at least one network upgade passes +func migrateDatastore(ctx context.Context, source datastore.Batching, target datastore.Batching) error { + if complete, err := checkMigrationComplete(ctx, source, target); err != nil { + return xerrors.Errorf("checking if migration complete: %w", err) + } else if complete { + return nil + } + + startDate := time.Now() + migrationVal := startDate.Format(time.RFC3339Nano) + + if err := target.Put(ctx, migrationKey, []byte(migrationVal)); err != nil { + return xerrors.Errorf("putting migration flag in target datastore: %w", err) + } + // make sure the migration flag is not set in the source datastore + if err := source.Delete(ctx, migrationKey); err != nil && !errors.Is(err, datastore.ErrNotFound) { + return xerrors.Errorf("deleting migration flag in source datastore: %w", err) + } + + log.Infow("starting migration of f3 datastore", "tag", migrationVal) + qr, err := source.Query(ctx, query.Query{}) + if err != nil { + return xerrors.Errorf("starting source wildcard query: %w", err) + } + + // batch size of 2000, at the time of writing, F3 datastore has 150,000 keys taking ~170MiB + // meaning that a batch of 2000 keys would be ~2MiB of memory + batch := autobatch.NewAutoBatching(target, 2000) + var numMigrated int + for ctx.Err() == nil { + res, ok := qr.NextSync() + if !ok { + break + } + if err := batch.Put(ctx, datastore.NewKey(res.Key), res.Value); err != nil { + _ = qr.Close() + return xerrors.Errorf("putting key %s in target datastore: %w", res.Key, err) + } + numMigrated++ + } + if ctx.Err() != nil { + return ctx.Err() + } + if err := batch.Flush(ctx); err != nil { + return xerrors.Errorf("flushing batch: %w", err) + } + if err := qr.Close(); err != nil { + return xerrors.Errorf("closing query: %w", err) + } + + // set migration flag in the source datastore to signify that the migration is complete + if err := source.Put(ctx, migrationKey, []byte(migrationVal)); err != nil { + return xerrors.Errorf("putting migration flag in source datastore: %w", err) + } + took := time.Since(startDate) + log.Infow("completed migration of f3 datastore", "tag", migrationVal, "tookSeconds", took.Seconds(), "numMigrated", numMigrated) + + return nil +} + func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) { if params.Config.StaticManifest == nil { return nil, fmt.Errorf("configuration invalid, nil StaticManifest in the Config") } - ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3")) + metaDs := namespace.Wrap(params.MetaDatastore, datastore.NewKey("/f3")) + err := migrateDatastore(mctx, metaDs, params.F3Datastore) + if err != nil { + return nil, xerrors.Errorf("migrating datastore: %w", err) + } ec := newEcWrapper(params.ChainStore, params.Syncer, params.StateManager) verif := blssig.VerifierWithKeyOnG1() f3FsPath := filepath.Join(params.LockedRepo.Path(), "f3") - module, err := f3.New(mctx, *params.Config.StaticManifest, ds, + module, err := f3.New(mctx, *params.Config.StaticManifest, params.F3Datastore, params.Host, params.PubSub, verif, ec, f3FsPath) if err != nil { return nil, xerrors.Errorf("creating F3: %w", err) diff --git a/node/builder_chain.go b/node/builder_chain.go index 75c4dcd1198..765fb5d3aac 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -187,6 +187,7 @@ var ChainNode = Options( ApplyIf(func(s *Settings) bool { return build.IsF3Enabled() && !isLiteNode(s) }, + Override(new(dtypes.F3DS), modules.F3Datastore), Override(new(*lf3.Config), lf3.NewConfig), Override(new(lf3.F3Backend), lf3.New), Override(new(full.F3ModuleAPI), From(new(full.F3API))), diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 102f6b67c0a..e3ad50db31d 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -12,6 +12,9 @@ import ( // main repo datastore. type MetadataDS datastore.Batching +// F3DS stores F3 data. By default it's namespaced under /f3 in main repo datastore. +type F3DS datastore.Batching + type ( // UniversalBlockstore is the universal blockstore backend. UniversalBlockstore blockstore.Blockstore diff --git a/node/modules/storage.go b/node/modules/storage.go index cb30eb8c29d..4177b01343a 100644 --- a/node/modules/storage.go +++ b/node/modules/storage.go @@ -57,3 +57,19 @@ func Datastore(disableLog bool) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r return bds, nil } } + +func F3Datastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.F3DS, error) { + ctx := helpers.LifecycleCtx(mctx, lc) + mds, err := r.Datastore(ctx, "/f3") + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return mds.Close() + }, + }) + + return mds, nil +} diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index 87dd2b05241..0aac15e96f5 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -16,6 +16,7 @@ type dsCtor func(path string, readonly bool) (datastore.Batching, error) var fsDatastores = map[string]dsCtor{ "metadata": levelDs, + "f3": levelDs, } // Helper badgerDs() and its imports are unused