Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- `lotus state active-sectors` now outputs CSV format and supports an optional `--show-partitions` to list active sector deadlines and partitions. ([filecoin-project/lotus#13152](https://github.com/filecoin-project/lotus/pull/13152))
- chore: generate v0 JSON RPC specification ([filecoin-project/lotus#13155](https://github.com/filecoin-project/lotus/pull/13155))
- deps(f3): bump go-f3 version to 0.8.6, move power table cache to go-f3 from lotus ([filecoin-project/lotus#13144](https://github.com/filecoin-project/lotus/pull/13144))
- feat(f3): move go-f3 datastore to separate leveldb instance ([filecoin-project/lotus#13174](https://github.com/filecoin-project/lotus/pull/13174))

# Node v1.33.0 / 2025-05-08
The Lotus v1.33.0 release introduces experimental v2 APIs with F3 awareness, featuring a new TipSet selection mechanism that significantly enhances how applications interact with the Filecoin blockchain. This release candidate also adds F3-aware Ethereum APIs via the /v2 endpoint. All of the /v2 APIs implement intelligent fallback mechanisms between F3 and Expected Consensus and are exposed through the Lotus Gateway.
Expand Down
116 changes: 105 additions & 11 deletions chain/lf3/f3.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package lf3

import (
"bytes"
"context"
"errors"
"fmt"
"path/filepath"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/autobatch"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -56,32 +60,122 @@ type F3 struct {
// F3Params bundles the dependencies injected by fx to construct the F3
// module. MetaDatastore is the main repo metadata datastore (the migration
// source, where F3 state used to live under the /f3 namespace) and
// F3Datastore is the dedicated F3 datastore (the migration target).
type F3Params struct {
	fx.In

	PubSub        *pubsub.PubSub
	Host          host.Host
	ChainStore    *store.ChainStore
	Syncer        *chain.Syncer
	StateManager  *stmgr.StateManager
	MetaDatastore dtypes.MetadataDS // migration source; formerly held F3 state under /f3
	F3Datastore   dtypes.F3DS       // dedicated datastore for F3 state; migration target
	Wallet        api.Wallet
	Config        *Config
	LockedRepo    repo.LockedRepo

	Net api.Net
}

// log is the logger for the f3 subsystem.
var log = logging.Logger("f3")

// migrationKey is the key under which a migration tag is written in both the
// source and target datastores; matching values in both stores mark the
// one-off datastore migration as complete (see checkMigrationComplete).
var migrationKey = datastore.NewKey("/f3-migration/1")

// checkMigrationComplete reports whether the one-off F3 datastore migration
// has already run. It reads the migration tag from both stores; a missing tag
// on either side means the migration has not completed, and equal tags on
// both sides mean it has.
func checkMigrationComplete(ctx context.Context, source datastore.Batching, target datastore.Batching) (bool, error) {
	srcTag, err := source.Get(ctx, migrationKey)
	switch {
	case errors.Is(err, datastore.ErrNotFound):
		log.Debug("migration not complete, no migration flag in source datastore")
		return false, nil
	case err != nil:
		return false, err
	}

	tgtTag, err := target.Get(ctx, migrationKey)
	switch {
	case errors.Is(err, datastore.ErrNotFound):
		log.Debug("migration not complete, no migration flag in target datastore")
		return false, nil
	case err != nil:
		return false, err
	}

	log.Debugw("migration flags", "source", string(srcTag), "target", string(tgtTag))

	// Equal tags in both stores signify a completed migration.
	return bytes.Equal(srcTag, tgtTag), nil
}

// migrateDatastore can be removed once at least one network upgade passes
func migrateDatastore(ctx context.Context, source datastore.Batching, target datastore.Batching) error {
if complete, err := checkMigrationComplete(ctx, source, target); err != nil {
return xerrors.Errorf("checking if migration complete: %w", err)
} else if complete {
return nil
}

startDate := time.Now()
migrationVal := startDate.Format(time.RFC3339Nano)

if err := target.Put(ctx, migrationKey, []byte(migrationVal)); err != nil {
return xerrors.Errorf("putting migration flag in target datastore: %w", err)
}
// make sure the migration flag is not set in the source datastore
if err := source.Delete(ctx, migrationKey); err != nil && !errors.Is(err, datastore.ErrNotFound) {
return xerrors.Errorf("deleting migration flag in source datastore: %w", err)
}

log.Infow("starting migration of f3 datastore", "tag", migrationVal)
qr, err := source.Query(ctx, query.Query{})
if err != nil {
return xerrors.Errorf("starting source wildcard query: %w", err)
}

// batch size of 2000, at the time of writing, F3 datastore has 150,000 keys taking ~170MiB
// meaning that a batch of 2000 keys would be ~2MiB of memory
batch := autobatch.NewAutoBatching(target, 2000)
Comment thread
masih marked this conversation as resolved.
var numMigrated int
for ctx.Err() == nil {
res, ok := qr.NextSync()
if !ok {
break
}
if err := batch.Put(ctx, datastore.NewKey(res.Key), res.Value); err != nil {
_ = qr.Close()
Comment thread
Kubuxu marked this conversation as resolved.
return xerrors.Errorf("putting key %s in target datastore: %w", res.Key, err)
}
numMigrated++
}
if ctx.Err() != nil {
return ctx.Err()
}
if err := batch.Flush(ctx); err != nil {
return xerrors.Errorf("flushing batch: %w", err)
}
if err := qr.Close(); err != nil {
return xerrors.Errorf("closing query: %w", err)
}

// set migration flag in the source datastore to signify that the migration is complete
if err := source.Put(ctx, migrationKey, []byte(migrationVal)); err != nil {
return xerrors.Errorf("putting migration flag in source datastore: %w", err)
}
took := time.Since(startDate)
log.Infow("completed migration of f3 datastore", "tag", migrationVal, "tookSeconds", took.Seconds(), "numMigrated", numMigrated)

return nil
}

func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) {

if params.Config.StaticManifest == nil {
return nil, fmt.Errorf("configuration invalid, nil StaticManifest in the Config")
}
ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3"))
metaDs := namespace.Wrap(params.MetaDatastore, datastore.NewKey("/f3"))
err := migrateDatastore(mctx, metaDs, params.F3Datastore)
if err != nil {
return nil, xerrors.Errorf("migrating datastore: %w", err)
}
ec := newEcWrapper(params.ChainStore, params.Syncer, params.StateManager)
verif := blssig.VerifierWithKeyOnG1()

f3FsPath := filepath.Join(params.LockedRepo.Path(), "f3")
module, err := f3.New(mctx, *params.Config.StaticManifest, ds,
module, err := f3.New(mctx, *params.Config.StaticManifest, params.F3Datastore,
params.Host, params.PubSub, verif, ec, f3FsPath)
if err != nil {
return nil, xerrors.Errorf("creating F3: %w", err)
Expand Down
1 change: 1 addition & 0 deletions node/builder_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ var ChainNode = Options(
ApplyIf(func(s *Settings) bool {
return build.IsF3Enabled() && !isLiteNode(s)
},
Override(new(dtypes.F3DS), modules.F3Datastore),
Override(new(*lf3.Config), lf3.NewConfig),
Override(new(lf3.F3Backend), lf3.New),
Override(new(full.F3ModuleAPI), From(new(full.F3API))),
Expand Down
3 changes: 3 additions & 0 deletions node/modules/dtypes/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
// main repo datastore.
type MetadataDS datastore.Batching

// F3DS stores F3 data. It is a dedicated datastore opened from the repo's
// "f3" path, separate from the main repo metadata datastore (which formerly
// held this data under the /f3 namespace).
type F3DS datastore.Batching

type (
// UniversalBlockstore is the universal blockstore backend.
UniversalBlockstore blockstore.Blockstore
Expand Down
16 changes: 16 additions & 0 deletions node/modules/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,19 @@ func Datastore(disableLog bool) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r
return bds, nil
}
}

// F3Datastore opens the dedicated "/f3" datastore from the locked repo and
// registers an fx lifecycle hook that closes it on shutdown.
func F3Datastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.F3DS, error) {
	ctx := helpers.LifecycleCtx(mctx, lc)

	f3ds, err := r.Datastore(ctx, "/f3")
	if err != nil {
		return nil, err
	}

	// Close the datastore when the application stops.
	lc.Append(fx.Hook{
		OnStop: func(context.Context) error {
			return f3ds.Close()
		},
	})

	return f3ds, nil
}
1 change: 1 addition & 0 deletions node/repo/fsrepo_ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type dsCtor func(path string, readonly bool) (datastore.Batching, error)

// fsDatastores maps the named datastores created inside the filesystem repo
// to their constructors. "f3" is a dedicated leveldb instance for F3 state,
// kept separate from the main "metadata" store.
var fsDatastores = map[string]dsCtor{
	"metadata": levelDs,
	"f3":       levelDs,
}

// Helper badgerDs() and its imports are unused
Expand Down