From 3a9e058ad09fff4b24f8de8f0f2d7388ec20f5f5 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Thu, 9 Mar 2023 08:50:34 -0500 Subject: [PATCH 1/5] updates --- types/mempool/priority_nonce.go | 138 +++++++++++++++++---------- types/mempool/priority_nonce_test.go | 40 ++++---- 2 files changed, 111 insertions(+), 67 deletions(-) diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go index 20e331bf194a..8fe9a63e1404 100644 --- a/types/mempool/priority_nonce.go +++ b/types/mempool/priority_nonce.go @@ -3,7 +3,6 @@ package mempool import ( "context" "fmt" - "math" "github.com/huandu/skiplist" @@ -16,6 +15,40 @@ var ( _ Iterator = (*PriorityNonceIterator)(nil) ) +type TxPriority struct { + // GetTxPriority returns the priority of the transaction. A priority must be + // comparable via CompareTxPriority. + GetTxPriority func(ctx context.Context, tx sdk.Tx) any + // CompareTxPriority compares two transaction priorities. The result should be + // 0 if a == b, -1 if a < b, and +1 if a > b. + CompareTxPriority func(a, b any) int +} + +// NewDefaultTxPriority returns a TxPriority comparator using ctx.Priority as +// the defining transaction priority. +func NewDefaultTxPriority() TxPriority { + return TxPriority{ + GetTxPriority: func(goCtx context.Context, tx sdk.Tx) any { + return sdk.UnwrapSDKContext(goCtx).Priority() + }, + CompareTxPriority: func(a, b any) int { + switch { + case a == nil && b == nil: + return 0 + case a == nil: + return -1 + case b == nil: + return 1 + default: + aPriority := a.(int64) + bPriority := b.(int64) + + return skiplist.Int64.Compare(aPriority, bPriority) + } + }, + } +} + // PriorityNonceMempool is a mempool implementation that stores txs // in a partially ordered set by 2 dimensions: priority, and sender-nonce // (sequence number). Internally it uses one priority ordered skip list and one @@ -25,20 +58,21 @@ var ( // and priority. type PriorityNonceMempool struct { priorityIndex *skiplist.SkipList - priorityCounts map[int64]int + priorityCounts map[any]int senderIndices map[string]*skiplist.SkipList scores map[txMeta]txMeta onRead func(tx sdk.Tx) - txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool + txReplacement func(op, np any, oTx, nTx sdk.Tx) bool maxTx int + txPriority TxPriority } type PriorityNonceIterator struct { + mempool *PriorityNonceMempool + priorityNode *skiplist.Element senderCursors map[string]*skiplist.Element - nextPriority int64 sender string - priorityNode *skiplist.Element - mempool *PriorityNonceMempool + nextPriority any } // txMeta stores transaction metadata used in indices @@ -46,44 +80,48 @@ type txMeta struct { // nonce is the sender's sequence number nonce uint64 // priority is the transaction's priority - priority int64 + priority any // sender is the transaction's sender sender string - // weight is the transaction's weight, used as a tiebreaker for transactions with the same priority - weight int64 + // weight is the transaction's weight, used as a tiebreaker for transactions + // with the same priority + weight any // senderElement is a pointer to the transaction's element in the sender index senderElement *skiplist.Element } -// txMetaLess is a comparator for txKeys that first compares priority, then weight, -// then sender, then nonce, uniquely identifying a transaction. +// skiplistComparable is a comparator for txKeys that first compares priority, +// then weight, then sender, then nonce, uniquely identifying a transaction. // -// Note, txMetaLess is used as the comparator in the priority index. -func txMetaLess(a, b any) int { - keyA := a.(txMeta) - keyB := b.(txMeta) - res := skiplist.Int64.Compare(keyA.priority, keyB.priority) - if res != 0 { - return res - } +// Note, skiplistComparable is used as the comparator in the priority index. +func skiplistComparable(txPriority TxPriority) skiplist.Comparable { + return skiplist.LessThanFunc(func(a, b any) int { + keyA := a.(txMeta) + keyB := b.(txMeta) + + res := txPriority.CompareTxPriority(keyA.priority, keyB.priority) + if res != 0 { + return res + } - // Weight is used as a tiebreaker for transactions with the same priority. - // Weight is calculated in a single pass in .Select(...) and so will be 0 - // on .Insert(...). - res = skiplist.Int64.Compare(keyA.weight, keyB.weight) - if res != 0 { - return res - } + // Weight is used as a tiebreaker for transactions with the same priority. + // Weight is calculated in a single pass in .Select(...) and so will be 0 + // on .Insert(...). + res = txPriority.CompareTxPriority(keyA.weight, keyB.weight) + if res != 0 { + return res + } - // Because weight will be 0 on .Insert(...), we must also compare sender and - // nonce to resolve priority collisions. If we didn't then transactions with - // the same priority would overwrite each other in the priority index. - res = skiplist.String.Compare(keyA.sender, keyB.sender) - if res != 0 { - return res - } + // Because weight will be 0 on .Insert(...), we must also compare sender and + // nonce to resolve priority collisions. If we didn't then transactions with + // the same priority would overwrite each other in the priority index. + res = skiplist.String.Compare(keyA.sender, keyB.sender) + if res != 0 { + return res + } - return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce) + return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce) + }) } type PriorityNonceMempoolOption func(*PriorityNonceMempool) @@ -99,7 +137,7 @@ func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption // PriorityNonceWithTxReplacement sets a callback to be called when duplicated // transaction nonce detected during mempool insert. An application can define a // transaction replacement rule based on tx priority or certain transaction fields. -func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption { +func PriorityNonceWithTxReplacement(txReplacementRule func(op, np any, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption { return func(mp *PriorityNonceMempool) { mp.txReplacement = txReplacementRule } @@ -118,18 +156,19 @@ func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption { } // DefaultPriorityMempool returns a priorityNonceMempool with no options. -func DefaultPriorityMempool() Mempool { - return NewPriorityMempool() +func DefaultPriorityMempool(txPriority TxPriority) Mempool { + return NewPriorityMempool(txPriority) } // NewPriorityMempool returns the SDK's default mempool implementation which // returns txs in a partial order by 2 dimensions; priority, and sender-nonce. -func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool { +func NewPriorityMempool(txPriority TxPriority, opts ...PriorityNonceMempoolOption) *PriorityNonceMempool { mp := &PriorityNonceMempool{ - priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)), - priorityCounts: make(map[int64]int), + priorityIndex: skiplist.New(skiplistComparable(txPriority)), + priorityCounts: make(map[any]int), senderIndices: make(map[string]*skiplist.SkipList), scores: make(map[txMeta]txMeta), + txPriority: txPriority, } for _, opt := range opts { @@ -176,10 +215,9 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { return fmt.Errorf("tx must have at least one signer") } - sdkContext := sdk.UnwrapSDKContext(ctx) - priority := sdkContext.Priority() sig := sigs[0] sender := sdk.AccAddress(sig.PubKey.Address()).String() + priority := mp.txPriority.GetTxPriority(ctx, tx) nonce := sig.Sequence key := txMeta{nonce: nonce, priority: priority, sender: sender} @@ -252,7 +290,7 @@ func (i *PriorityNonceIterator) iteratePriority() Iterator { if nextPriorityNode != nil { i.nextPriority = nextPriorityNode.Key().(txMeta).priority } else { - i.nextPriority = math.MinInt64 + i.nextPriority = nil } return i.Next() @@ -281,13 +319,13 @@ func (i *PriorityNonceIterator) Next() Iterator { // We've reached a transaction with a priority lower than the next highest // priority in the pool. - if key.priority < i.nextPriority { + if i.mempool.txPriority.CompareTxPriority(key.priority, i.nextPriority) < 0 { return i.iteratePriority() - } else if key.priority == i.nextPriority { + } else if i.mempool.txPriority.CompareTxPriority(key.priority, i.nextPriority) == 0 { // Weight is incorporated into the priority index key only (not sender index) // so we must fetch it here from the scores map. weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight - if weight < i.priorityNode.Next().Key().(txMeta).weight { + if i.mempool.txPriority.CompareTxPriority(weight, i.priorityNode.Next().Key().(txMeta).weight) < 0 { return i.iteratePriority() } } @@ -335,7 +373,7 @@ func (mp *PriorityNonceMempool) reorderPriorityTies() { key := node.Key().(txMeta) if mp.priorityCounts[key.priority] > 1 { newKey := key - newKey.weight = senderWeight(key.senderElement) + newKey.weight = senderWeight(mp.txPriority, key.senderElement) reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) } @@ -354,7 +392,7 @@ func (mp *PriorityNonceMempool) reorderPriorityTies() { // defined as the first (nonce-wise) same sender tx with a priority not equal to // t. It is used to resolve priority collisions, that is when 2 or more txs from // different senders have the same priority. -func senderWeight(senderCursor *skiplist.Element) int64 { +func senderWeight(txPriority TxPriority, senderCursor *skiplist.Element) any { if senderCursor == nil { return 0 } @@ -363,7 +401,7 @@ func senderWeight(senderCursor *skiplist.Element) int64 { senderCursor = senderCursor.Next() for senderCursor != nil { p := senderCursor.Key().(txMeta).priority - if p != weight { + if txPriority.CompareTxPriority(p, weight) != 0 { weight = p } @@ -419,7 +457,7 @@ func IsEmpty(mempool Mempool) error { return fmt.Errorf("priorityIndex not empty") } - var countKeys []int64 + var countKeys []any for k := range mp.priorityCounts { countKeys = append(countKeys, k) } diff --git a/types/mempool/priority_nonce_test.go b/types/mempool/priority_nonce_test.go index 6c00bf58ede0..0e1c1ce1346c 100644 --- a/types/mempool/priority_nonce_test.go +++ b/types/mempool/priority_nonce_test.go @@ -7,10 +7,10 @@ import ( "testing" "time" + "cosmossdk.io/log" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" "github.com/stretchr/testify/require" - "cosmossdk.io/log" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/mempool" simtypes "github.com/cosmos/cosmos-sdk/types/simulation" @@ -229,7 +229,7 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() { } for i, tt := range tests { t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { - pool := mempool.NewPriorityMempool() + pool := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) // create test txs and insert into mempool for i, ts := range tt.txs { @@ -275,7 +275,7 @@ func (s *MempoolTestSuite) TestPriorityTies() { } for i := 0; i < 100; i++ { - s.mempool = mempool.NewPriorityMempool() + s.mempool = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) var shuffled []txSpec for _, t := range txSet { tx := txSpec{ @@ -372,9 +372,12 @@ func validateOrder(mtxs []sdk.Tx) error { func (s *MempoolTestSuite) TestRandomGeneratedTxs() { s.iterations = 0 - s.mempool = mempool.NewPriorityMempool(mempool.PriorityNonceWithOnRead(func(tx sdk.Tx) { - s.iterations++ - })) + s.mempool = mempool.NewPriorityMempool( + mempool.NewDefaultTxPriority(), + mempool.PriorityNonceWithOnRead(func(tx sdk.Tx) { + s.iterations++ + }), + ) t := s.T() ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger()) seed := time.Now().UnixNano() @@ -409,7 +412,7 @@ func (s *MempoolTestSuite) TestRandomGeneratedTxs() { func (s *MempoolTestSuite) TestRandomWalkTxs() { s.iterations = 0 - s.mempool = mempool.NewPriorityMempool() + s.mempool = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) t := s.T() ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger()) @@ -589,7 +592,7 @@ func TestPriorityNonceMempool_NextSenderTx(t *testing.T) { accA := accounts[0].Address accB := accounts[1].Address - mp := mempool.NewPriorityMempool() + mp := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) txs := []testTx{ {priority: 20, nonce: 1, address: accA}, @@ -633,13 +636,13 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // unlimited - mp := mempool.NewPriorityMempool(mempool.PriorityNonceWithMaxTx(0)) + mp := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx(0)) for i, tx := range txs { c := ctx.WithPriority(tx.priority) require.NoError(t, mp.Insert(c, tx)) require.Equal(t, i+1, mp.CountTx()) } - mp = mempool.NewPriorityMempool() + mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) for i, tx := range txs { c := ctx.WithPriority(tx.priority) require.NoError(t, mp.Insert(c, tx)) @@ -647,7 +650,7 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // limit: 3 - mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithMaxTx(3)) + mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx(3)) for i, tx := range txs { c := ctx.WithPriority(tx.priority) err := mp.Insert(c, tx) @@ -661,7 +664,7 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // disabled - mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithMaxTx(-1)) + mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx(-1)) for _, tx := range txs { c := ctx.WithPriority(tx.priority) err := mp.Insert(c, tx) @@ -683,7 +686,7 @@ func TestNextSenderTx_TxReplacement(t *testing.T) { } // test Priority with default mempool - mp := mempool.NewPriorityMempool() + mp := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) for _, tx := range txs { c := ctx.WithPriority(tx.priority) require.NoError(t, mp.Insert(c, tx)) @@ -697,10 +700,13 @@ func TestNextSenderTx_TxReplacement(t *testing.T) { // we set a TestTxReplacement rule which the priority of the new Tx must be 20% more than the priority of the old Tx // otherwise, the Insert will return error feeBump := 20 - mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithTxReplacement(func(op, np int64, oTx, nTx sdk.Tx) bool { - threshold := int64(100 + feeBump) - return np >= op*threshold/100 - })) + mp = mempool.NewPriorityMempool( + mempool.NewDefaultTxPriority(), + mempool.PriorityNonceWithTxReplacement(func(op, np any, oTx, nTx sdk.Tx) bool { + threshold := int64(100 + feeBump) + return np.(int64) >= op.(int64)*threshold/100 + }), + ) c := ctx.WithPriority(txs[0].priority) require.NoError(t, mp.Insert(c, txs[0])) From 454548895b2cb9e17ec9ff1d32c945000ed34f73 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 14 Mar 2023 12:24:42 -0400 Subject: [PATCH 2/5] generics approach --- types/mempool/priority_nonce.go | 177 +++++++++++++-------------- types/mempool/priority_nonce_test.go | 14 +-- 2 files changed, 92 insertions(+), 99 deletions(-) diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go index 8fe9a63e1404..6edfe674f845 100644 --- a/types/mempool/priority_nonce.go +++ b/types/mempool/priority_nonce.go @@ -3,6 +3,7 @@ package mempool import ( "context" "fmt" + "math" "github.com/huandu/skiplist" @@ -11,41 +12,33 @@ import ( ) var ( - _ Mempool = (*PriorityNonceMempool)(nil) - _ Iterator = (*PriorityNonceIterator)(nil) + _ Mempool = (*PriorityNonceMempool[int64])(nil) + _ Iterator = (*PriorityNonceIterator[int64])(nil) ) -type TxPriority struct { +type TxPriority[C comparable] struct { // GetTxPriority returns the priority of the transaction. A priority must be - // comparable via CompareTxPriority. - GetTxPriority func(ctx context.Context, tx sdk.Tx) any + // comparable via Compare. + GetTxPriority func(ctx context.Context, tx sdk.Tx) C + // CompareTxPriority compares two transaction priorities. The result should be // 0 if a == b, -1 if a < b, and +1 if a > b. - CompareTxPriority func(a, b any) int + Compare func(a, b C) int + + MinValue C } // NewDefaultTxPriority returns a TxPriority comparator using ctx.Priority as // the defining transaction priority. -func NewDefaultTxPriority() TxPriority { - return TxPriority{ - GetTxPriority: func(goCtx context.Context, tx sdk.Tx) any { +func NewDefaultTxPriority() TxPriority[int64] { + return TxPriority[int64]{ + GetTxPriority: func(goCtx context.Context, _ sdk.Tx) int64 { return sdk.UnwrapSDKContext(goCtx).Priority() }, - CompareTxPriority: func(a, b any) int { - switch { - case a == nil && b == nil: - return 0 - case a == nil: - return -1 - case b == nil: - return 1 - default: - aPriority := a.(int64) - bPriority := b.(int64) - - return skiplist.Int64.Compare(aPriority, bPriority) - } + Compare: func(a, b int64) int { + return skiplist.Int64.Compare(a, b) }, + MinValue: math.MinInt64, } } @@ -56,36 +49,36 @@ func NewDefaultTxPriority() TxPriority { // are multiple txs from the same sender, they are not always comparable by // priority to other sender txs and must be partially ordered by both sender-nonce // and priority. -type PriorityNonceMempool struct { +type PriorityNonceMempool[C comparable] struct { priorityIndex *skiplist.SkipList - priorityCounts map[any]int + priorityCounts map[C]int senderIndices map[string]*skiplist.SkipList - scores map[txMeta]txMeta + scores map[txMeta[C]]txMeta[C] onRead func(tx sdk.Tx) - txReplacement func(op, np any, oTx, nTx sdk.Tx) bool + txReplacement func(op, np C, oTx, nTx sdk.Tx) bool maxTx int - txPriority TxPriority + txPriority TxPriority[C] } -type PriorityNonceIterator struct { - mempool *PriorityNonceMempool +type PriorityNonceIterator[C comparable] struct { + mempool *PriorityNonceMempool[C] priorityNode *skiplist.Element senderCursors map[string]*skiplist.Element sender string - nextPriority any + nextPriority C } // txMeta stores transaction metadata used in indices -type txMeta struct { +type txMeta[C comparable] struct { // nonce is the sender's sequence number nonce uint64 // priority is the transaction's priority - priority any + priority C // sender is the transaction's sender sender string // weight is the transaction's weight, used as a tiebreaker for transactions // with the same priority - weight any + weight C // senderElement is a pointer to the transaction's element in the sender index senderElement *skiplist.Element } @@ -94,12 +87,12 @@ type txMeta struct { // then weight, then sender, then nonce, uniquely identifying a transaction. // // Note, skiplistComparable is used as the comparator in the priority index. -func skiplistComparable(txPriority TxPriority) skiplist.Comparable { +func skiplistComparable[C comparable](txPriority TxPriority[C]) skiplist.Comparable { return skiplist.LessThanFunc(func(a, b any) int { - keyA := a.(txMeta) - keyB := b.(txMeta) + keyA := a.(txMeta[C]) + keyB := b.(txMeta[C]) - res := txPriority.CompareTxPriority(keyA.priority, keyB.priority) + res := txPriority.Compare(keyA.priority, keyB.priority) if res != 0 { return res } @@ -107,7 +100,7 @@ func skiplistComparable(txPriority TxPriority) skiplist.Comparable { // Weight is used as a tiebreaker for transactions with the same priority. // Weight is calculated in a single pass in .Select(...) and so will be 0 // on .Insert(...). - res = txPriority.CompareTxPriority(keyA.weight, keyB.weight) + res = txPriority.Compare(keyA.weight, keyB.weight) if res != 0 { return res } @@ -124,12 +117,12 @@ func skiplistComparable(txPriority TxPriority) skiplist.Comparable { }) } -type PriorityNonceMempoolOption func(*PriorityNonceMempool) +type PriorityNonceMempoolOption[C comparable] func(*PriorityNonceMempool[C]) // PriorityNonceWithOnRead sets a callback to be called when a tx is read from // the mempool. -func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption { - return func(mp *PriorityNonceMempool) { +func PriorityNonceWithOnRead[C comparable](onRead func(tx sdk.Tx)) PriorityNonceMempoolOption[C] { + return func(mp *PriorityNonceMempool[C]) { mp.onRead = onRead } } @@ -137,8 +130,8 @@ func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption // PriorityNonceWithTxReplacement sets a callback to be called when duplicated // transaction nonce detected during mempool insert. An application can define a // transaction replacement rule based on tx priority or certain transaction fields. -func PriorityNonceWithTxReplacement(txReplacementRule func(op, np any, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption { - return func(mp *PriorityNonceMempool) { +func PriorityNonceWithTxReplacement[C comparable](txReplacementRule func(op, np C, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption[C] { + return func(mp *PriorityNonceMempool[C]) { mp.txReplacement = txReplacementRule } } @@ -149,25 +142,25 @@ func PriorityNonceWithTxReplacement(txReplacementRule func(op, np any, oTx, nTx // <0: disabled, `Insert` is a no-op // 0: unlimited // >0: maximum number of transactions allowed -func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption { - return func(mp *PriorityNonceMempool) { +func PriorityNonceWithMaxTx[C comparable](maxTx int) PriorityNonceMempoolOption[C] { + return func(mp *PriorityNonceMempool[C]) { mp.maxTx = maxTx } } // DefaultPriorityMempool returns a priorityNonceMempool with no options. -func DefaultPriorityMempool(txPriority TxPriority) Mempool { +func DefaultPriorityMempool[C comparable](txPriority TxPriority[C]) Mempool { return NewPriorityMempool(txPriority) } // NewPriorityMempool returns the SDK's default mempool implementation which // returns txs in a partial order by 2 dimensions; priority, and sender-nonce. -func NewPriorityMempool(txPriority TxPriority, opts ...PriorityNonceMempoolOption) *PriorityNonceMempool { - mp := &PriorityNonceMempool{ +func NewPriorityMempool[C comparable](txPriority TxPriority[C], opts ...PriorityNonceMempoolOption[C]) *PriorityNonceMempool[C] { + mp := &PriorityNonceMempool[C]{ priorityIndex: skiplist.New(skiplistComparable(txPriority)), - priorityCounts: make(map[any]int), + priorityCounts: make(map[C]int), senderIndices: make(map[string]*skiplist.SkipList), - scores: make(map[txMeta]txMeta), + scores: make(map[txMeta[C]]txMeta[C]), txPriority: txPriority, } @@ -181,7 +174,7 @@ func NewPriorityMempool(txPriority TxPriority, opts ...PriorityNonceMempoolOptio // NextSenderTx returns the next transaction for a given sender by nonce order, // i.e. the next valid transaction for the sender. If no such transaction exists, // nil will be returned. -func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx { +func (mp *PriorityNonceMempool[C]) NextSenderTx(sender string) sdk.Tx { senderIndex, ok := mp.senderIndices[sender] if !ok { return nil @@ -200,7 +193,7 @@ func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx { // // Inserting a duplicate tx with a different priority overwrites the existing tx, // changing the total order of the mempool. -func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { +func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error { if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx { return ErrMempoolTxMaxCapacity } else if mp.maxTx < 0 { @@ -219,12 +212,12 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { sender := sdk.AccAddress(sig.PubKey.Address()).String() priority := mp.txPriority.GetTxPriority(ctx, tx) nonce := sig.Sequence - key := txMeta{nonce: nonce, priority: priority, sender: sender} + key := txMeta[C]{nonce: nonce, priority: priority, sender: sender} senderIndex, ok := mp.senderIndices[sender] if !ok { senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int { - return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce) + return skiplist.Uint64.Compare(b.(txMeta[C]).nonce, a.(txMeta[C]).nonce) })) // initialize sender index if not found @@ -238,7 +231,7 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { // // This O(log n) remove operation is rare and only happens when a tx's priority // changes. - sk := txMeta{nonce: nonce, sender: sender} + sk := txMeta[C]{nonce: nonce, sender: sender} if oldScore, txExists := mp.scores[sk]; txExists { if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) { return fmt.Errorf( @@ -250,7 +243,7 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { ) } - mp.priorityIndex.Remove(txMeta{ + mp.priorityIndex.Remove(txMeta[C]{ nonce: nonce, sender: sender, priority: oldScore.priority, @@ -265,13 +258,13 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { // existing key. key.senderElement = senderIndex.Set(key, tx) - mp.scores[sk] = txMeta{priority: priority} + mp.scores[sk] = txMeta[C]{priority: priority} mp.priorityIndex.Set(key, tx) return nil } -func (i *PriorityNonceIterator) iteratePriority() Iterator { +func (i *PriorityNonceIterator[C]) iteratePriority() Iterator { // beginning of priority iteration if i.priorityNode == nil { i.priorityNode = i.mempool.priorityIndex.Front() @@ -284,19 +277,19 @@ func (i *PriorityNonceIterator) iteratePriority() Iterator { return nil } - i.sender = i.priorityNode.Key().(txMeta).sender + i.sender = i.priorityNode.Key().(txMeta[C]).sender nextPriorityNode := i.priorityNode.Next() if nextPriorityNode != nil { - i.nextPriority = nextPriorityNode.Key().(txMeta).priority + i.nextPriority = nextPriorityNode.Key().(txMeta[C]).priority } else { - i.nextPriority = nil + i.nextPriority = i.mempool.txPriority.MinValue } return i.Next() } -func (i *PriorityNonceIterator) Next() Iterator { +func (i *PriorityNonceIterator[C]) Next() Iterator { if i.priorityNode == nil { return nil } @@ -315,17 +308,17 @@ func (i *PriorityNonceIterator) Next() Iterator { return i.iteratePriority() } - key := cursor.Key().(txMeta) + key := cursor.Key().(txMeta[C]) // We've reached a transaction with a priority lower than the next highest // priority in the pool. - if i.mempool.txPriority.CompareTxPriority(key.priority, i.nextPriority) < 0 { + if i.mempool.txPriority.Compare(key.priority, i.nextPriority) < 0 { return i.iteratePriority() - } else if i.mempool.txPriority.CompareTxPriority(key.priority, i.nextPriority) == 0 { + } else if i.mempool.txPriority.Compare(key.priority, i.nextPriority) == 0 { // Weight is incorporated into the priority index key only (not sender index) // so we must fetch it here from the scores map. - weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight - if i.mempool.txPriority.CompareTxPriority(weight, i.priorityNode.Next().Key().(txMeta).weight) < 0 { + weight := i.mempool.scores[txMeta[C]{nonce: key.nonce, sender: key.sender}].weight + if i.mempool.txPriority.Compare(weight, i.priorityNode.Next().Key().(txMeta[C]).weight) < 0 { return i.iteratePriority() } } @@ -334,7 +327,7 @@ func (i *PriorityNonceIterator) Next() Iterator { return i } -func (i *PriorityNonceIterator) Tx() sdk.Tx { +func (i *PriorityNonceIterator[C]) Tx() sdk.Tx { return i.senderCursors[i.sender].Value.(sdk.Tx) } @@ -344,14 +337,14 @@ func (i *PriorityNonceIterator) Tx() sdk.Tx { // // The maxBytes parameter defines the maximum number of bytes of transactions to // return. -func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { +func (mp *PriorityNonceMempool[C]) Select(_ context.Context, _ [][]byte) Iterator { if mp.priorityIndex.Len() == 0 { return nil } mp.reorderPriorityTies() - iterator := &PriorityNonceIterator{ + iterator := &PriorityNonceIterator[C]{ mempool: mp, senderCursors: make(map[string]*skiplist.Element), } @@ -359,22 +352,22 @@ func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) Iterator { return iterator.iteratePriority() } -type reorderKey struct { - deleteKey txMeta - insertKey txMeta +type reorderKey[C comparable] struct { + deleteKey txMeta[C] + insertKey txMeta[C] tx sdk.Tx } -func (mp *PriorityNonceMempool) reorderPriorityTies() { +func (mp *PriorityNonceMempool[C]) reorderPriorityTies() { node := mp.priorityIndex.Front() - var reordering []reorderKey + var reordering []reorderKey[C] for node != nil { - key := node.Key().(txMeta) + key := node.Key().(txMeta[C]) if mp.priorityCounts[key.priority] > 1 { newKey := key newKey.weight = senderWeight(mp.txPriority, key.senderElement) - reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) + reordering = append(reordering, reorderKey[C]{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) } node = node.Next() @@ -382,9 +375,9 @@ func (mp *PriorityNonceMempool) reorderPriorityTies() { for _, k := range reordering { mp.priorityIndex.Remove(k.deleteKey) - delete(mp.scores, txMeta{nonce: k.deleteKey.nonce, sender: k.deleteKey.sender}) + delete(mp.scores, txMeta[C]{nonce: k.deleteKey.nonce, sender: k.deleteKey.sender}) mp.priorityIndex.Set(k.insertKey, k.tx) - mp.scores[txMeta{nonce: k.insertKey.nonce, sender: k.insertKey.sender}] = k.insertKey + mp.scores[txMeta[C]{nonce: k.insertKey.nonce, sender: k.insertKey.sender}] = k.insertKey } } @@ -392,16 +385,16 @@ func (mp *PriorityNonceMempool) reorderPriorityTies() { // defined as the first (nonce-wise) same sender tx with a priority not equal to // t. It is used to resolve priority collisions, that is when 2 or more txs from // different senders have the same priority. -func senderWeight(txPriority TxPriority, senderCursor *skiplist.Element) any { +func senderWeight[C comparable](txPriority TxPriority[C], senderCursor *skiplist.Element) C { if senderCursor == nil { - return 0 + return txPriority.MinValue } - weight := senderCursor.Key().(txMeta).priority + weight := senderCursor.Key().(txMeta[C]).priority senderCursor = senderCursor.Next() for senderCursor != nil { - p := senderCursor.Key().(txMeta).priority - if txPriority.CompareTxPriority(p, weight) != 0 { + p := senderCursor.Key().(txMeta[C]).priority + if txPriority.Compare(p, weight) != 0 { weight = p } @@ -412,13 +405,13 @@ func senderWeight(txPriority TxPriority, senderCursor *skiplist.Element) any { } // CountTx returns the number of transactions in the mempool. -func (mp *PriorityNonceMempool) CountTx() int { +func (mp *PriorityNonceMempool[C]) CountTx() int { return mp.priorityIndex.Len() } // Remove removes a transaction from the mempool in O(log n) time, returning an // error if unsuccessful. -func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { +func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error { sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() if err != nil { return err @@ -431,12 +424,12 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { sender := sdk.AccAddress(sig.PubKey.Address()).String() nonce := sig.Sequence - scoreKey := txMeta{nonce: nonce, sender: sender} + scoreKey := txMeta[C]{nonce: nonce, sender: sender} score, ok := mp.scores[scoreKey] if !ok { return ErrTxNotFound } - tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight} + tk := txMeta[C]{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight} senderTxs, ok := mp.senderIndices[sender] if !ok { @@ -451,13 +444,13 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { return nil } -func IsEmpty(mempool Mempool) error { - mp := mempool.(*PriorityNonceMempool) +func IsEmpty[C comparable](mempool Mempool) error { + mp := mempool.(*PriorityNonceMempool[C]) if mp.priorityIndex.Len() != 0 { return fmt.Errorf("priorityIndex not empty") } - var countKeys []any + var countKeys []C for k := range mp.priorityCounts { countKeys = append(countKeys, k) } diff --git a/types/mempool/priority_nonce_test.go b/types/mempool/priority_nonce_test.go index 0e1c1ce1346c..fbfa5e30801b 100644 --- a/types/mempool/priority_nonce_test.go +++ b/types/mempool/priority_nonce_test.go @@ -253,7 +253,7 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() { require.NoError(t, pool.Remove(tx)) } - require.NoError(t, mempool.IsEmpty(pool)) + require.NoError(t, mempool.IsEmpty[int64](pool)) }) } } @@ -374,7 +374,7 @@ func (s *MempoolTestSuite) TestRandomGeneratedTxs() { s.iterations = 0 s.mempool = mempool.NewPriorityMempool( mempool.NewDefaultTxPriority(), - mempool.PriorityNonceWithOnRead(func(tx sdk.Tx) { + mempool.PriorityNonceWithOnRead[int64](func(tx sdk.Tx) { s.iterations++ }), ) @@ -636,7 +636,7 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // unlimited - mp := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx(0)) + mp := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx[int64](0)) for i, tx := range txs { c := ctx.WithPriority(tx.priority) require.NoError(t, mp.Insert(c, tx)) @@ -650,7 +650,7 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // limit: 3 - mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx(3)) + mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx[int64](3)) for i, tx := range txs { c := ctx.WithPriority(tx.priority) err := mp.Insert(c, tx) @@ -664,7 +664,7 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // disabled - mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx(-1)) + mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx[int64](-1)) for _, tx := range txs { c := ctx.WithPriority(tx.priority) err := mp.Insert(c, tx) @@ -702,9 +702,9 @@ func TestNextSenderTx_TxReplacement(t *testing.T) { feeBump := 20 mp = mempool.NewPriorityMempool( mempool.NewDefaultTxPriority(), - mempool.PriorityNonceWithTxReplacement(func(op, np any, oTx, nTx sdk.Tx) bool { + mempool.PriorityNonceWithTxReplacement(func(op, np int64, oTx, nTx sdk.Tx) bool { threshold := int64(100 + feeBump) - return np.(int64) >= op.(int64)*threshold/100 + return np >= op*threshold/100 }), ) From 3b0a63203809f74ecce41bafdc2db6cffaea8457 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 14 Mar 2023 15:44:44 -0400 Subject: [PATCH 3/5] updates --- types/mempool/priority_nonce.go | 204 +++++++++++++-------------- types/mempool/priority_nonce_test.go | 57 +++++--- 2 files changed, 141 insertions(+), 120 deletions(-) diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go index 6edfe674f845..0450c78fe753 100644 --- a/types/mempool/priority_nonce.go +++ b/types/mempool/priority_nonce.go @@ -16,17 +16,87 @@ var ( _ Iterator = (*PriorityNonceIterator[int64])(nil) ) -type TxPriority[C comparable] struct { - // GetTxPriority returns the priority of the transaction. A priority must be - // comparable via Compare. - GetTxPriority func(ctx context.Context, tx sdk.Tx) C - - // CompareTxPriority compares two transaction priorities. The result should be - // 0 if a == b, -1 if a < b, and +1 if a > b. - Compare func(a, b C) int - - MinValue C -} +type ( + // PriorityNonceMempoolConfig defines the configuration used to configure the + // PriorityNonceMempool. + PriorityNonceMempoolConfig[C comparable] struct { + // TxPriority defines the transaction priority and comparator. + TxPriority TxPriority[C] + + // OnRead is a callback to be called when a tx is read from the mempool. + OnRead func(tx sdk.Tx) + + // TxReplacement is a callback to be called when duplicated transaction nonce + // detected during mempool insert. An application can define a transaction + // replacement rule based on tx priority or certain transaction fields. + TxReplacement func(op, np C, oTx, nTx sdk.Tx) bool + + // MaxTx sets the maximum number of transactions allowed in the mempool with + // the semantics: + // - if MaxTx == 0, there is no cap on the number of transactions in the mempool + // - if MaxTx > 0, the mempool will cap the number of transactions it stores, + // and will prioritize transactions by their priority and sender-nonce + // (sequence number) when evicting transactions. + // - if MaxTx < 0, `Insert` is a no-op. + MaxTx int + } + + // PriorityNonceMempool is a mempool implementation that stores txs + // in a partially ordered set by 2 dimensions: priority, and sender-nonce + // (sequence number). Internally it uses one priority ordered skip list and one + // skip list per sender ordered by sender-nonce (sequence number). When there + // are multiple txs from the same sender, they are not always comparable by + // priority to other sender txs and must be partially ordered by both sender-nonce + // and priority. + PriorityNonceMempool[C comparable] struct { + priorityIndex *skiplist.SkipList + priorityCounts map[C]int + senderIndices map[string]*skiplist.SkipList + scores map[txMeta[C]]txMeta[C] + cfg PriorityNonceMempoolConfig[C] + } + + // PriorityNonceIterator defines an iterator that is used for mempool iteration + // on Select(). + PriorityNonceIterator[C comparable] struct { + mempool *PriorityNonceMempool[C] + priorityNode *skiplist.Element + senderCursors map[string]*skiplist.Element + sender string + nextPriority C + } + + // TxPriority defines a type that is used to retrieve and compare transaction + // priorities. Priorities must be comparable. + TxPriority[C comparable] struct { + // GetTxPriority returns the priority of the transaction. A priority must be + // comparable via Compare. + GetTxPriority func(ctx context.Context, tx sdk.Tx) C + + // CompareTxPriority compares two transaction priorities. The result should be + // 0 if a == b, -1 if a < b, and +1 if a > b. + Compare func(a, b C) int + + // MinValue defines the minimum priority value, e.g. MinInt64. This value is + // used when instantiating a new iterator and comparing weights. + MinValue C + } + + // txMeta stores transaction metadata used in indices + txMeta[C comparable] struct { + // nonce is the sender's sequence number + nonce uint64 + // priority is the transaction's priority + priority C + // sender is the transaction's sender + sender string + // weight is the transaction's weight, used as a tiebreaker for transactions + // with the same priority + weight C + // senderElement is a pointer to the transaction's element in the sender index + senderElement *skiplist.Element + } +) // NewDefaultTxPriority returns a TxPriority comparator using ctx.Priority as // the defining transaction priority. @@ -42,45 +112,10 @@ func NewDefaultTxPriority() TxPriority[int64] { } } -// PriorityNonceMempool is a mempool implementation that stores txs -// in a partially ordered set by 2 dimensions: priority, and sender-nonce -// (sequence number). Internally it uses one priority ordered skip list and one -// skip list per sender ordered by sender-nonce (sequence number). When there -// are multiple txs from the same sender, they are not always comparable by -// priority to other sender txs and must be partially ordered by both sender-nonce -// and priority. -type PriorityNonceMempool[C comparable] struct { - priorityIndex *skiplist.SkipList - priorityCounts map[C]int - senderIndices map[string]*skiplist.SkipList - scores map[txMeta[C]]txMeta[C] - onRead func(tx sdk.Tx) - txReplacement func(op, np C, oTx, nTx sdk.Tx) bool - maxTx int - txPriority TxPriority[C] -} - -type PriorityNonceIterator[C comparable] struct { - mempool *PriorityNonceMempool[C] - priorityNode *skiplist.Element - senderCursors map[string]*skiplist.Element - sender string - nextPriority C -} - -// txMeta stores transaction metadata used in indices -type txMeta[C comparable] struct { - // nonce is the sender's sequence number - nonce uint64 - // priority is the transaction's priority - priority C - // sender is the transaction's sender - sender string - // weight is the transaction's weight, used as a tiebreaker for transactions - // with the same priority - weight C - // senderElement is a pointer to the transaction's element in the sender index - senderElement *skiplist.Element +func DefaultPriorityNonceMempoolConfig() PriorityNonceMempoolConfig[int64] { + return PriorityNonceMempoolConfig[int64]{ + TxPriority: NewDefaultTxPriority(), + } } // skiplistComparable is a comparator for txKeys that first compares priority, @@ -117,60 +152,25 @@ func skiplistComparable[C comparable](txPriority TxPriority[C]) skiplist.Compara }) } -type PriorityNonceMempoolOption[C comparable] func(*PriorityNonceMempool[C]) - -// PriorityNonceWithOnRead sets a callback to be called when a tx is read from -// the mempool. -func PriorityNonceWithOnRead[C comparable](onRead func(tx sdk.Tx)) PriorityNonceMempoolOption[C] { - return func(mp *PriorityNonceMempool[C]) { - mp.onRead = onRead - } -} - -// PriorityNonceWithTxReplacement sets a callback to be called when duplicated -// transaction nonce detected during mempool insert. An application can define a -// transaction replacement rule based on tx priority or certain transaction fields. -func PriorityNonceWithTxReplacement[C comparable](txReplacementRule func(op, np C, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption[C] { - return func(mp *PriorityNonceMempool[C]) { - mp.txReplacement = txReplacementRule - } -} - -// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the -// mempool with the semantics: -// -// <0: disabled, `Insert` is a no-op -// 0: unlimited -// >0: maximum number of transactions allowed -func PriorityNonceWithMaxTx[C comparable](maxTx int) PriorityNonceMempoolOption[C] { - return func(mp *PriorityNonceMempool[C]) { - mp.maxTx = maxTx - } -} - -// DefaultPriorityMempool returns a priorityNonceMempool with no options. -func DefaultPriorityMempool[C comparable](txPriority TxPriority[C]) Mempool { - return NewPriorityMempool(txPriority) -} - // NewPriorityMempool returns the SDK's default mempool implementation which // returns txs in a partial order by 2 dimensions; priority, and sender-nonce. -func NewPriorityMempool[C comparable](txPriority TxPriority[C], opts ...PriorityNonceMempoolOption[C]) *PriorityNonceMempool[C] { +func NewPriorityMempool[C comparable](cfg PriorityNonceMempoolConfig[C]) *PriorityNonceMempool[C] { mp := &PriorityNonceMempool[C]{ - priorityIndex: skiplist.New(skiplistComparable(txPriority)), + priorityIndex: skiplist.New(skiplistComparable(cfg.TxPriority)), priorityCounts: make(map[C]int), senderIndices: make(map[string]*skiplist.SkipList), scores: make(map[txMeta[C]]txMeta[C]), - txPriority: txPriority, - } - - for _, opt := range opts { - opt(mp) + cfg: cfg, } return mp } +// DefaultPriorityMempool returns a priorityNonceMempool with no options. +func DefaultPriorityMempool() *PriorityNonceMempool[int64] { + return NewPriorityMempool(DefaultPriorityNonceMempoolConfig()) +} + // NextSenderTx returns the next transaction for a given sender by nonce order, // i.e. the next valid transaction for the sender. If no such transaction exists, // nil will be returned. @@ -194,9 +194,9 @@ func (mp *PriorityNonceMempool[C]) NextSenderTx(sender string) sdk.Tx { // Inserting a duplicate tx with a different priority overwrites the existing tx, // changing the total order of the mempool. func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error { - if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx { + if mp.cfg.MaxTx > 0 && mp.CountTx() >= mp.cfg.MaxTx { return ErrMempoolTxMaxCapacity - } else if mp.maxTx < 0 { + } else if mp.cfg.MaxTx < 0 { return nil } @@ -210,7 +210,7 @@ func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error sig := sigs[0] sender := sdk.AccAddress(sig.PubKey.Address()).String() - priority := mp.txPriority.GetTxPriority(ctx, tx) + priority := mp.cfg.TxPriority.GetTxPriority(ctx, tx) nonce := sig.Sequence key := txMeta[C]{nonce: nonce, priority: priority, sender: sender} @@ -233,7 +233,7 @@ func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error // changes. sk := txMeta[C]{nonce: nonce, sender: sender} if oldScore, txExists := mp.scores[sk]; txExists { - if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) { + if mp.cfg.TxReplacement != nil && !mp.cfg.TxReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) { return fmt.Errorf( "tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v", oldScore.priority, @@ -283,7 +283,7 @@ func (i *PriorityNonceIterator[C]) iteratePriority() Iterator { if nextPriorityNode != nil { i.nextPriority = nextPriorityNode.Key().(txMeta[C]).priority } else { - i.nextPriority = i.mempool.txPriority.MinValue + i.nextPriority = i.mempool.cfg.TxPriority.MinValue } return i.Next() @@ -312,13 +312,13 @@ func (i *PriorityNonceIterator[C]) Next() Iterator { // We've reached a transaction with a priority lower than the next highest // priority in the pool. - if i.mempool.txPriority.Compare(key.priority, i.nextPriority) < 0 { + if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) < 0 { return i.iteratePriority() - } else if i.mempool.txPriority.Compare(key.priority, i.nextPriority) == 0 { + } else if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) == 0 { // Weight is incorporated into the priority index key only (not sender index) // so we must fetch it here from the scores map. weight := i.mempool.scores[txMeta[C]{nonce: key.nonce, sender: key.sender}].weight - if i.mempool.txPriority.Compare(weight, i.priorityNode.Next().Key().(txMeta[C]).weight) < 0 { + if i.mempool.cfg.TxPriority.Compare(weight, i.priorityNode.Next().Key().(txMeta[C]).weight) < 0 { return i.iteratePriority() } } @@ -366,7 +366,7 @@ func (mp *PriorityNonceMempool[C]) reorderPriorityTies() { key := node.Key().(txMeta[C]) if mp.priorityCounts[key.priority] > 1 { newKey := key - newKey.weight = senderWeight(mp.txPriority, key.senderElement) + newKey.weight = senderWeight(mp.cfg.TxPriority, key.senderElement) reordering = append(reordering, reorderKey[C]{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) } diff --git a/types/mempool/priority_nonce_test.go b/types/mempool/priority_nonce_test.go index fbfa5e30801b..12913998aa2b 100644 --- a/types/mempool/priority_nonce_test.go +++ b/types/mempool/priority_nonce_test.go @@ -229,7 +229,7 @@ func (s *MempoolTestSuite) TestPriorityNonceTxOrder() { } for i, tt := range tests { t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { - pool := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) + pool := mempool.DefaultPriorityMempool() // create test txs and insert into mempool for i, ts := range tt.txs { @@ -275,7 +275,7 @@ func (s *MempoolTestSuite) TestPriorityTies() { } for i := 0; i < 100; i++ { - s.mempool = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) + s.mempool = mempool.DefaultPriorityMempool() var shuffled []txSpec for _, t := range txSet { tx := txSpec{ @@ -373,11 +373,14 @@ func validateOrder(mtxs []sdk.Tx) error { func (s *MempoolTestSuite) TestRandomGeneratedTxs() { s.iterations = 0 s.mempool = mempool.NewPriorityMempool( - mempool.NewDefaultTxPriority(), - mempool.PriorityNonceWithOnRead[int64](func(tx sdk.Tx) { - s.iterations++ - }), + mempool.PriorityNonceMempoolConfig[int64]{ + TxPriority: mempool.NewDefaultTxPriority(), + OnRead: func(tx sdk.Tx) { + s.iterations++ + }, + }, ) + t := s.T() ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger()) seed := time.Now().UnixNano() @@ -412,7 +415,7 @@ func (s *MempoolTestSuite) TestRandomGeneratedTxs() { func (s *MempoolTestSuite) TestRandomWalkTxs() { s.iterations = 0 - s.mempool = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) + s.mempool = mempool.DefaultPriorityMempool() t := s.T() ctx := sdk.NewContext(nil, cmtproto.Header{}, false, log.NewNopLogger()) @@ -592,7 +595,7 @@ func TestPriorityNonceMempool_NextSenderTx(t *testing.T) { accA := accounts[0].Address accB := accounts[1].Address - mp := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) + mp := mempool.DefaultPriorityMempool() txs := []testTx{ {priority: 20, nonce: 1, address: accA}, @@ -636,13 +639,19 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // unlimited - mp := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx[int64](0)) + mp := mempool.NewPriorityMempool( + mempool.PriorityNonceMempoolConfig[int64]{ + TxPriority: mempool.NewDefaultTxPriority(), + MaxTx: 0, + }, + ) for i, tx := range txs { c := ctx.WithPriority(tx.priority) require.NoError(t, mp.Insert(c, tx)) require.Equal(t, i+1, mp.CountTx()) } - mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) + + mp = mempool.DefaultPriorityMempool() for i, tx := range txs { c := ctx.WithPriority(tx.priority) require.NoError(t, mp.Insert(c, tx)) @@ -650,7 +659,12 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // limit: 3 - mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx[int64](3)) + mp = mempool.NewPriorityMempool( + mempool.PriorityNonceMempoolConfig[int64]{ + TxPriority: mempool.NewDefaultTxPriority(), + MaxTx: 3, + }, + ) for i, tx := range txs { c := ctx.WithPriority(tx.priority) err := mp.Insert(c, tx) @@ -664,7 +678,12 @@ func TestNextSenderTx_TxLimit(t *testing.T) { } // disabled - mp = mempool.NewPriorityMempool(mempool.NewDefaultTxPriority(), mempool.PriorityNonceWithMaxTx[int64](-1)) + mp = mempool.NewPriorityMempool( + mempool.PriorityNonceMempoolConfig[int64]{ + TxPriority: mempool.NewDefaultTxPriority(), + MaxTx: -1, + }, + ) for _, tx := range txs { c := ctx.WithPriority(tx.priority) err := mp.Insert(c, tx) @@ -686,7 +705,7 @@ func TestNextSenderTx_TxReplacement(t *testing.T) { } // test Priority with default mempool - mp := mempool.NewPriorityMempool(mempool.NewDefaultTxPriority()) + mp := mempool.DefaultPriorityMempool() for _, tx := range txs { c := ctx.WithPriority(tx.priority) require.NoError(t, mp.Insert(c, tx)) @@ -701,11 +720,13 @@ func TestNextSenderTx_TxReplacement(t *testing.T) { // otherwise, the Insert will return error feeBump := 20 mp = mempool.NewPriorityMempool( - mempool.NewDefaultTxPriority(), - mempool.PriorityNonceWithTxReplacement(func(op, np int64, oTx, nTx sdk.Tx) bool { - threshold := int64(100 + feeBump) - return np >= op*threshold/100 - }), + mempool.PriorityNonceMempoolConfig[int64]{ + TxPriority: mempool.NewDefaultTxPriority(), + TxReplacement: func(op, np int64, oTx, nTx sdk.Tx) bool { + threshold := int64(100 + feeBump) + return np >= op*threshold/100 + }, + }, ) c := ctx.WithPriority(txs[0].priority) From a8929403eff94a806cf5dfdd18fa5ecb9cd14b13 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 14 Mar 2023 17:31:16 -0400 Subject: [PATCH 4/5] updates --- types/mempool/priority_nonce.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go index 0450c78fe753..01010a08f3a4 100644 --- a/types/mempool/priority_nonce.go +++ b/types/mempool/priority_nonce.go @@ -73,8 +73,8 @@ type ( // comparable via Compare. GetTxPriority func(ctx context.Context, tx sdk.Tx) C - // CompareTxPriority compares two transaction priorities. The result should be - // 0 if a == b, -1 if a < b, and +1 if a > b. + // Compare compares two transaction priorities. The result must be 0 if + // a == b, -1 if a < b, and +1 if a > b. Compare func(a, b C) int // MinValue defines the minimum priority value, e.g. MinInt64. This value is @@ -450,7 +450,7 @@ func IsEmpty[C comparable](mempool Mempool) error { return fmt.Errorf("priorityIndex not empty") } - var countKeys []C + countKeys := make([]C, 0, len(mp.priorityCounts)) for k := range mp.priorityCounts { countKeys = append(countKeys, k) } @@ -461,7 +461,7 @@ func IsEmpty[C comparable](mempool Mempool) error { } } - var senderKeys []string + senderKeys := make([]string, 0, len(mp.senderIndices)) for k := range mp.senderIndices { senderKeys = append(senderKeys, k) } From 08ee562758b3e79cd83c88f973d3a2598216ad55 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 15 Mar 2023 12:09:32 -0400 Subject: [PATCH 5/5] updates --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ade6226baf7a..f1482fc441da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,9 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Improvements +* (mempool) [#15328](https://github.com/cosmos/cosmos-sdk/pull/15328) Improve the `PriorityNonceMempool` + * Support generic transaction prioritization, instead of `ctx.Priority()` + * Improve construction through the use of a single `PriorityNonceMempoolConfig` instead of option functions * (x/authz) [#15164](https://github.com/cosmos/cosmos-sdk/pull/15164) Add `MsgCancelUnbondingDelegation` to staking authorization * (server) [#15358](https://github.com/cosmos/cosmos-sdk/pull/15358) Add `server.InterceptConfigsAndCreateContext` as alternative to `server.InterceptConfigsPreRunHandler` which does not set the server context and the default SDK logger. * [#15011](https://github.com/cosmos/cosmos-sdk/pull/15011) Introduce `cosmossdk.io/log` package to provide a consistent logging interface through the SDK. CometBFT logger is now replaced by `cosmossdk.io/log.Logger`. @@ -97,6 +100,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### API Breaking Changes +* (mempool) [#15328](https://github.com/cosmos/cosmos-sdk/pull/15328) The `PriorityNonceMempool` is now generic over type `C comparable` and takes a single `PriorityNonceMempoolConfig[C]` argument. See `DefaultPriorityNonceMempoolConfig` for how to construct the configuration and a `TxPriority` type. * (server) [#15358](https://github.com/cosmos/cosmos-sdk/pull/15358) Remove `server.ErrorCode` that was not used anywhere. * [#15211](https://github.com/cosmos/cosmos-sdk/pull/15211) Remove usage of `github.com/cometbft/cometbft/libs/bytes.HexBytes` in favor of `[]byte` thorough the SDK. * [#15011](https://github.com/cosmos/cosmos-sdk/pull/15011) All functions that were taking a CometBFT logger, now take `cosmossdk.io/log.Logger` instead.