diff --git a/CHANGELOG.md b/CHANGELOG.md index 006b634110c..49e5cb779f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - feat: fall back to EC finalized tipset if F3 is too far behind in eth APIs ([filecoin-project/lotus#13070](https://github.com/filecoin-project/lotus/pull/13070)) - feat: expose `/v2` APIs through Lotus Gateway ([filecoin-project/lotus#13075](https://github.com/filecoin-project/lotus/pull/13075)) - chore: upgrade to go-f3 `v0.8.4` ([filecoin-project/lotus#13084](https://github.com/filecoin-project/lotus/pull/13084)) +- fix(f3): limit the concurrency of F3 power table calculation ([filecoin-project/lotus#13085](https://github.com/filecoin-project/lotus/pull/13085)) ### Experimental v2 APIs with F3 awareness diff --git a/chain/lf3/ec.go b/chain/lf3/ec.go index 2ad23658b5b..85c90e4d10c 100644 --- a/chain/lf3/ec.go +++ b/chain/lf3/ec.go @@ -2,9 +2,11 @@ package lf3 import ( "context" + "runtime" "sort" "time" + lru "github.com/hashicorp/golang-lru/v2" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -27,9 +29,25 @@ var ( ) type ecWrapper struct { - ChainStore *store.ChainStore - Syncer *chain.Syncer - StateManager *stmgr.StateManager + chainStore *store.ChainStore + syncer *chain.Syncer + stateManager *stmgr.StateManager + cache *lru.TwoQueueCache[types.TipSetKey, gpbft.PowerEntries] + powerTableComputeSemaphore chan struct{} +} + +func newEcWrapper(chainStore *store.ChainStore, syncer *chain.Syncer, stateManager *stmgr.StateManager) *ecWrapper { + cache, err := lru.New2Q[types.TipSetKey, gpbft.PowerEntries](128) + if err != nil { + panic(err) + } + return &ecWrapper{ + chainStore: chainStore, + syncer: syncer, + stateManager: stateManager, + cache: cache, + powerTableComputeSemaphore: make(chan struct{}, min(4, runtime.NumCPU()/2)), + } } type f3TipSet struct { @@ -74,7 +92,7 @@ func (ts *f3TipSet) Timestamp() time.Time { // GetTipsetByEpoch should return a tipset before the one requested if the requested // tipset does not exist due to null epochs func (ec *ecWrapper) GetTipsetByEpoch(ctx context.Context, epoch int64) (ec.TipSet, error) { - ts, err := ec.ChainStore.GetTipsetByHeight(ctx, abi.ChainEpoch(epoch), nil, true) + ts, err := ec.chainStore.GetTipsetByHeight(ctx, abi.ChainEpoch(epoch), nil, true) if err != nil { return nil, xerrors.Errorf("getting tipset by height: %w", err) } @@ -91,7 +109,7 @@ func (ec *ecWrapper) GetTipset(ctx context.Context, tsk gpbft.TipSetKey) (ec.Tip } func (ec *ecWrapper) GetHead(context.Context) (ec.TipSet, error) { - head := ec.ChainStore.GetHeaviestTipSet() + head := ec.chainStore.GetHeaviestTipSet() if head == nil { return nil, xerrors.New("no heaviest tipset") } @@ -103,7 +121,7 @@ func (ec *ecWrapper) GetParent(ctx context.Context, tsF3 ec.TipSet) (ec.TipSet, if err != nil { return nil, err } - parentTs, err := ec.ChainStore.GetTipSetFromKey(ctx, ts.Parents()) + parentTs, err := ec.chainStore.GetTipSetFromKey(ctx, ts.Parents()) if err != nil { return nil, xerrors.Errorf("getting parent tipset: %w", err) } @@ -119,12 +137,32 @@ func (ec *ecWrapper) GetPowerTable(ctx context.Context, tskF3 gpbft.TipSetKey) ( } func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error) { - ts, err := ec.ChainStore.GetTipSetFromKey(ctx, tsk) + { + // check the cache + pe, ok := ec.cache.Get(tsk) + if ok { + return pe, nil + } + // take the semaphore + select { + case ec.powerTableComputeSemaphore <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + defer func() { <-ec.powerTableComputeSemaphore }() + + // check the cache again + pe, ok = ec.cache.Get(tsk) + if ok { + return pe, nil + } + } + ts, err := ec.chainStore.GetTipSetFromKey(ctx, tsk) if err != nil { return nil, xerrors.Errorf("getting tipset by key for get parent: %w", err) } - state, err := ec.StateManager.ParentState(ts) + state, err := ec.stateManager.ParentState(ts) if err != nil { return nil, xerrors.Errorf("loading the state tree: %w", err) } @@ -133,7 +171,7 @@ func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSet return nil, xerrors.Errorf("getting the power actor: %w", err) } - powerState, err := power.Load(ec.ChainStore.ActorStore(ctx), powerAct) + powerState, err := power.Load(ec.chainStore.ActorStore(ctx), powerAct) if err != nil { return nil, xerrors.Errorf("loading power actor state: %w", err) } @@ -158,7 +196,7 @@ func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSet if err != nil { return xerrors.Errorf("(get sset) failed to load miner actor: %w", err) } - mstate, err := miner.Load(ec.ChainStore.ActorStore(ctx), act) + mstate, err := miner.Load(ec.chainStore.ActorStore(ctx), act) if err != nil { return xerrors.Errorf("(get sset) failed to load miner actor state: %w", err) } @@ -179,7 +217,7 @@ func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSet return nil } - waddr, err := vm.ResolveToDeterministicAddr(state, ec.ChainStore.ActorStore(ctx), info.Worker) + waddr, err := vm.ResolveToDeterministicAddr(state, ec.chainStore.ActorStore(ctx), info.Worker) if err != nil { return xerrors.Errorf("resolve miner worker address: %w", err) } @@ -196,6 +234,8 @@ func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSet } sort.Sort(powerEntries) + ec.cache.Add(tsk, powerEntries) + return powerEntries, nil } @@ -204,7 +244,7 @@ func (ec *ecWrapper) Finalize(ctx context.Context, key gpbft.TipSetKey) error { if err != nil { return err } - if err = ec.Syncer.SyncCheckpoint(ctx, tsk); err != nil { + if err = ec.syncer.SyncCheckpoint(ctx, tsk); err != nil { return xerrors.Errorf("checkpointing finalized tipset: %w", err) } return nil @@ -225,7 +265,7 @@ func (ec *ecWrapper) getTipSetFromF3TSK(ctx context.Context, key gpbft.TipSetKey if err != nil { return nil, err } - ts, err := ec.ChainStore.GetTipSetFromKey(ctx, tsk) + ts, err := ec.chainStore.GetTipSetFromKey(ctx, tsk) if err != nil { return nil, xerrors.Errorf("getting tipset from key: %w", err) } diff --git a/chain/lf3/f3.go b/chain/lf3/f3.go index 048f3028b73..776b26f0405 100644 --- a/chain/lf3/f3.go +++ b/chain/lf3/f3.go @@ -73,11 +73,7 @@ var log = logging.Logger("f3") func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) { ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3")) - ec := &ecWrapper{ - ChainStore: params.ChainStore, - StateManager: params.StateManager, - Syncer: params.Syncer, - } + ec := newEcWrapper(params.ChainStore, params.Syncer, params.StateManager) verif := blssig.VerifierWithKeyOnG1() f3FsPath := filepath.Join(params.LockedRepo.Path(), "f3")