Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [\#591](https://github.com/cosmos/evm/pull/591) CheckTxHandler should handle "invalid nonce" tx
- [\#643](https://github.com/cosmos/evm/pull/643) Support for mnemonic source (file, stdin,etc) flag in key add command.
- [\#645](https://github.com/cosmos/evm/pull/645) Align precise bank keeper for correct decimal conversion in evmd.
- [\#656](https://github.com/cosmos/evm/pull/656) Fix race condition in concurrent usage of mempool StateAt and NotifyNewBlock methods.

### IMPROVEMENTS

Expand Down
57 changes: 40 additions & 17 deletions mempool/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mempool
import (
"fmt"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand All @@ -24,8 +25,8 @@ import (
)

var (
_ txpool.BlockChain = Blockchain{}
_ legacypool.BlockChain = Blockchain{}
_ txpool.BlockChain = &Blockchain{}
_ legacypool.BlockChain = &Blockchain{}
)

// Blockchain implements the BlockChain interface required by Ethereum transaction pools.
Expand All @@ -42,12 +43,13 @@ type Blockchain struct {
blockGasLimit uint64
previousHeaderHash common.Hash
latestCtx sdk.Context
mu sync.RWMutex
}

// newBlockchain creates a new Blockchain instance that bridges Cosmos SDK state with Ethereum mempools.
// NewBlockchain creates a new Blockchain instance that bridges Cosmos SDK state with Ethereum mempools.
// The getCtxCallback function provides access to Cosmos SDK contexts at different heights, vmKeeper manages EVM state,
// and feeMarketKeeper handles fee market operations like base fee calculations.
func newBlockchain(ctx func(height int64, prove bool) (sdk.Context, error), logger log.Logger, vmKeeper VMKeeperI, feeMarketKeeper FeeMarketKeeperI, blockGasLimit uint64) *Blockchain {
func NewBlockchain(ctx func(height int64, prove bool) (sdk.Context, error), logger log.Logger, vmKeeper VMKeeperI, feeMarketKeeper FeeMarketKeeperI, blockGasLimit uint64) *Blockchain {
// Add the blockchain name to the logger
logger = logger.With(log.ModuleKey, "Blockchain")

Expand All @@ -70,23 +72,24 @@ func newBlockchain(ctx func(height int64, prove bool) (sdk.Context, error), logg

// Config returns the Ethereum chain configuration. It should only be called after the chain is initialized.
// This provides the necessary parameters for EVM execution and transaction validation.
func (b Blockchain) Config() *params.ChainConfig {
func (b *Blockchain) Config() *params.ChainConfig {
return evmtypes.GetEthChainConfig()
}

// CurrentBlock returns the current block header for the app.
// It constructs an Ethereum-compatible header from the current Cosmos SDK context,
// including block height, timestamp, gas limits, and base fee (if London fork is active).
// Returns a zero header as placeholder if the context is not yet available.
func (b Blockchain) CurrentBlock() *types.Header {
func (b *Blockchain) CurrentBlock() *types.Header {
ctx, err := b.GetLatestContext()
if err != nil {
return b.zeroHeader
}

blockHeight := ctx.BlockHeight()
// prevent the reorg from triggering after a restart since previousHeaderHash is stored as an in-memory variable
if blockHeight > 1 && b.previousHeaderHash == (common.Hash{}) {
previousHeaderHash := b.getPreviousHeaderHash()
if blockHeight > 1 && previousHeaderHash == (common.Hash{}) {
return b.zeroHeader
}

Expand All @@ -99,7 +102,7 @@ func (b Blockchain) CurrentBlock() *types.Header {
Time: uint64(blockTime), // #nosec G115 -- overflow not a concern with unix time
GasLimit: b.blockGasLimit,
GasUsed: gasUsed,
ParentHash: b.previousHeaderHash,
ParentHash: previousHeaderHash,
Root: appHash, // we actually don't care that this isn't the getCtxCallback header, as long as we properly track roots and parent roots to prevent the reorg from triggering
Difficulty: big.NewInt(0), // 0 difficulty on PoS
}
Expand Down Expand Up @@ -139,7 +142,7 @@ func (b Blockchain) CurrentBlock() *types.Header {
// Cosmos chains have instant finality, so this method should only be called for the genesis block (block 0)
// or block 1, as reorgs never occur. Any other call indicates a bug in the mempool logic.
// Panics if called for blocks beyond block 1, as this would indicate an attempted reorg.
func (b Blockchain) GetBlock(_ common.Hash, _ uint64) *types.Block {
func (b *Blockchain) GetBlock(_ common.Hash, _ uint64) *types.Block {
currBlock := b.CurrentBlock()
blockNumber := currBlock.Number.Int64()

Expand All @@ -161,7 +164,7 @@ func (b Blockchain) GetBlock(_ common.Hash, _ uint64) *types.Block {

// SubscribeChainHeadEvent allows subscribers to receive notifications when new blocks are finalized.
// Returns a subscription that will receive ChainHeadEvent notifications via the provided channel.
func (b Blockchain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
func (b *Blockchain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
b.logger.Debug("new chain head event subscription created")
return b.chainHeadFeed.Subscribe(ch)
}
Expand All @@ -170,19 +173,19 @@ func (b Blockchain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event
func (b *Blockchain) NotifyNewBlock() {
latestCtx, err := b.newLatestContext()
if err != nil {
b.latestCtx = sdk.Context{}
b.setLatestContext(sdk.Context{})
b.logger.Debug("failed to get latest context, notifying chain head", "error", err)
}
b.latestCtx = latestCtx
b.setLatestContext(latestCtx)
header := b.CurrentBlock()
headerHash := header.Hash()

b.logger.Debug("notifying new block",
"block_number", header.Number.String(),
"block_hash", headerHash.Hex(),
"previous_hash", b.previousHeaderHash.Hex())
"previous_hash", b.getPreviousHeaderHash().Hex())

b.previousHeaderHash = headerHash
b.setPreviousHeaderHash(headerHash)
b.chainHeadFeed.Send(core.ChainHeadEvent{Header: header})

b.logger.Debug("chain head event sent to feed")
Expand All @@ -192,7 +195,7 @@ func (b *Blockchain) NotifyNewBlock() {
// In practice, this always returns the most recent state since the mempool
// only needs current state for validation. Historical state access is not supported
// as it's never required by the txpool.
func (b Blockchain) StateAt(hash common.Hash) (vm.StateDB, error) {
func (b *Blockchain) StateAt(hash common.Hash) (vm.StateDB, error) {
b.logger.Debug("StateAt called", "requested_hash", hash.Hex())

// This is returned at block 0, before the context is available.
Expand All @@ -215,10 +218,30 @@ func (b Blockchain) StateAt(hash common.Hash) (vm.StateDB, error) {
return stateDB, nil
}

func (b *Blockchain) getPreviousHeaderHash() common.Hash {
b.mu.RLock()
defer b.mu.RUnlock()
return b.previousHeaderHash
}

func (b *Blockchain) setPreviousHeaderHash(h common.Hash) {
b.mu.Lock()
defer b.mu.Unlock()
b.previousHeaderHash = h
}

func (b *Blockchain) setLatestContext(ctx sdk.Context) {
b.mu.Lock()
defer b.mu.Unlock()
b.latestCtx = ctx
}

// GetLatestContext returns the latest context as updated by the block,
// or attempts to retrieve it again if unavailable.
func (b Blockchain) GetLatestContext() (sdk.Context, error) {
func (b *Blockchain) GetLatestContext() (sdk.Context, error) {
b.logger.Debug("getting latest context")
b.mu.RLock()
defer b.mu.RUnlock()

if b.latestCtx.Context() != nil {
return b.latestCtx, nil
Expand All @@ -229,7 +252,7 @@ func (b Blockchain) GetLatestContext() (sdk.Context, error) {

// newLatestContext retrieves the most recent query context from the application.
// This provides access to the current blockchain state for transaction validation and execution.
func (b Blockchain) newLatestContext() (sdk.Context, error) {
func (b *Blockchain) newLatestContext() (sdk.Context, error) {
b.logger.Debug("getting latest context")

ctx, err := b.getCtxCallback(0, false)
Expand Down
123 changes: 123 additions & 0 deletions mempool/blockchain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package mempool_test

import (
"math/big"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"

"github.com/cosmos/evm/config"
"github.com/cosmos/evm/mempool"
"github.com/cosmos/evm/mempool/mocks"
"github.com/cosmos/evm/x/vm/statedb"
vmtypes "github.com/cosmos/evm/x/vm/types"

"cosmossdk.io/log"
storetypes "cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)

// createMockContext creates a basic mock context for testing
func createMockContext() sdk.Context {
return sdk.Context{}.
WithBlockTime(time.Now()).
WithBlockHeader(cmtproto.Header{AppHash: []byte("00000000000000000000000000000000")}).
WithBlockHeight(1)
}

// TestBlockchainRaceCondition tests concurrent access to NotifyNewBlock and StateAt
// to ensure there are no race conditions between these operations.
func TestBlockchainRaceCondition(t *testing.T) {
logger := log.NewNopLogger()

// Create mock keepers using generated mocks
mockVMKeeper := mocks.NewVmKeeper(t)
mockFeeMarketKeeper := mocks.NewFeeMarketKeeper(t)

// Set up mock expectations for methods that will be called
mockVMKeeper.On("GetBaseFee", mock.Anything).Return(big.NewInt(1000000000)).Maybe() // 1 gwei
mockFeeMarketKeeper.On("GetBlockGasWanted", mock.Anything).Return(uint64(10000000)).Maybe() // 10M gas
mockVMKeeper.On("GetParams", mock.Anything).Return(vmtypes.DefaultParams()).Maybe()
mockVMKeeper.On("GetAccount", mock.Anything, common.Address{}).Return(&statedb.Account{}).Maybe()
mockVMKeeper.On("GetState", mock.Anything, common.Address{}, common.Hash{}).Return(common.Hash{}).Maybe()
mockVMKeeper.On("GetCode", mock.Anything, common.Hash{}).Return([]byte{}).Maybe()
mockVMKeeper.On("GetCodeHash", mock.Anything, common.Address{}).Return(common.Hash{}).Maybe()
mockVMKeeper.On("ForEachStorage", mock.Anything, common.Address{}, mock.AnythingOfType("func(common.Hash, common.Hash) bool")).Maybe()
mockVMKeeper.On("KVStoreKeys").Return(make(map[string]*storetypes.KVStoreKey)).Maybe()

err := vmtypes.NewEVMConfigurator().WithEVMCoinInfo(config.ChainsCoinInfo[config.EighteenDecimalsChainID]).Configure()
require.NoError(t, err)

// Mock context callback that returns a valid context
getCtxCallback := func(height int64, prove bool) (sdk.Context, error) {
return createMockContext(), nil
}

blockchain := mempool.NewBlockchain(
getCtxCallback,
logger,
mockVMKeeper,
mockFeeMarketKeeper,
21000000, // block gas limit
)

const numIterations = 100
var wg sync.WaitGroup

// Channel to collect any errors from goroutines
errChan := make(chan error, numIterations*2)

// Start goroutine that calls NotifyNewBlock repeatedly
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numIterations; i++ {
blockchain.NotifyNewBlock()
// Small delay to allow interleaving
time.Sleep(time.Microsecond)
}
}()

// Start goroutine that calls StateAt repeatedly
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numIterations; i++ {
hash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
_, err := blockchain.StateAt(hash)
if err != nil {
errChan <- err
return
}
// Small delay to allow interleaving
time.Sleep(time.Microsecond)
}
}()

// Wait for both goroutines to complete
wg.Wait()
close(errChan)

// Check for any errors
for err := range errChan {
require.NoError(t, err)
}

// Basic validation - ensure blockchain still functions correctly after concurrent access
header := blockchain.CurrentBlock()
require.NotNil(t, header)
require.Equal(t, int64(1), header.Number.Int64())

// Ensure StateAt still works after concurrent access
hash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
stateDB, err := blockchain.StateAt(hash)
require.NoError(t, err)
require.NotNil(t, stateDB)
}
2 changes: 1 addition & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd
config.BlockGasLimit = fallbackBlockGasLimit
}

blockchain = newBlockchain(getCtxCallback, logger, vmKeeper, feeMarketKeeper, config.BlockGasLimit)
blockchain = NewBlockchain(getCtxCallback, logger, vmKeeper, feeMarketKeeper, config.BlockGasLimit)

// Create txPool from configuration
legacyConfig := legacypool.DefaultConfig
Expand Down
45 changes: 45 additions & 0 deletions mempool/mocks/FeeMarketKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading