Skip to content

Commit

Permalink
Merge branch 'dev' into refactor-client
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim authored Dec 11, 2023
2 parents 02e7588 + 4be5218 commit 09af73d
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 173 deletions.
4 changes: 2 additions & 2 deletions vms/avm/block/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func (b *builder) BuildBlock(context.Context) (snowman.Block, error) {
remainingSize = targetBlockSize
)
for {
tx := b.mempool.Peek()
tx, exists := b.mempool.Peek()
// Invariant: [mempool.MaxTxSize] < [targetBlockSize]. This guarantees
// that we will only stop building a block once there are no
// transactions in the mempool or the block is at least
// [targetBlockSize - mempool.MaxTxSize] bytes full.
if tx == nil || len(tx.Bytes()) > remainingSize {
if !exists || len(tx.Bytes()) > remainingSize {
break
}
b.mempool.Remove([]*txs.Tx{tx})
Expand Down
26 changes: 13 additions & 13 deletions vms/avm/block/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ func TestBuilderBuildBlock(t *testing.T) {
tx := &txs.Tx{Unsigned: unsignedTx}

mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Peek().Return(tx)
mempool.EXPECT().Peek().Return(tx, true)
mempool.EXPECT().Remove([]*txs.Tx{tx})
mempool.EXPECT().MarkDropped(tx.ID(), errTest)
// Second loop iteration
mempool.EXPECT().Peek().Return(nil)
mempool.EXPECT().Peek().Return(nil, false)
mempool.EXPECT().RequestBuildBlock()

return New(
Expand Down Expand Up @@ -179,11 +179,11 @@ func TestBuilderBuildBlock(t *testing.T) {
tx := &txs.Tx{Unsigned: unsignedTx}

mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Peek().Return(tx)
mempool.EXPECT().Peek().Return(tx, true)
mempool.EXPECT().Remove([]*txs.Tx{tx})
mempool.EXPECT().MarkDropped(tx.ID(), errTest)
// Second loop iteration
mempool.EXPECT().Peek().Return(nil)
mempool.EXPECT().Peek().Return(nil, false)
mempool.EXPECT().RequestBuildBlock()

return New(
Expand Down Expand Up @@ -225,11 +225,11 @@ func TestBuilderBuildBlock(t *testing.T) {
tx := &txs.Tx{Unsigned: unsignedTx}

mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Peek().Return(tx)
mempool.EXPECT().Peek().Return(tx, true)
mempool.EXPECT().Remove([]*txs.Tx{tx})
mempool.EXPECT().MarkDropped(tx.ID(), errTest)
// Second loop iteration
mempool.EXPECT().Peek().Return(nil)
mempool.EXPECT().Peek().Return(nil, false)
mempool.EXPECT().RequestBuildBlock()

return New(
Expand Down Expand Up @@ -309,14 +309,14 @@ func TestBuilderBuildBlock(t *testing.T) {
)

mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Peek().Return(tx1)
mempool.EXPECT().Peek().Return(tx1, true)
mempool.EXPECT().Remove([]*txs.Tx{tx1})
// Second loop iteration
mempool.EXPECT().Peek().Return(tx2)
mempool.EXPECT().Peek().Return(tx2, true)
mempool.EXPECT().Remove([]*txs.Tx{tx2})
mempool.EXPECT().MarkDropped(tx2.ID(), blkexecutor.ErrConflictingBlockTxs)
// Third loop iteration
mempool.EXPECT().Peek().Return(nil)
mempool.EXPECT().Peek().Return(nil, false)
mempool.EXPECT().RequestBuildBlock()

// To marshal the tx/block
Expand Down Expand Up @@ -385,10 +385,10 @@ func TestBuilderBuildBlock(t *testing.T) {
tx := &txs.Tx{Unsigned: unsignedTx}

mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Peek().Return(tx)
mempool.EXPECT().Peek().Return(tx, true)
mempool.EXPECT().Remove([]*txs.Tx{tx})
// Second loop iteration
mempool.EXPECT().Peek().Return(nil)
mempool.EXPECT().Peek().Return(nil, false)
mempool.EXPECT().RequestBuildBlock()

// To marshal the tx/block
Expand Down Expand Up @@ -459,10 +459,10 @@ func TestBuilderBuildBlock(t *testing.T) {
tx := &txs.Tx{Unsigned: unsignedTx}

mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Peek().Return(tx)
mempool.EXPECT().Peek().Return(tx, true)
mempool.EXPECT().Remove([]*txs.Tx{tx})
// Second loop iteration
mempool.EXPECT().Peek().Return(nil)
mempool.EXPECT().Peek().Return(nil, false)
mempool.EXPECT().RequestBuildBlock()

// To marshal the tx/block
Expand Down
8 changes: 4 additions & 4 deletions vms/avm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Mempool interface {
Remove(txs []*txs.Tx)

// Peek returns the oldest tx in the mempool.
Peek() *txs.Tx
Peek() (tx *txs.Tx, exists bool)

// RequestBuildBlock notifies the consensus engine that a block should be
// built if there is at least one transaction in the mempool.
Expand Down Expand Up @@ -182,9 +182,9 @@ func (m *mempool) Remove(txsToRemove []*txs.Tx) {
}
}

func (m *mempool) Peek() *txs.Tx {
_, tx, _ := m.unissuedTxs.Oldest()
return tx
func (m *mempool) Peek() (*txs.Tx, bool) {
_, tx, exists := m.unissuedTxs.Oldest()
return tx, exists
}

func (m *mempool) RequestBuildBlock() {
Expand Down
20 changes: 14 additions & 6 deletions vms/avm/txs/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,20 +181,28 @@ func TestPeekTxs(t *testing.T) {

testTxs := createTestTxs(2)

require.Nil(mempool.Peek())
tx, exists := mempool.Peek()
require.False(exists)
require.Nil(tx)

require.NoError(mempool.Add(testTxs[0]))
require.NoError(mempool.Add(testTxs[1]))

require.Equal(mempool.Peek(), testTxs[0])
require.NotEqual(mempool.Peek(), testTxs[1])
tx, exists = mempool.Peek()
require.True(exists)
require.Equal(tx, testTxs[0])
require.NotEqual(tx, testTxs[1])

mempool.Remove([]*txs.Tx{testTxs[0]})

require.NotEqual(mempool.Peek(), testTxs[0])
require.Equal(mempool.Peek(), testTxs[1])
tx, exists = mempool.Peek()
require.True(exists)
require.NotEqual(tx, testTxs[0])
require.Equal(tx, testTxs[1])

mempool.Remove([]*txs.Tx{testTxs[1]})

require.Nil(mempool.Peek())
tx, exists = mempool.Peek()
require.False(exists)
require.Nil(tx)
}
5 changes: 3 additions & 2 deletions vms/avm/txs/mempool/mock_mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 18 additions & 8 deletions vms/platformvm/block/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ func (b *builder) ShutdownBlockTimer() {
// This method removes the transactions from the returned
// blocks from the mempool.
func (b *builder) BuildBlock(context.Context) (snowman.Block, error) {
b.Mempool.DisableAdding()
defer func() {
b.Mempool.EnableAdding()
// If we need to advance the chain's timestamp in a standard block, but
// we build an invalid block, then we need to re-trigger block building.
//
Expand Down Expand Up @@ -236,10 +234,6 @@ func (b *builder) BuildBlock(context.Context) (snowman.Block, error) {
return nil, err
}

// Remove selected txs from mempool now that we are returning the block to
// the consensus engine.
txs := statelessBlk.Txs()
b.Mempool.Remove(txs)
return b.blkManager.NewBlock(statelessBlk), nil
}

Expand Down Expand Up @@ -283,8 +277,24 @@ func buildBlock(
)
}

var (
blockTxs []*txs.Tx
remainingSize = targetBlockSize
)

for {
tx, exists := builder.Mempool.Peek()
if !exists || len(tx.Bytes()) > remainingSize {
break
}
builder.Mempool.Remove([]*txs.Tx{tx})

remainingSize -= len(tx.Bytes())
blockTxs = append(blockTxs, tx)
}

// If there is no reason to build a block, don't.
if !builder.Mempool.HasTxs() && !forceAdvanceTime {
if len(blockTxs) == 0 && !forceAdvanceTime {
builder.txExecutorBackend.Ctx.Log.Debug("no pending txs to issue into a block")
return nil, ErrNoPendingBlocks
}
Expand All @@ -294,7 +304,7 @@ func buildBlock(
timestamp,
parentID,
height,
builder.Mempool.PeekTxs(targetBlockSize),
blockTxs,
)
}

Expand Down
34 changes: 24 additions & 10 deletions vms/platformvm/block/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,15 @@ func TestBuildBlock(t *testing.T) {
builderF: func(ctrl *gomock.Controller) *builder {
mempool := mempool.NewMockMempool(ctrl)

// There are txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx, true),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil, false),
)

return &builder{
Mempool: mempool,
}
Expand Down Expand Up @@ -463,7 +468,7 @@ func TestBuildBlock(t *testing.T) {

// There are no txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(false)
mempool.EXPECT().Peek().Return(nil, false)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -511,8 +516,7 @@ func TestBuildBlock(t *testing.T) {

// There are no txs.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(false)
mempool.EXPECT().PeekTxs(targetBlockSize).Return(nil)
mempool.EXPECT().Peek().Return(nil, false)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -566,8 +570,13 @@ func TestBuildBlock(t *testing.T) {

// There is a tx.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx, true),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil, false),
)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down Expand Up @@ -620,8 +629,13 @@ func TestBuildBlock(t *testing.T) {
// There are no decision txs
// There is a staker tx.
mempool.EXPECT().DropExpiredStakerTxs(gomock.Any()).Return([]ids.ID{})
mempool.EXPECT().HasTxs().Return(true)
mempool.EXPECT().PeekTxs(targetBlockSize).Return([]*txs.Tx{tx})

gomock.InOrder(
mempool.EXPECT().Peek().Return(tx, true),
mempool.EXPECT().Remove([]*txs.Tx{tx}),
// Second loop iteration
mempool.EXPECT().Peek().Return(nil, false),
)

clk := &mockable.Clock{}
clk.Set(now)
Expand Down
52 changes: 6 additions & 46 deletions vms/platformvm/txs/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,13 @@ var (
)

type Mempool interface {
// we may want to be able to stop valid transactions
// from entering the mempool, e.g. during blocks creation
EnableAdding()
DisableAdding()

Add(tx *txs.Tx) error
Has(txID ids.ID) bool
Get(txID ids.ID) *txs.Tx
Remove(txs []*txs.Tx)

// Following Banff activation, all mempool transactions,
// (both decision and staker) are included into Standard blocks.
// HasTxs allow to check for availability of any mempool transaction.
HasTxs() bool
// PeekTxs returns the next txs for Banff blocks
// up to maxTxsBytes without removing them from the mempool.
PeekTxs(maxTxsBytes int) []*txs.Tx
// Peek returns the oldest tx in the mempool.
Peek() (tx *txs.Tx, exists bool)

// Drops all [txs.Staker] transactions whose [StartTime] is before
// [minStartTime] from [mempool]. The dropped tx ids are returned.
Expand All @@ -86,9 +76,6 @@ type Mempool interface {
// Transactions from clients that have not yet been put into blocks and added to
// consensus
type mempool struct {
// If true, drop transactions added to the mempool via Add.
dropIncoming bool

bytesAvailableMetric prometheus.Gauge
bytesAvailable int

Expand Down Expand Up @@ -137,24 +124,11 @@ func New(

droppedTxIDs: &cache.LRU[ids.ID, error]{Size: droppedTxIDsCacheSize},
consumedUTXOs: set.NewSet[ids.ID](initialConsumedUTXOsSize),
dropIncoming: false, // enable tx adding by default
toEngine: toEngine,
}, nil
}

func (m *mempool) EnableAdding() {
m.dropIncoming = false
}

func (m *mempool) DisableAdding() {
m.dropIncoming = true
}

func (m *mempool) Add(tx *txs.Tx) error {
if m.dropIncoming {
return fmt.Errorf("tx %s not added because mempool is closed", tx.ID())
}

switch tx.Unsigned.(type) {
case *txs.AdvanceTimeTx:
return errCantIssueAdvanceTimeTx
Expand Down Expand Up @@ -231,23 +205,9 @@ func (m *mempool) Remove(txsToRemove []*txs.Tx) {
}
}

func (m *mempool) HasTxs() bool {
return m.unissuedTxs.Len() > 0
}

func (m *mempool) PeekTxs(maxTxsBytes int) []*txs.Tx {
var txs []*txs.Tx
txIter := m.unissuedTxs.NewIterator()
size := 0
for txIter.Next() {
tx := txIter.Value()
size += len(tx.Bytes())
if size > maxTxsBytes {
return txs
}
txs = append(txs, tx)
}
return txs
func (m *mempool) Peek() (*txs.Tx, bool) {
_, tx, exists := m.unissuedTxs.Oldest()
return tx, exists
}

func (m *mempool) MarkDropped(txID ids.ID, reason error) {
Expand All @@ -260,7 +220,7 @@ func (m *mempool) GetDropReason(txID ids.ID) error {
}

func (m *mempool) RequestBuildBlock(emptyBlockPermitted bool) {
if !emptyBlockPermitted && !m.HasTxs() {
if !emptyBlockPermitted && m.unissuedTxs.Len() == 0 {
return
}

Expand Down
Loading

0 comments on commit 09af73d

Please sign in to comment.