Skip to content

Commit

Permalink
perf: mempool: lower priority optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed May 23, 2023
1 parent e6bfc13 commit 336826b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
19 changes: 16 additions & 3 deletions pkg/messagepool/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*types.Messag
func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]types.MessageCheckStatus, error) {
var msgs []*types.Message
mp.lk.RLock()
mset, ok := mp.pending[from]
mset, ok, err := mp.getPendingMset(ctx, from)
if err != nil {
mp.lk.RUnlock()
return nil, fmt.Errorf("errored while getting pending mset: %w", err)
}
if ok {
msgs = make([]*types.Message, 0, len(mset.msgs))
for _, sm := range mset.msgs {
msgs = append(msgs, &sm.Message)
}
Expand Down Expand Up @@ -61,7 +66,11 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
if !ok {
mmap = make(map[uint64]*types.Message)
msgMap[m.From] = mmap
mset, ok := mp.pending[m.From]
mset, ok, err := mp.getPendingMset(ctx, m.From)
if err != nil {
mp.lk.RUnlock()
return nil, fmt.Errorf("errored while getting pending mset: %w", err)
}
if ok {
count += len(mset.msgs)
for _, sm := range mset.msgs {
Expand Down Expand Up @@ -141,7 +150,11 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
st, ok := state[m.From]
if !ok {
mp.lk.RLock()
mset, ok := mp.pending[m.From]
mset, ok, err := mp.getPendingMset(ctx, m.From)
if err != nil {
mp.lk.RUnlock()
return nil, fmt.Errorf("errored while getting pending mset: %w", err)
}
if ok && !interned {
st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
for _, m := range mset.msgs {
Expand Down
9 changes: 8 additions & 1 deletion pkg/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,14 @@ func New(ctx context.Context,
}

func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
//if addr is not an ID addr, then it is already resolved to a key
if addr.Protocol() != address.ID {
return addr, nil
}
return mp.resolveToKeyFromID(ctx, addr)
}

func (mp *MessagePool) resolveToKeyFromID(ctx context.Context, addr address.Address) (address.Address, error) {
// check the cache
a, ok := mp.keyCache.Get(addr)
if ok {
Expand All @@ -495,7 +503,6 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (

// place both entries in the cache (may both be key addresses, which is fine)
mp.keyCache.Add(addr, ka)
mp.keyCache.Add(ka, ka)

return ka, nil
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@ var RepublishBatchDelay = 100 * time.Millisecond
func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
mp.curTSLk.RLock()
ts := mp.curTS
mp.curTSLk.RUnlock()

baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
mp.curTSLk.RUnlock()
if err != nil {
return fmt.Errorf("computing basefee: %v", err)
}
baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)

pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.curTSLk.Lock()

mp.lk.Lock()
mp.republished = nil // clear this to avoid races triggering an early republish
mp.lk.Unlock()

mp.lk.RLock()
for actor := range mp.localAddrs {
mset, ok := mp.pending[actor]
if !ok {
Expand All @@ -49,8 +52,7 @@ func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
}
pending[actor] = pend
}
mp.lk.Unlock()
mp.curTSLk.Unlock()
mp.lk.RUnlock()

if len(pending) == 0 {
return nil
Expand Down Expand Up @@ -173,8 +175,8 @@ LOOP:
republished[m.Cid()] = struct{}{}
}

mp.lk.Lock()
// update the republished set so that we can trigger early republish from head changes
mp.lk.Lock()
mp.republished = republished
mp.lk.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions pkg/messagepool/selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type msgChain struct {
}

func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
mp.curTSLk.Lock()
defer mp.curTSLk.Unlock()
mp.curTSLk.RLock()
defer mp.curTSLk.RUnlock()

//TODO confirm if we can switch to RLock here for performance
mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()

// See if we need to prune before selection; excessive buildup can lead to slow selection,
// so prune if we have too many messages (ignoring the cooldown).
Expand Down

0 comments on commit 336826b

Please sign in to comment.