Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [\#591](https://github.com/cosmos/evm/pull/591) CheckTxHandler should handle "invalid nonce" tx
- [\#643](https://github.com/cosmos/evm/pull/643) Support for mnemonic source (file, stdin,etc) flag in key add command.
- [\#645](https://github.com/cosmos/evm/pull/645) Align precise bank keeper for correct decimal conversion in evmd.
- [\#656](https://github.com/cosmos/evm/pull/656) Fix race condition in mempool

### IMPROVEMENTS

Expand Down
57 changes: 40 additions & 17 deletions mempool/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mempool
import (
"fmt"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand All @@ -24,8 +25,8 @@ import (
)

var (
_ txpool.BlockChain = Blockchain{}
_ legacypool.BlockChain = Blockchain{}
_ txpool.BlockChain = &Blockchain{}
_ legacypool.BlockChain = &Blockchain{}
)

// Blockchain implements the BlockChain interface required by Ethereum transaction pools.
Expand All @@ -42,12 +43,13 @@ type Blockchain struct {
blockGasLimit uint64
previousHeaderHash common.Hash
latestCtx sdk.Context
mu sync.RWMutex
}

// newBlockchain creates a new Blockchain instance that bridges Cosmos SDK state with Ethereum mempools.
// NewBlockchain creates a new Blockchain instance that bridges Cosmos SDK state with Ethereum mempools.
// The getCtxCallback function provides access to Cosmos SDK contexts at different heights, vmKeeper manages EVM state,
// and feeMarketKeeper handles fee market operations like base fee calculations.
func newBlockchain(ctx func(height int64, prove bool) (sdk.Context, error), logger log.Logger, vmKeeper VMKeeperI, feeMarketKeeper FeeMarketKeeperI, blockGasLimit uint64) *Blockchain {
func NewBlockchain(ctx func(height int64, prove bool) (sdk.Context, error), logger log.Logger, vmKeeper VMKeeperI, feeMarketKeeper FeeMarketKeeperI, blockGasLimit uint64) *Blockchain {
// Add the blockchain name to the logger
logger = logger.With(log.ModuleKey, "Blockchain")

Expand All @@ -70,23 +72,24 @@ func newBlockchain(ctx func(height int64, prove bool) (sdk.Context, error), logg

// Config returns the Ethereum chain configuration. It should only be called after the chain is initialized.
// This provides the necessary parameters for EVM execution and transaction validation.
func (b Blockchain) Config() *params.ChainConfig {
func (b *Blockchain) Config() *params.ChainConfig {
return evmtypes.GetEthChainConfig()
}

// CurrentBlock returns the current block header for the app.
// It constructs an Ethereum-compatible header from the current Cosmos SDK context,
// including block height, timestamp, gas limits, and base fee (if London fork is active).
// Returns a zero header as placeholder if the context is not yet available.
func (b Blockchain) CurrentBlock() *types.Header {
func (b *Blockchain) CurrentBlock() *types.Header {
ctx, err := b.GetLatestContext()
if err != nil {
return b.zeroHeader
}

blockHeight := ctx.BlockHeight()
// prevent the reorg from triggering after a restart since previousHeaderHash is stored as an in-memory variable
if blockHeight > 1 && b.previousHeaderHash == (common.Hash{}) {
previousHeaderHash := b.getPreviousHeaderHash()
if blockHeight > 1 && previousHeaderHash == (common.Hash{}) {
return b.zeroHeader
}

Expand All @@ -99,7 +102,7 @@ func (b Blockchain) CurrentBlock() *types.Header {
Time: uint64(blockTime), // #nosec G115 -- overflow not a concern with unix time
GasLimit: b.blockGasLimit,
GasUsed: gasUsed,
ParentHash: b.previousHeaderHash,
ParentHash: previousHeaderHash,
Root: appHash, // we actually don't care that this isn't the getCtxCallback header, as long as we properly track roots and parent roots to prevent the reorg from triggering
Difficulty: big.NewInt(0), // 0 difficulty on PoS
}
Expand Down Expand Up @@ -139,7 +142,7 @@ func (b Blockchain) CurrentBlock() *types.Header {
// Cosmos chains have instant finality, so this method should only be called for the genesis block (block 0)
// or block 1, as reorgs never occur. Any other call indicates a bug in the mempool logic.
// Panics if called for blocks beyond block 1, as this would indicate an attempted reorg.
func (b Blockchain) GetBlock(_ common.Hash, _ uint64) *types.Block {
func (b *Blockchain) GetBlock(_ common.Hash, _ uint64) *types.Block {
currBlock := b.CurrentBlock()
blockNumber := currBlock.Number.Int64()

Expand All @@ -161,7 +164,7 @@ func (b Blockchain) GetBlock(_ common.Hash, _ uint64) *types.Block {

// SubscribeChainHeadEvent allows subscribers to receive notifications when new blocks are finalized.
// Returns a subscription that will receive ChainHeadEvent notifications via the provided channel.
func (b Blockchain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
func (b *Blockchain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
b.logger.Debug("new chain head event subscription created")
return b.chainHeadFeed.Subscribe(ch)
}
Expand All @@ -170,19 +173,19 @@ func (b Blockchain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event
func (b *Blockchain) NotifyNewBlock() {
latestCtx, err := b.newLatestContext()
if err != nil {
b.latestCtx = sdk.Context{}
b.setLatestContext(sdk.Context{})
b.logger.Debug("failed to get latest context, notifying chain head", "error", err)
}
b.latestCtx = latestCtx
b.setLatestContext(latestCtx)
header := b.CurrentBlock()
headerHash := header.Hash()

b.logger.Debug("notifying new block",
"block_number", header.Number.String(),
"block_hash", headerHash.Hex(),
"previous_hash", b.previousHeaderHash.Hex())
"previous_hash", b.getPreviousHeaderHash().Hex())

b.previousHeaderHash = headerHash
b.setPreviousHeaderHash(headerHash)
b.chainHeadFeed.Send(core.ChainHeadEvent{Header: header})

b.logger.Debug("chain head event sent to feed")
Expand All @@ -192,7 +195,7 @@ func (b *Blockchain) NotifyNewBlock() {
// In practice, this always returns the most recent state since the mempool
// only needs current state for validation. Historical state access is not supported
// as it's never required by the txpool.
func (b Blockchain) StateAt(hash common.Hash) (vm.StateDB, error) {
func (b *Blockchain) StateAt(hash common.Hash) (vm.StateDB, error) {
b.logger.Debug("StateAt called", "requested_hash", hash.Hex())

// This is returned at block 0, before the context is available.
Expand All @@ -215,10 +218,30 @@ func (b Blockchain) StateAt(hash common.Hash) (vm.StateDB, error) {
return stateDB, nil
}

func (b *Blockchain) getPreviousHeaderHash() common.Hash {
b.mu.RLock()
defer b.mu.RUnlock()
return b.previousHeaderHash
}

func (b *Blockchain) setPreviousHeaderHash(h common.Hash) {
b.mu.Lock()
defer b.mu.Unlock()
b.previousHeaderHash = h
}

func (b *Blockchain) setLatestContext(ctx sdk.Context) {
b.mu.Lock()
defer b.mu.Unlock()
b.latestCtx = ctx
}

// GetLatestContext returns the latest context as updated by the block,
// or attempts to retrieve it again if unavailable.
func (b Blockchain) GetLatestContext() (sdk.Context, error) {
func (b *Blockchain) GetLatestContext() (sdk.Context, error) {
b.logger.Debug("getting latest context")
b.mu.RLock()
defer b.mu.RUnlock()

if b.latestCtx.Context() != nil {
return b.latestCtx, nil
Expand All @@ -229,7 +252,7 @@ func (b Blockchain) GetLatestContext() (sdk.Context, error) {

// newLatestContext retrieves the most recent query context from the application.
// This provides access to the current blockchain state for transaction validation and execution.
func (b Blockchain) newLatestContext() (sdk.Context, error) {
func (b *Blockchain) newLatestContext() (sdk.Context, error) {
b.logger.Debug("getting latest context")

ctx, err := b.getCtxCallback(0, false)
Expand Down
124 changes: 124 additions & 0 deletions mempool/blockchain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//go:build test
// +build test

package mempool_test

import (
"math/big"
"sync"
"testing"
"time"

cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
"github.com/cosmos/evm/mempool"
"github.com/cosmos/evm/testutil/config"

"cosmossdk.io/log"
storetypes "cosmossdk.io/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/cosmos/evm/mempool/mocks"
"github.com/cosmos/evm/x/vm/statedb"
vmtypes "github.com/cosmos/evm/x/vm/types"
)

// createMockContext creates a basic mock context for testing
func createMockContext() sdk.Context {
return sdk.Context{}.
WithBlockTime(time.Now()).
WithBlockHeader(cmtproto.Header{AppHash: []byte("00000000000000000000000000000000")}).
WithBlockHeight(1)
}

// TestBlockchainRaceCondition tests concurrent access to NotifyNewBlock and StateAt
// to ensure there are no race conditions between these operations.
func TestBlockchainRaceCondition(t *testing.T) {
logger := log.NewNopLogger()

// Create mock keepers using generated mocks
mockVMKeeper := mocks.NewVmKeeper(t)
mockFeeMarketKeeper := mocks.NewFeeMarketKeeper(t)

// Set up mock expectations for methods that will be called
mockVMKeeper.On("GetBaseFee", mock.Anything).Return(big.NewInt(1000000000)).Maybe() // 1 gwei
mockFeeMarketKeeper.On("GetBlockGasWanted", mock.Anything).Return(uint64(10000000)).Maybe() // 10M gas
mockVMKeeper.On("GetParams", mock.Anything).Return(vmtypes.DefaultParams()).Maybe()
mockVMKeeper.On("GetAccount", mock.Anything, common.Address{}).Return(&statedb.Account{}).Maybe()
mockVMKeeper.On("GetState", mock.Anything, common.Address{}, common.Hash{}).Return(common.Hash{}).Maybe()
mockVMKeeper.On("GetCode", mock.Anything, common.Hash{}).Return([]byte{}).Maybe()
mockVMKeeper.On("GetCodeHash", mock.Anything, common.Address{}).Return(common.Hash{}).Maybe()
mockVMKeeper.On("ForEachStorage", mock.Anything, common.Address{}, mock.AnythingOfType("func(common.Hash, common.Hash) bool")).Maybe()
mockVMKeeper.On("KVStoreKeys").Return(make(map[string]*storetypes.KVStoreKey)).Maybe()

err := vmtypes.NewEVMConfigurator().WithEVMCoinInfo(config.TestChainsCoinInfo[config.EVMChainID]).Configure()
require.NoError(t, err)

// Mock context callback that returns a valid context
getCtxCallback := func(height int64, prove bool) (sdk.Context, error) {
return createMockContext(), nil
}

blockchain := mempool.NewBlockchain(
getCtxCallback,
logger,
mockVMKeeper,
mockFeeMarketKeeper,
21000000, // block gas limit
)

const numIterations = 100
var wg sync.WaitGroup

// Channel to collect any errors from goroutines
errChan := make(chan error, numIterations*2)

// Start goroutine that calls NotifyNewBlock repeatedly
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numIterations; i++ {
blockchain.NotifyNewBlock()
// Small delay to allow interleaving
time.Sleep(time.Microsecond)
}
}()

// Start goroutine that calls StateAt repeatedly
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numIterations; i++ {
hash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
_, err := blockchain.StateAt(hash)
if err != nil {
errChan <- err
return
}
// Small delay to allow interleaving
time.Sleep(time.Microsecond)
}
}()

// Wait for both goroutines to complete
wg.Wait()
close(errChan)

// Check for any errors
for err := range errChan {
require.NoError(t, err)
}

// Basic validation - ensure blockchain still functions correctly after concurrent access
header := blockchain.CurrentBlock()
require.NotNil(t, header)
require.Equal(t, int64(1), header.Number.Int64())

// Ensure StateAt still works after concurrent access
hash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
stateDB, err := blockchain.StateAt(hash)
require.NoError(t, err)
require.NotNil(t, stateDB)
}
2 changes: 1 addition & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd
config.BlockGasLimit = fallbackBlockGasLimit
}

blockchain = newBlockchain(getCtxCallback, logger, vmKeeper, feeMarketKeeper, config.BlockGasLimit)
blockchain = NewBlockchain(getCtxCallback, logger, vmKeeper, feeMarketKeeper, config.BlockGasLimit)

// Create txPool from configuration
legacyConfig := legacypool.DefaultConfig
Expand Down
45 changes: 45 additions & 0 deletions mempool/mocks/FeeMarketKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading