Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/transport #5984

Merged
merged 3 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions pkg/messagepool/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*types.Messag
func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]types.MessageCheckStatus, error) {
var msgs []*types.Message
mp.lk.RLock()
mset, ok := mp.pending[from]
mset, ok, err := mp.getPendingMset(ctx, from)
if err != nil {
mp.lk.RUnlock()
return nil, fmt.Errorf("errored while getting pending mset: %w", err)
}
if ok {
msgs = make([]*types.Message, 0, len(mset.msgs))
for _, sm := range mset.msgs {
msgs = append(msgs, &sm.Message)
}
Expand Down Expand Up @@ -61,7 +66,11 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
if !ok {
mmap = make(map[uint64]*types.Message)
msgMap[m.From] = mmap
mset, ok := mp.pending[m.From]
mset, ok, err := mp.getPendingMset(ctx, m.From)
if err != nil {
mp.lk.RUnlock()
return nil, fmt.Errorf("errored while getting pending mset: %w", err)
}
if ok {
count += len(mset.msgs)
for _, sm := range mset.msgs {
Expand Down Expand Up @@ -141,7 +150,11 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
st, ok := state[m.From]
if !ok {
mp.lk.RLock()
mset, ok := mp.pending[m.From]
mset, ok, err := mp.getPendingMset(ctx, m.From)
if err != nil {
mp.lk.RUnlock()
return nil, fmt.Errorf("errored while getting pending mset: %w", err)
}
if ok && !interned {
st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
for _, m := range mset.msgs {
Expand Down
8 changes: 8 additions & 0 deletions pkg/messagepool/gas.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ func (mp *MessagePool) GasEstimateMessageGas(ctx context.Context, estimateMessag
gasLimitOverestimation = estimateMessage.Spec.GasOverEstimation
}
estimateMessage.Msg.GasLimit = int64(float64(gasLimit) * gasLimitOverestimation)
// Gas overestimation can cause us to exceed the block gas limit, cap it.
if estimateMessage.Msg.GasLimit > constants.BlockGasLimit {
estimateMessage.Msg.GasLimit = constants.BlockGasLimit
}
}

if estimateMessage.Msg.GasPremium == types.EmptyInt || types.BigCmp(estimateMessage.Msg.GasPremium, types.NewInt(0)) == 0 {
Expand Down Expand Up @@ -441,6 +445,10 @@ func (mp *MessagePool) GasBatchEstimateMessageGas(ctx context.Context, estimateM
continue
}
estimateMsg.GasLimit = int64(float64(gasUsed) * estimateMessage.Spec.GasOverEstimation)
// Gas overestimation can cause us to exceed the block gas limit, cap it.
if estimateMsg.GasLimit > constants.BlockGasLimit {
estimateMsg.GasLimit = constants.BlockGasLimit
}
}

if estimateMsg.GasPremium == types.EmptyInt || types.BigCmp(estimateMsg.GasPremium, types.NewInt(0)) == 0 {
Expand Down
100 changes: 78 additions & 22 deletions pkg/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ type MessagePool struct {

sigValCache *lru.TwoQueueCache[string, struct{}]

stateNonceCache *lru.Cache[stateNonceCacheKey, uint64]

evtTypes [3]journal.EventType
journal journal.Journal

Expand All @@ -182,6 +184,11 @@ type MessagePool struct {
PriceCache *GasPriceCache
}

type stateNonceCacheKey struct {
tsk types.TipSetKey
addr address.Address
}

func newDefaultMaxFeeFunc(maxFee types.FIL) DefaultMaxFeeFunc {
return func() (out abi.TokenAmount, err error) {
out = abi.TokenAmount{Int: maxFee.Int}
Expand Down Expand Up @@ -391,6 +398,7 @@ func New(ctx context.Context,
cache, _ := lru.New2Q[cid.Cid, crypto.Signature](constants.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q[string, struct{}](constants.VerifSigCacheSize)
keycache, _ := lru.New[address.Address, address.Address](1_000_000)
stateNonceCache, _ := lru.New[stateNonceCacheKey, uint64](32768) // 32k * ~200 bytes = 6MB

cfg, err := loadConfig(ctx, ds)
if err != nil {
Expand All @@ -404,25 +412,26 @@ func New(ctx context.Context,
setRepublishInterval(networkParams.PropagationDelaySecs)

mp := &MessagePool{
ds: ds,
addSema: make(chan struct{}, 1),
closer: make(chan struct{}),
repubTk: constants.Clock.Ticker(RepublishInterval),
repubTrigger: make(chan struct{}, 1),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
keyCache: keycache,
minGasPrice: big.NewInt(0),
pruneTrigger: make(chan struct{}, 1),
pruneCooldown: make(chan struct{}, 1),
blsSigCache: cache,
sigValCache: verifcache,
changes: lps.New(50),
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
api: api,
sm: sm,
netName: netName,
cfg: cfg,
ds: ds,
addSema: make(chan struct{}, 1),
closer: make(chan struct{}),
repubTk: constants.Clock.Ticker(RepublishInterval),
repubTrigger: make(chan struct{}, 1),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
keyCache: keycache,
minGasPrice: big.NewInt(0),
pruneTrigger: make(chan struct{}, 1),
pruneCooldown: make(chan struct{}, 1),
blsSigCache: cache,
sigValCache: verifcache,
stateNonceCache: stateNonceCache,
changes: lps.New(50),
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
api: api,
sm: sm,
netName: netName,
cfg: cfg,
evtTypes: [...]journal.EventType{
evtTypeMpoolAdd: j.RegisterEventType("mpool", "add"),
evtTypeMpoolRemove: j.RegisterEventType("mpool", "remove"),
Expand Down Expand Up @@ -472,6 +481,14 @@ func New(ctx context.Context,
}

func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
//if addr is not an ID addr, then it is already resolved to a key
if addr.Protocol() != address.ID {
return addr, nil
}
return mp.resolveToKeyFromID(ctx, addr)
}

func (mp *MessagePool) resolveToKeyFromID(ctx context.Context, addr address.Address) (address.Address, error) {
// check the cache
a, ok := mp.keyCache.Get(addr)
if ok {
Expand All @@ -486,7 +503,6 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (

// place both entries in the cache (may both be key addresses, which is fine)
mp.keyCache.Add(addr, ka)
mp.keyCache.Add(ka, ka)

return ka, nil
}
Expand Down Expand Up @@ -1123,12 +1139,52 @@ func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address,
}

func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, curTS *types.TipSet) (uint64, error) {
act, err := mp.api.GetActorAfter(ctx, addr, curTS)
nk := stateNonceCacheKey{
tsk: curTS.Key(),
addr: addr,
}

n, ok := mp.stateNonceCache.Get(nk)
if ok {
return n, nil
}

// get the nonce from the actor before ts
actor, err := mp.api.GetActorBefore(addr, curTS)
if err != nil {
return 0, err
}
nextNonce := actor.Nonce

raddr, err := mp.resolveToKey(ctx, addr)
if err != nil {
return 0, err
}

return act.Nonce, nil
// loop over all messages sent by 'addr' and find the highest nonce
messages, err := mp.api.MessagesForTipset(ctx, curTS)
if err != nil {
return 0, err
}
for _, message := range messages {
msg := message.VMMessage()

maddr, err := mp.resolveToKey(ctx, msg.From)
if err != nil {
log.Warnf("failed to resolve message from address: %s", err)
continue
}

if maddr == raddr {
if n := msg.Nonce + 1; n > nextNonce {
nextNonce = n
}
}
}

mp.stateNonceCache.Add(nk, nextNonce)

return nextNonce, nil
}

func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (big.Int, error) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/messagepool/messagepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,22 @@ func (tma *testMpoolAPI) PubSubPublish(context.Context, string, []byte) error {
return nil
}

func (tma *testMpoolAPI) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
balance, ok := tma.balance[addr]
if !ok {
balance = types.NewInt(1000e6)
tma.balance[addr] = balance
}

nonce := tma.statenonce[addr]

return &types.Actor{
Code: builtin2.AccountActorCodeID,
Nonce: nonce,
Balance: balance,
}, nil
}

func (tma *testMpoolAPI) GetActorAfter(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
// regression check for load bug
if ts == nil {
Expand Down
50 changes: 39 additions & 11 deletions pkg/messagepool/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Provider interface {
SubscribeHeadChanges(context.Context, func(rev, app []*types.TipSet) error) *types.TipSet
PutMessage(context.Context, types.ChainMsg) (cid.Cid, error)
PubSubPublish(context.Context, string, []byte) error
GetActorBefore(address.Address, *types.TipSet) (*types.Actor, error)
GetActorAfter(context.Context, address.Address, *types.TipSet) (*types.Actor, error)
StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateNetworkVersion(context.Context, abi.ChainEpoch) network.Version
Expand Down Expand Up @@ -72,6 +73,22 @@ func (mpp *mpoolProvider) IsLite() bool {
return mpp.lite != nil
}

func (mpp *mpoolProvider) getActorLite(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
if !mpp.IsLite() {
return nil, errors.New("should not use getActorLite on non lite Provider")
}
n, err := mpp.lite.GetNonce(ctx, addr, ts.Key())
if err != nil {
return nil, fmt.Errorf("getting nonce over lite: %w", err)
}
a, err := mpp.lite.GetActor(ctx, addr, ts.Key())
if err != nil {
return nil, fmt.Errorf("getting actor over lite: %w", err)
}
a.Nonce = n
return a, nil
}

func (mpp *mpoolProvider) SubscribeHeadChanges(ctx context.Context, cb func(rev, app []*types.TipSet) error) *types.TipSet {
mpp.sm.SubscribeHeadChanges(
chain.WrapHeadChangeCoalescer(
Expand Down Expand Up @@ -99,18 +116,29 @@ func (mpp *mpoolProvider) PubSubPublish(ctx context.Context, k string, v []byte)
return mpp.ps.Publish(k, v) // nolint
}

func (mpp *mpoolProvider) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
ctx := context.TODO()

if mpp.IsLite() {
return mpp.getActorLite(ctx, addr, ts)
}

_, st, err := mpp.stmgr.ParentState(ctx, ts)
if err != nil {
return nil, fmt.Errorf("computing tipset state for GetActor: %v", err)
}

act, found, err := st.GetActor(ctx, addr)
if !found {
err = types.ErrActorNotFound
}

return act, err
}

func (mpp *mpoolProvider) GetActorAfter(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
if mpp.IsLite() {
n, err := mpp.lite.GetNonce(ctx, addr, ts.Key())
if err != nil {
return nil, fmt.Errorf("getting nonce over lite: %w", err)
}
a, err := mpp.lite.GetActor(ctx, addr, ts.Key())
if err != nil {
return nil, fmt.Errorf("getting actor over lite: %w", err)
}
a.Nonce = n
return a, nil
return mpp.getActorLite(context.TODO(), addr, ts)
}

st, err := mpp.stmgr.TipsetState(ctx, ts)
Expand All @@ -120,7 +148,7 @@ func (mpp *mpoolProvider) GetActorAfter(ctx context.Context, addr address.Addres

act, found, err := st.GetActor(ctx, addr)
if !found {
err = errors.New("actor not found")
err = types.ErrActorNotFound
}

return act, err
Expand Down
12 changes: 7 additions & 5 deletions pkg/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@ var RepublishBatchDelay = 100 * time.Millisecond
func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
mp.curTSLk.RLock()
ts := mp.curTS
mp.curTSLk.RUnlock()

baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
mp.curTSLk.RUnlock()
if err != nil {
return fmt.Errorf("computing basefee: %v", err)
}
baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)

pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.curTSLk.Lock()

mp.lk.Lock()
mp.republished = nil // clear this to avoid races triggering an early republish
mp.lk.Unlock()

mp.lk.RLock()
for actor := range mp.localAddrs {
mset, ok := mp.pending[actor]
if !ok {
Expand All @@ -49,8 +52,7 @@ func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
}
pending[actor] = pend
}
mp.lk.Unlock()
mp.curTSLk.Unlock()
mp.lk.RUnlock()

if len(pending) == 0 {
return nil
Expand Down Expand Up @@ -173,8 +175,8 @@ LOOP:
republished[m.Cid()] = struct{}{}
}

mp.lk.Lock()
// update the republished set so that we can trigger early republish from head changes
mp.lk.Lock()
mp.republished = republished
mp.lk.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions pkg/messagepool/selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type msgChain struct {
}

func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
mp.curTSLk.Lock()
defer mp.curTSLk.Unlock()
mp.curTSLk.RLock()
defer mp.curTSLk.RUnlock()

//TODO confirm if we can switch to RLock here for performance
mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()

// See if we need to prune before selection; excessive buildup can lead to slow selection,
// so prune if we have too many messages (ignoring the cooldown).
Expand Down