Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- feat: fall back to EC finalized tipset if F3 is too far behind in eth APIs ([filecoin-project/lotus#13070](https://github.com/filecoin-project/lotus/pull/13070))
- feat: expose `/v2` APIs through Lotus Gateway ([filecoin-project/lotus#13075](https://github.com/filecoin-project/lotus/pull/13075))
- chore: upgrade to go-f3 `v0.8.4` ([filecoin-project/lotus#13084](https://github.com/filecoin-project/lotus/pull/13084))
- fix(f3): limit the concurrency of F3 power table calculation ([filecoin-project/lotus#13085](https://github.com/filecoin-project/lotus/pull/13085))

### Experimental v2 APIs with F3 awareness

Expand Down
66 changes: 53 additions & 13 deletions chain/lf3/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package lf3

import (
"context"
"runtime"
"sort"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand All @@ -27,9 +29,25 @@ var (
)

type ecWrapper struct {
ChainStore *store.ChainStore
Syncer *chain.Syncer
StateManager *stmgr.StateManager
chainStore *store.ChainStore
syncer *chain.Syncer
stateManager *stmgr.StateManager
cache *lru.TwoQueueCache[types.TipSetKey, gpbft.PowerEntries]
powerTableComputeSemaphore chan struct{}
}

func newEcWrapper(chainStore *store.ChainStore, syncer *chain.Syncer, stateManager *stmgr.StateManager) *ecWrapper {
cache, err := lru.New2Q[types.TipSetKey, gpbft.PowerEntries](128)
if err != nil {
panic(err)
}
return &ecWrapper{
chainStore: chainStore,
syncer: syncer,
stateManager: stateManager,
cache: cache,
powerTableComputeSemaphore: make(chan struct{}, min(4, runtime.NumCPU()/2)),
}
}

type f3TipSet struct {
Expand Down Expand Up @@ -74,7 +92,7 @@ func (ts *f3TipSet) Timestamp() time.Time {
// GetTipsetByEpoch should return a tipset before the one requested if the requested
// tipset does not exist due to null epochs
func (ec *ecWrapper) GetTipsetByEpoch(ctx context.Context, epoch int64) (ec.TipSet, error) {
ts, err := ec.ChainStore.GetTipsetByHeight(ctx, abi.ChainEpoch(epoch), nil, true)
ts, err := ec.chainStore.GetTipsetByHeight(ctx, abi.ChainEpoch(epoch), nil, true)
if err != nil {
return nil, xerrors.Errorf("getting tipset by height: %w", err)
}
Expand All @@ -91,7 +109,7 @@ func (ec *ecWrapper) GetTipset(ctx context.Context, tsk gpbft.TipSetKey) (ec.Tip
}

func (ec *ecWrapper) GetHead(context.Context) (ec.TipSet, error) {
head := ec.ChainStore.GetHeaviestTipSet()
head := ec.chainStore.GetHeaviestTipSet()
if head == nil {
return nil, xerrors.New("no heaviest tipset")
}
Expand All @@ -103,7 +121,7 @@ func (ec *ecWrapper) GetParent(ctx context.Context, tsF3 ec.TipSet) (ec.TipSet,
if err != nil {
return nil, err
}
parentTs, err := ec.ChainStore.GetTipSetFromKey(ctx, ts.Parents())
parentTs, err := ec.chainStore.GetTipSetFromKey(ctx, ts.Parents())
if err != nil {
return nil, xerrors.Errorf("getting parent tipset: %w", err)
}
Expand All @@ -119,12 +137,32 @@ func (ec *ecWrapper) GetPowerTable(ctx context.Context, tskF3 gpbft.TipSetKey) (
}

func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error) {
ts, err := ec.ChainStore.GetTipSetFromKey(ctx, tsk)
{
// check the cache
pe, ok := ec.cache.Get(tsk)
if ok {
return pe, nil
}
// take the semaphore
select {
case ec.powerTableComputeSemaphore <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
}
defer func() { <-ec.powerTableComputeSemaphore }()

// check the cache again
pe, ok = ec.cache.Get(tsk)
if ok {
return pe, nil
}
}
ts, err := ec.chainStore.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("getting tipset by key for get parent: %w", err)
}

state, err := ec.StateManager.ParentState(ts)
state, err := ec.stateManager.ParentState(ts)
if err != nil {
return nil, xerrors.Errorf("loading the state tree: %w", err)
}
Expand All @@ -133,7 +171,7 @@ func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSet
return nil, xerrors.Errorf("getting the power actor: %w", err)
}

powerState, err := power.Load(ec.ChainStore.ActorStore(ctx), powerAct)
powerState, err := power.Load(ec.chainStore.ActorStore(ctx), powerAct)
if err != nil {
return nil, xerrors.Errorf("loading power actor state: %w", err)
}
Expand All @@ -158,7 +196,7 @@ func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSet
if err != nil {
return xerrors.Errorf("(get sset) failed to load miner actor: %w", err)
}
mstate, err := miner.Load(ec.ChainStore.ActorStore(ctx), act)
mstate, err := miner.Load(ec.chainStore.ActorStore(ctx), act)
if err != nil {
return xerrors.Errorf("(get sset) failed to load miner actor state: %w", err)
}
Expand All @@ -179,7 +217,7 @@ func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSet
return nil
}

waddr, err := vm.ResolveToDeterministicAddr(state, ec.ChainStore.ActorStore(ctx), info.Worker)
waddr, err := vm.ResolveToDeterministicAddr(state, ec.chainStore.ActorStore(ctx), info.Worker)
if err != nil {
return xerrors.Errorf("resolve miner worker address: %w", err)
}
Expand All @@ -196,6 +234,8 @@ func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSet
}

sort.Sort(powerEntries)
ec.cache.Add(tsk, powerEntries)

return powerEntries, nil
}

Expand All @@ -204,7 +244,7 @@ func (ec *ecWrapper) Finalize(ctx context.Context, key gpbft.TipSetKey) error {
if err != nil {
return err
}
if err = ec.Syncer.SyncCheckpoint(ctx, tsk); err != nil {
if err = ec.syncer.SyncCheckpoint(ctx, tsk); err != nil {
return xerrors.Errorf("checkpointing finalized tipset: %w", err)
}
return nil
Expand All @@ -225,7 +265,7 @@ func (ec *ecWrapper) getTipSetFromF3TSK(ctx context.Context, key gpbft.TipSetKey
if err != nil {
return nil, err
}
ts, err := ec.ChainStore.GetTipSetFromKey(ctx, tsk)
ts, err := ec.chainStore.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("getting tipset from key: %w", err)
}
Expand Down
6 changes: 1 addition & 5 deletions chain/lf3/f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,7 @@ var log = logging.Logger("f3")

func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) {
ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3"))
ec := &ecWrapper{
ChainStore: params.ChainStore,
StateManager: params.StateManager,
Syncer: params.Syncer,
}
ec := newEcWrapper(params.ChainStore, params.Syncer, params.StateManager)
verif := blssig.VerifierWithKeyOnG1()

f3FsPath := filepath.Join(params.LockedRepo.Path(), "f3")
Expand Down
Loading