diff --git a/chain/lf3/ec.go b/chain/lf3/ec.go index 85c90e4d10c..38812886c1d 100644 --- a/chain/lf3/ec.go +++ b/chain/lf3/ec.go @@ -4,6 +4,7 @@ import ( "context" "runtime" "sort" + "sync" "time" lru "github.com/hashicorp/golang-lru/v2" @@ -29,11 +30,14 @@ var ( ) type ecWrapper struct { - chainStore *store.ChainStore - syncer *chain.Syncer - stateManager *stmgr.StateManager - cache *lru.TwoQueueCache[types.TipSetKey, gpbft.PowerEntries] - powerTableComputeSemaphore chan struct{} + chainStore *store.ChainStore + syncer *chain.Syncer + stateManager *stmgr.StateManager + cache *lru.TwoQueueCache[types.TipSetKey, gpbft.PowerEntries] + + powerTableComputeLock sync.Mutex + powerTableComputeJobs map[types.TipSetKey]chan struct{} + powerTableComputeSema chan struct{} } func newEcWrapper(chainStore *store.ChainStore, syncer *chain.Syncer, stateManager *stmgr.StateManager) *ecWrapper { @@ -42,11 +46,13 @@ func newEcWrapper(chainStore *store.ChainStore, syncer *chain.Syncer, stateManag panic(err) } return &ecWrapper{ - chainStore: chainStore, - syncer: syncer, - stateManager: stateManager, - cache: cache, - powerTableComputeSemaphore: make(chan struct{}, min(4, runtime.NumCPU()/2)), + chainStore: chainStore, + syncer: syncer, + stateManager: stateManager, + cache: cache, + + powerTableComputeJobs: make(map[types.TipSetKey]chan struct{}), + powerTableComputeSema: make(chan struct{}, min(4, runtime.NumCPU()/2)), } } @@ -137,26 +143,53 @@ func (ec *ecWrapper) GetPowerTable(ctx context.Context, tskF3 gpbft.TipSetKey) ( } func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error) { - { + // Either wait for someone else to compute the power table, or claim the job. + for { // check the cache pe, ok := ec.cache.Get(tsk) if ok { return pe, nil } - // take the semaphore + ec.powerTableComputeLock.Lock() + waitCh, ok := ec.powerTableComputeJobs[tsk] + if !ok { + break + } + ec.powerTableComputeLock.Unlock() select { - case ec.powerTableComputeSemaphore <- struct{}{}: + case <-waitCh: + continue case <-ctx.Done(): return nil, ctx.Err() } - defer func() { <-ec.powerTableComputeSemaphore }() + } - // check the cache again - pe, ok = ec.cache.Get(tsk) - if ok { - return pe, nil - } + // Ok, we have the lock and nobody else has claimed the job. Claim it. + myWaitCh := make(chan struct{}) + ec.powerTableComputeJobs[tsk] = myWaitCh + ec.powerTableComputeLock.Unlock() + + // Make sure to "unlock" the job when we're done, even if we don't complete it. Someone else + // will complete it in that case. + defer func() { + ec.powerTableComputeLock.Lock() + delete(ec.powerTableComputeJobs, tsk) + ec.powerTableComputeLock.Unlock() + + close(myWaitCh) + }() + + // Then wait in line. We only allow 4 jobs at once. + select { + case ec.powerTableComputeSema <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() } + defer func() { + <-ec.powerTableComputeSema + }() + + // Finally, do the actual compute. ts, err := ec.chainStore.GetTipSetFromKey(ctx, tsk) if err != nil { return nil, xerrors.Errorf("getting tipset by key for get parent: %w", err)