
Merge pull request #5984 from filecoin-project/chore/transport
Chore/transport
diwufeiwen authored May 25, 2023
2 parents a758257 + 336826b commit cbae30f
Showing 7 changed files with 168 additions and 45 deletions.
pkg/messagepool/check.go: 19 changes (16 additions, 3 deletions)
@@ -30,8 +30,13 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*types.Messag
 func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]types.MessageCheckStatus, error) {
 	var msgs []*types.Message
 	mp.lk.RLock()
-	mset, ok := mp.pending[from]
+	mset, ok, err := mp.getPendingMset(ctx, from)
+	if err != nil {
+		mp.lk.RUnlock()
+		return nil, fmt.Errorf("errored while getting pending mset: %w", err)
+	}
 	if ok {
+		msgs = make([]*types.Message, 0, len(mset.msgs))
 		for _, sm := range mset.msgs {
 			msgs = append(msgs, &sm.Message)
 		}
@@ -61,7 +66,11 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
 		if !ok {
 			mmap = make(map[uint64]*types.Message)
 			msgMap[m.From] = mmap
-			mset, ok := mp.pending[m.From]
+			mset, ok, err := mp.getPendingMset(ctx, m.From)
+			if err != nil {
+				mp.lk.RUnlock()
+				return nil, fmt.Errorf("errored while getting pending mset: %w", err)
+			}
 			if ok {
 				count += len(mset.msgs)
 				for _, sm := range mset.msgs {
@@ -141,7 +150,11 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
 		st, ok := state[m.From]
 		if !ok {
 			mp.lk.RLock()
-			mset, ok := mp.pending[m.From]
+			mset, ok, err := mp.getPendingMset(ctx, m.From)
+			if err != nil {
+				mp.lk.RUnlock()
+				return nil, fmt.Errorf("errored while getting pending mset: %w", err)
+			}
 			if ok && !interned {
 				st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
 				for _, m := range mset.msgs {
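All three hunks above replace a direct read of `mp.pending[...]` with a `getPendingMset` helper that can fail, which is why each call site now releases the read lock and propagates an error. The helper's body is not part of this diff; the following is a sketch reconstructed from its call sites (three-value return) and the address-resolution changes later in this commit, not the committed implementation:

```go
// Hypothetical sketch of getPendingMset, inferred from its call sites above.
// It assumes the pending map is keyed by resolved key addresses, so the lookup
// must resolve the sender first; resolution is the step that can fail.
func (mp *MessagePool) getPendingMset(ctx context.Context, addr address.Address) (*msgSet, bool, error) {
	ra, err := mp.resolveToKey(ctx, addr)
	if err != nil {
		return nil, false, err
	}
	ms, ok := mp.pending[ra]
	return ms, ok, nil
}
```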
pkg/messagepool/gas.go: 8 changes (8 additions, 0 deletions)
@@ -372,6 +372,10 @@ func (mp *MessagePool) GasEstimateMessageGas(ctx context.Context, estimateMessag
 			gasLimitOverestimation = estimateMessage.Spec.GasOverEstimation
 		}
 		estimateMessage.Msg.GasLimit = int64(float64(gasLimit) * gasLimitOverestimation)
+		// Gas overestimation can cause us to exceed the block gas limit, cap it.
+		if estimateMessage.Msg.GasLimit > constants.BlockGasLimit {
+			estimateMessage.Msg.GasLimit = constants.BlockGasLimit
+		}
 	}
 
 	if estimateMessage.Msg.GasPremium == types.EmptyInt || types.BigCmp(estimateMessage.Msg.GasPremium, types.NewInt(0)) == 0 {
@@ -441,6 +445,10 @@ func (mp *MessagePool) GasBatchEstimateMessageGas(ctx context.Context, estimateM
 			continue
 		}
 		estimateMsg.GasLimit = int64(float64(gasUsed) * estimateMessage.Spec.GasOverEstimation)
+		// Gas overestimation can cause us to exceed the block gas limit, cap it.
+		if estimateMsg.GasLimit > constants.BlockGasLimit {
+			estimateMsg.GasLimit = constants.BlockGasLimit
+		}
 	}
 
 	if estimateMsg.GasPremium == types.EmptyInt || types.BigCmp(estimateMsg.GasPremium, types.NewInt(0)) == 0 {
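The new cap exists because `GasOverEstimation` is a multiplier: an estimate already close to the block gas limit, once scaled, can exceed what any block can hold, leaving the message permanently unincludable. A self-contained illustration of the guard; the 10 billion constant below mirrors Filecoin mainnet's block gas limit (`constants.BlockGasLimit` in the real code):

```go
package main

import "fmt"

const blockGasLimit = int64(10_000_000_000) // stand-in for constants.BlockGasLimit

// capGasLimit applies the same guard the diff adds after scaling an estimate.
func capGasLimit(estimated int64, overestimation float64) int64 {
	limit := int64(float64(estimated) * overestimation)
	if limit > blockGasLimit {
		limit = blockGasLimit // past this, no block could include the message
	}
	return limit
}

func main() {
	fmt.Println(capGasLimit(9_000_000_000, 1.25)) // 10000000000: scaled past the limit, capped
	fmt.Println(capGasLimit(1_000_000, 1.25))     // 1250000: unaffected
}
```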
pkg/messagepool/messagepool.go: 100 changes (78 additions, 22 deletions)
@@ -172,6 +172,8 @@ type MessagePool struct {
 
 	sigValCache *lru.TwoQueueCache[string, struct{}]
 
+	stateNonceCache *lru.Cache[stateNonceCacheKey, uint64]
+
 	evtTypes [3]journal.EventType
 	journal  journal.Journal
 
@@ -182,6 +184,11 @@ type MessagePool struct {
 	PriceCache *GasPriceCache
 }
 
+type stateNonceCacheKey struct {
+	tsk  types.TipSetKey
+	addr address.Address
+}
+
 func newDefaultMaxFeeFunc(maxFee types.FIL) DefaultMaxFeeFunc {
 	return func() (out abi.TokenAmount, err error) {
 		out = abi.TokenAmount{Int: maxFee.Int}
@@ -391,6 +398,7 @@ func New(ctx context.Context,
 	cache, _ := lru.New2Q[cid.Cid, crypto.Signature](constants.BlsSignatureCacheSize)
 	verifcache, _ := lru.New2Q[string, struct{}](constants.VerifSigCacheSize)
 	keycache, _ := lru.New[address.Address, address.Address](1_000_000)
+	stateNonceCache, _ := lru.New[stateNonceCacheKey, uint64](32768) // 32k * ~200 bytes = 6MB
 
 	cfg, err := loadConfig(ctx, ds)
 	if err != nil {
@@ -404,25 +412,26 @@ func New(ctx context.Context,
 	setRepublishInterval(networkParams.PropagationDelaySecs)
 
 	mp := &MessagePool{
-		ds:            ds,
-		addSema:       make(chan struct{}, 1),
-		closer:        make(chan struct{}),
-		repubTk:       constants.Clock.Ticker(RepublishInterval),
-		repubTrigger:  make(chan struct{}, 1),
-		localAddrs:    make(map[address.Address]struct{}),
-		pending:       make(map[address.Address]*msgSet),
-		keyCache:      keycache,
-		minGasPrice:   big.NewInt(0),
-		pruneTrigger:  make(chan struct{}, 1),
-		pruneCooldown: make(chan struct{}, 1),
-		blsSigCache:   cache,
-		sigValCache:   verifcache,
-		changes:       lps.New(50),
-		localMsgs:     namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
-		api:           api,
-		sm:            sm,
-		netName:       netName,
-		cfg:           cfg,
+		ds:              ds,
+		addSema:         make(chan struct{}, 1),
+		closer:          make(chan struct{}),
+		repubTk:         constants.Clock.Ticker(RepublishInterval),
+		repubTrigger:    make(chan struct{}, 1),
+		localAddrs:      make(map[address.Address]struct{}),
+		pending:         make(map[address.Address]*msgSet),
+		keyCache:        keycache,
+		minGasPrice:     big.NewInt(0),
+		pruneTrigger:    make(chan struct{}, 1),
+		pruneCooldown:   make(chan struct{}, 1),
+		blsSigCache:     cache,
+		sigValCache:     verifcache,
+		stateNonceCache: stateNonceCache,
+		changes:         lps.New(50),
+		localMsgs:       namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
+		api:             api,
+		sm:              sm,
+		netName:         netName,
+		cfg:             cfg,
 		evtTypes: [...]journal.EventType{
 			evtTypeMpoolAdd:    j.RegisterEventType("mpool", "add"),
 			evtTypeMpoolRemove: j.RegisterEventType("mpool", "remove"),
@@ -472,6 +481,14 @@
 }
 
 func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
+	//if addr is not an ID addr, then it is already resolved to a key
+	if addr.Protocol() != address.ID {
+		return addr, nil
+	}
+	return mp.resolveToKeyFromID(ctx, addr)
+}
+
+func (mp *MessagePool) resolveToKeyFromID(ctx context.Context, addr address.Address) (address.Address, error) {
 	// check the cache
 	a, ok := mp.keyCache.Get(addr)
 	if ok {
@@ -486,7 +503,6 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (
 
 	// place both entries in the cache (may both be key addresses, which is fine)
 	mp.keyCache.Add(addr, ka)
-	mp.keyCache.Add(ka, ka)
 
 	return ka, nil
 }
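Worth noting about the `resolveToKey` split: the fast path returns any non-ID address untouched, so only ID (`f0...`) addresses ever reach the cache and state lookup in `resolveToKeyFromID`. That is also why the old `mp.keyCache.Add(ka, ka)` self-mapping is dropped: a key address can no longer arrive at the cache path, so caching key-to-key entries buys nothing. A minimal standalone check of the protocol test this relies on (real `go-address` API; the example addresses themselves are made up):

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/go-address"
)

func main() {
	idAddr, _ := address.NewIDAddress(1234)                          // f0-style ID address
	keyAddr, _ := address.NewSecp256k1Address([]byte("example key")) // f1-style key address

	fmt.Println(idAddr.Protocol() == address.ID)  // true: goes through resolveToKeyFromID
	fmt.Println(keyAddr.Protocol() == address.ID) // false: returned as-is by resolveToKey
}
```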
@@ -1123,12 +1139,52 @@ func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address,
 }
 
 func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, curTS *types.TipSet) (uint64, error) {
-	act, err := mp.api.GetActorAfter(ctx, addr, curTS)
+	nk := stateNonceCacheKey{
+		tsk:  curTS.Key(),
+		addr: addr,
+	}
+
+	n, ok := mp.stateNonceCache.Get(nk)
+	if ok {
+		return n, nil
+	}
+
+	// get the nonce from the actor before ts
+	actor, err := mp.api.GetActorBefore(addr, curTS)
 	if err != nil {
 		return 0, err
 	}
+	nextNonce := actor.Nonce
 
+	raddr, err := mp.resolveToKey(ctx, addr)
+	if err != nil {
+		return 0, err
+	}
+
-	return act.Nonce, nil
+	// loop over all messages sent by 'addr' and find the highest nonce
+	messages, err := mp.api.MessagesForTipset(ctx, curTS)
+	if err != nil {
+		return 0, err
+	}
+	for _, message := range messages {
+		msg := message.VMMessage()
+
+		maddr, err := mp.resolveToKey(ctx, msg.From)
+		if err != nil {
+			log.Warnf("failed to resolve message from address: %s", err)
+			continue
+		}
+
+		if maddr == raddr {
+			if n := msg.Nonce + 1; n > nextNonce {
+				nextNonce = n
+			}
+		}
+	}
+
+	mp.stateNonceCache.Add(nk, nextNonce)
+
+	return nextNonce, nil
 }
 
 func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (big.Int, error) {
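`getStateNonce` previously asked for the actor state *after* the tipset, which (per the provider below) means loading the state the tipset computes. The rewrite derives the same answer more cheaply: take the actor's nonce from the parent state, walk the messages already included in the tipset, bump past any from the same sender, and memoize the result per (tipset, address) in the new LRU. A toy model of that rule, with hypothetical stand-in types for the real tipset machinery:

```go
package main

import "fmt"

// vmMessage is a stand-in for the real VM message; only the fields the nonce
// rule touches are modeled here.
type vmMessage struct {
	From  string
	Nonce uint64
}

// nextNonce mirrors the logic added to getStateNonce: start from the actor's
// nonce in the state before the tipset, then bump past every message the same
// sender already has inside the tipset itself.
func nextNonce(stateNonce uint64, tipsetMsgs []vmMessage, from string) uint64 {
	next := stateNonce
	for _, m := range tipsetMsgs {
		if m.From == from {
			if n := m.Nonce + 1; n > next {
				next = n
			}
		}
	}
	return next
}

func main() {
	msgs := []vmMessage{{From: "f1alice", Nonce: 5}, {From: "f1alice", Nonce: 6}, {From: "f1bob", Nonce: 2}}
	fmt.Println(nextNonce(5, msgs, "f1alice")) // 7: nonces 5 and 6 are already in the tipset
	fmt.Println(nextNonce(3, msgs, "f1bob"))   // 3: an older included message cannot lower it
}
```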
pkg/messagepool/messagepool_test.go: 16 changes (16 additions, 0 deletions)
@@ -217,6 +217,22 @@ func (tma *testMpoolAPI) PubSubPublish(context.Context, string, []byte) error {
 	return nil
 }
 
+func (tma *testMpoolAPI) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
+	balance, ok := tma.balance[addr]
+	if !ok {
+		balance = types.NewInt(1000e6)
+		tma.balance[addr] = balance
+	}
+
+	nonce := tma.statenonce[addr]
+
+	return &types.Actor{
+		Code:    builtin2.AccountActorCodeID,
+		Nonce:   nonce,
+		Balance: balance,
+	}, nil
+}
+
 func (tma *testMpoolAPI) GetActorAfter(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
 	// regression check for load bug
 	if ts == nil {
pkg/messagepool/provider.go: 50 changes (39 additions, 11 deletions)
@@ -31,6 +31,7 @@ type Provider interface {
 	SubscribeHeadChanges(context.Context, func(rev, app []*types.TipSet) error) *types.TipSet
 	PutMessage(context.Context, types.ChainMsg) (cid.Cid, error)
 	PubSubPublish(context.Context, string, []byte) error
+	GetActorBefore(address.Address, *types.TipSet) (*types.Actor, error)
 	GetActorAfter(context.Context, address.Address, *types.TipSet) (*types.Actor, error)
 	StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error)
 	StateNetworkVersion(context.Context, abi.ChainEpoch) network.Version
@@ -72,6 +73,22 @@ func (mpp *mpoolProvider) IsLite() bool {
 	return mpp.lite != nil
 }
 
+func (mpp *mpoolProvider) getActorLite(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
+	if !mpp.IsLite() {
+		return nil, errors.New("should not use getActorLite on non lite Provider")
+	}
+	n, err := mpp.lite.GetNonce(ctx, addr, ts.Key())
+	if err != nil {
+		return nil, fmt.Errorf("getting nonce over lite: %w", err)
+	}
+	a, err := mpp.lite.GetActor(ctx, addr, ts.Key())
+	if err != nil {
+		return nil, fmt.Errorf("getting actor over lite: %w", err)
+	}
+	a.Nonce = n
+	return a, nil
+}
+
 func (mpp *mpoolProvider) SubscribeHeadChanges(ctx context.Context, cb func(rev, app []*types.TipSet) error) *types.TipSet {
 	mpp.sm.SubscribeHeadChanges(
 		chain.WrapHeadChangeCoalescer(
@@ -99,18 +116,29 @@ func (mpp *mpoolProvider) PubSubPublish(ctx context.Context, k string, v []byte)
 	return mpp.ps.Publish(k, v) // nolint
 }
 
+func (mpp *mpoolProvider) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
+	ctx := context.TODO()
+
+	if mpp.IsLite() {
+		return mpp.getActorLite(ctx, addr, ts)
+	}
+
+	_, st, err := mpp.stmgr.ParentState(ctx, ts)
+	if err != nil {
+		return nil, fmt.Errorf("computing tipset state for GetActor: %v", err)
+	}
+
+	act, found, err := st.GetActor(ctx, addr)
+	if !found {
+		err = types.ErrActorNotFound
+	}
+
+	return act, err
+}
+
 func (mpp *mpoolProvider) GetActorAfter(ctx context.Context, addr address.Address, ts *types.TipSet) (*types.Actor, error) {
 	if mpp.IsLite() {
-		n, err := mpp.lite.GetNonce(ctx, addr, ts.Key())
-		if err != nil {
-			return nil, fmt.Errorf("getting nonce over lite: %w", err)
-		}
-		a, err := mpp.lite.GetActor(ctx, addr, ts.Key())
-		if err != nil {
-			return nil, fmt.Errorf("getting actor over lite: %w", err)
-		}
-		a.Nonce = n
-		return a, nil
+		return mpp.getActorLite(context.TODO(), addr, ts)
 	}
 
 	st, err := mpp.stmgr.TipsetState(ctx, ts)
@@ -120,7 +148,7 @@ func (mpp *mpoolProvider) GetActorAfter(ctx context.Context, addr address.Addres
 
 	act, found, err := st.GetActor(ctx, addr)
 	if !found {
-		err = errors.New("actor not found")
+		err = types.ErrActorNotFound
 	}
 
 	return act, err
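Two behavioral points in this file: the new `GetActorBefore` reads the actor out of the tipset's *parent* state (`ParentState`), while `GetActorAfter` still loads the state computed by the tipset itself (`TipsetState`); and both lookup paths now fail with the sentinel `types.ErrActorNotFound` instead of an ad-hoc `errors.New` value, so callers can branch on it. A hedged caller-side sketch (the helper name is illustrative and the usual `errors`/venus imports are assumed, none of this is from the commit):

```go
// nonceOrZero treats a brand-new sender (no actor in the parent state yet) as
// nonce 0 instead of an error, by matching the sentinel the provider now returns.
func nonceOrZero(p messagepool.Provider, addr address.Address, ts *types.TipSet) (uint64, error) {
	act, err := p.GetActorBefore(addr, ts)
	if errors.Is(err, types.ErrActorNotFound) {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	return act.Nonce, nil
}
```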
pkg/messagepool/repub.go: 12 changes (7 additions, 5 deletions)
@@ -22,18 +22,21 @@ var RepublishBatchDelay = 100 * time.Millisecond
 func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
 	mp.curTSLk.RLock()
 	ts := mp.curTS
+	mp.curTSLk.RUnlock()
 
 	baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
-	mp.curTSLk.RUnlock()
 	if err != nil {
 		return fmt.Errorf("computing basefee: %v", err)
 	}
 	baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)
 
 	pending := make(map[address.Address]map[uint64]*types.SignedMessage)
-	mp.curTSLk.Lock()
+
 	mp.lk.Lock()
 	mp.republished = nil // clear this to avoid races triggering an early republish
+	mp.lk.Unlock()
+
+	mp.lk.RLock()
 	for actor := range mp.localAddrs {
 		mset, ok := mp.pending[actor]
 		if !ok {
@@ -49,8 +52,7 @@
 		}
 		pending[actor] = pend
 	}
-	mp.lk.Unlock()
-	mp.curTSLk.Unlock()
+	mp.lk.RUnlock()
 
 	if len(pending) == 0 {
 		return nil
@@ -173,8 +175,8 @@ LOOP:
 		republished[m.Cid()] = struct{}{}
 	}
 
-	mp.lk.Lock()
 	// update the republished set so that we can trigger early republish from head changes
+	mp.lk.Lock()
 	mp.republished = republished
 	mp.lk.Unlock()
 
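The net effect in `republishPendingMessages` is a much narrower lock scope: `curTSLk` is released before the base-fee computation instead of being re-taken as a writer, and `mp.lk` is held exclusively only long enough to clear `mp.republished`, with the pending-set scan done under a read lock. The trade-off is that another writer can slip in between the `Unlock` and the `RLock`; the code accepts that, since the cleared set only guards against a premature early republish. A generic, self-contained illustration of the pattern (hypothetical types, not the mpool's own):

```go
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	mu      sync.RWMutex
	pending map[string]int
	marked  []string
}

// snapshot narrows the critical sections the same way the diff does: a short
// exclusive section for the write, then a shared section for the longer scan.
func (p *pool) snapshot() map[string]int {
	p.mu.Lock()
	p.marked = nil // the only mutation, done under the write lock
	p.mu.Unlock()

	p.mu.RLock() // other readers may proceed concurrently from here on
	defer p.mu.RUnlock()
	out := make(map[string]int, len(p.pending))
	for k, v := range p.pending {
		out[k] = v
	}
	return out
}

func main() {
	p := &pool{pending: map[string]int{"f1alice": 2}}
	fmt.Println(p.snapshot())
}
```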
pkg/messagepool/selection.go: 8 changes (4 additions, 4 deletions)
@@ -39,12 +39,12 @@ type msgChain struct {
 }
 
 func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
-	mp.curTSLk.Lock()
-	defer mp.curTSLk.Unlock()
+	mp.curTSLk.RLock()
+	defer mp.curTSLk.RUnlock()
 
 	//TODO confirm if we can switch to RLock here for performance
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
+	mp.lk.RLock()
+	defer mp.lk.RUnlock()
 
 	// See if we need to prune before selection; excessive buildup can lead to slow selection,
 	// so prune if we have too many messages (ignoring the cooldown).
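Taken together with the repub.go changes, switching `SelectMessages` to read locks means concurrent selection calls (and other readers of the pending set) no longer serialize against each other; only writers (adds, prunes, head changes) still take `curTSLk` and `mp.lk` exclusively. Note that the `//TODO confirm if we can switch to RLock` comment survives as context even though this commit makes exactly that switch for `mp.lk`.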
