From b63caefa7d1e2052f2460e4dfdc3566a33e044c1 Mon Sep 17 00:00:00 2001 From: tanlang Date: Wed, 12 Apr 2023 14:47:28 +0000 Subject: [PATCH] feat: add actor upgrade function --- extern/filecoin-ffi | 2 +- pkg/fork/fork.go | 111 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 110 insertions(+), 3 deletions(-) diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index 28e3cd44d9..cec06f7f9d 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit 28e3cd44d91681c074aba362d1e5c954db110ad9 +Subproject commit cec06f7f9d4c8ffd38caf4f7ac97381b453575ef diff --git a/pkg/fork/fork.go b/pkg/fork/fork.go index 7071f5b2aa..fa4b635d7e 100644 --- a/pkg/fork/fork.go +++ b/pkg/fork/fork.go @@ -34,6 +34,7 @@ import ( "go.opencensus.io/trace" nv18 "github.com/filecoin-project/go-state-types/builtin/v10/migration" + nv19 "github.com/filecoin-project/go-state-types/builtin/v11/migration" nv17 "github.com/filecoin-project/go-state-types/builtin/v9/migration" "github.com/filecoin-project/go-state-types/migration" "github.com/filecoin-project/specs-actors/actors/migration/nv3" @@ -2407,14 +2408,14 @@ func (c *ChainFork) upgradeActorsV10Common( if stateRoot.Version != vmstate.StateTreeVersion4 { return cid.Undef, fmt.Errorf( - "expected state root version 4 for actors v9 upgrade, got %d", + "expected state root version 4 for actors v10 upgrade, got %d", stateRoot.Version, ) } manifest, ok := actors.GetManifest(actorstypes.Version10) if !ok { - return cid.Undef, fmt.Errorf("no manifest CID for v9 upgrade") + return cid.Undef, fmt.Errorf("no manifest CID for v10 upgrade") } // Perform the migration @@ -2448,6 +2449,112 @@ func (c *ChainFork) upgradeActorsV10Common( return newRoot, nil } +func (c *ChainFork) PreUpgradeActorsV11(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { + // Use half the CPUs for pre-migration, but leave at least 3. + workerCount := MigrationMaxWorkerCount + if workerCount <= 4 { + workerCount = 1 + } else { + workerCount /= 2 + } + + nv := c.GetNetworkVersion(ctx, epoch) + lbts, lbRoot, err := c.cr.GetLookbackTipSetForRound(ctx, ts, epoch, nv) + if err != nil { + return fmt.Errorf("error getting lookback ts for premigration: %w", err) + } + + config := migration.Config{ + MaxWorkers: uint(workerCount), + ProgressLogPeriod: time.Minute * 5, + } + + _, err = c.upgradeActorsV11Common(ctx, cache, lbRoot, epoch, lbts, config) + return err +} + +func (c *ChainFork) UpgradeActorsV11(ctx context.Context, cache MigrationCache, + root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { + // Use all the CPUs except 2. + workerCount := MigrationMaxWorkerCount - 3 + if workerCount <= 0 { + workerCount = 1 + } + config := migration.Config{ + MaxWorkers: uint(workerCount), + JobQueueSize: 1000, + ResultQueueSize: 100, + ProgressLogPeriod: 10 * time.Second, + } + newRoot, err := c.upgradeActorsV11Common(ctx, cache, root, epoch, ts, config) + if err != nil { + return cid.Undef, fmt.Errorf("migrating actors v11 state: %w", err) + } + return newRoot, nil +} + +func (c *ChainFork) upgradeActorsV11Common( + ctx context.Context, cache MigrationCache, + root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet, + config migration.Config, +) (cid.Cid, error) { + buf := blockstoreutil.NewTieredBstore(c.bs, blockstoreutil.NewTemporarySync()) + store := adt.WrapStore(ctx, cbor.NewCborStore(buf)) + + // ensure that the manifest is loaded in the blockstore + if err := actors.LoadBundles(ctx, c.bs, actorstypes.Version11); err != nil { + return cid.Undef, fmt.Errorf("failed to load manifest bundle: %w", err) + } + + // Load the state root. + var stateRoot vmstate.StateRoot + if err := store.Get(ctx, root, &stateRoot); err != nil { + return cid.Undef, fmt.Errorf("failed to decode state root: %w", err) + } + + if stateRoot.Version != vmstate.StateTreeVersion5 { + return cid.Undef, fmt.Errorf( + "expected state root version 5 for actors v11 upgrade, got %d", + stateRoot.Version, + ) + } + + manifest, ok := actors.GetManifest(actorstypes.Version11) + if !ok { + return cid.Undef, fmt.Errorf("no manifest CID for v11 upgrade") + } + + // Perform the migration + newHamtRoot, err := nv19.MigrateStateTree(ctx, store, manifest, stateRoot.Actors, epoch, config, + migrationLogger{}, cache) + if err != nil { + return cid.Undef, fmt.Errorf("upgrading to actors v11: %w", err) + } + + // Persist the result. + newRoot, err := store.Put(ctx, &vmstate.StateRoot{ + Version: vmstate.StateTreeVersion5, + Actors: newHamtRoot, + Info: stateRoot.Info, + }) + if err != nil { + return cid.Undef, fmt.Errorf("failed to persist new state root: %w", err) + } + + // Persist the new tree. + + { + from := buf + to := buf.Read() + + if err := Copy(ctx, from, to, newRoot); err != nil { + return cid.Undef, fmt.Errorf("copying migrated tree: %w", err) + } + } + + return newRoot, nil +} + func (c *ChainFork) GetForkUpgrade() *config.ForkUpgradeConfig { return c.forkUpgrade }