Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 52 additions & 19 deletions chain/lf3/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"runtime"
"sort"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
Expand All @@ -29,11 +30,14 @@ var (
)

type ecWrapper struct {
chainStore *store.ChainStore
syncer *chain.Syncer
stateManager *stmgr.StateManager
cache *lru.TwoQueueCache[types.TipSetKey, gpbft.PowerEntries]
powerTableComputeSemaphore chan struct{}
chainStore *store.ChainStore
syncer *chain.Syncer
stateManager *stmgr.StateManager
cache *lru.TwoQueueCache[types.TipSetKey, gpbft.PowerEntries]

powerTableComputeLock sync.Mutex
powerTableComputeJobs map[types.TipSetKey]chan struct{}
powerTableComputeSema chan struct{}
}

func newEcWrapper(chainStore *store.ChainStore, syncer *chain.Syncer, stateManager *stmgr.StateManager) *ecWrapper {
Expand All @@ -42,11 +46,13 @@ func newEcWrapper(chainStore *store.ChainStore, syncer *chain.Syncer, stateManag
panic(err)
}
return &ecWrapper{
chainStore: chainStore,
syncer: syncer,
stateManager: stateManager,
cache: cache,
powerTableComputeSemaphore: make(chan struct{}, min(4, runtime.NumCPU()/2)),
chainStore: chainStore,
syncer: syncer,
stateManager: stateManager,
cache: cache,

powerTableComputeJobs: make(map[types.TipSetKey]chan struct{}),
powerTableComputeSema: make(chan struct{}, min(4, runtime.NumCPU()/2)),
}
}

Expand Down Expand Up @@ -137,26 +143,53 @@ func (ec *ecWrapper) GetPowerTable(ctx context.Context, tskF3 gpbft.TipSetKey) (
}

func (ec *ecWrapper) getPowerTableLotusTSK(ctx context.Context, tsk types.TipSetKey) (gpbft.PowerEntries, error) {
{
// Either wait for someone else to compute the power table, or claim the job.
for {
// check the cache
pe, ok := ec.cache.Get(tsk)
if ok {
return pe, nil
}
// take the semaphore
ec.powerTableComputeLock.Lock()
waitCh, ok := ec.powerTableComputeJobs[tsk]
if !ok {
break
}
ec.powerTableComputeLock.Unlock()
select {
case ec.powerTableComputeSemaphore <- struct{}{}:
case <-waitCh:
continue
case <-ctx.Done():
return nil, ctx.Err()
}
defer func() { <-ec.powerTableComputeSemaphore }()
}

// check the cache again
pe, ok = ec.cache.Get(tsk)
if ok {
return pe, nil
}
// Ok, we have the lock and nobody else has claimed the job. Claim it.
myWaitCh := make(chan struct{})
ec.powerTableComputeJobs[tsk] = myWaitCh
ec.powerTableComputeLock.Unlock()

// Make sure to "unlock" the job when we're done, even if we don't complete it. Someone else
// will complete it in that case.
defer func() {
ec.powerTableComputeLock.Lock()
delete(ec.powerTableComputeJobs, tsk)
ec.powerTableComputeLock.Unlock()

close(myWaitCh)
}()

// Then wait in line. We only allow 4 jobs at once.
select {
case ec.powerTableComputeSema <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
}
defer func() {
<-ec.powerTableComputeSema
}()

// Finally, do the actual compute.
ts, err := ec.chainStore.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("getting tipset by key for get parent: %w", err)
Expand Down
Loading