Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions evmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"os"
"time"

"github.com/spf13/cast"

Expand Down Expand Up @@ -145,7 +146,10 @@ func init() {
defaultNodeHome = evmdconfig.MustGetDefaultNodeHome()
}

const appName = "evmd"
const (
appName = "evmd"
defaultCloseTimeout = 5 * time.Second
)

// defaultNodeHome default home directories for the application daemon
var defaultNodeHome string
Expand Down Expand Up @@ -1148,17 +1152,26 @@ func (app *EVMD) SetClientCtx(clientCtx client.Context) {

// Close unsubscribes from the CometBFT event bus (if set) and closes the underlying BaseApp.
func (app *EVMD) Close() error {
return app.CloseWithTimeout(defaultCloseTimeout)
}

// CloseWithTimeout closes the application with a timeout.
// If timeout is 0, it forces immediate shutdown without waiting for mempool cleanup.
func (app *EVMD) CloseWithTimeout(timeout time.Duration) error {
var err error
if m, ok := app.GetMempool().(*evmmempool.ExperimentalEVMMempool); ok {
err = m.Close()
app.Logger().Info("Shutting down mempool", "timeout", timeout)
err = m.CloseWithTimeout(timeout)
}

msg := fmt.Sprintf("Application shutdown with timeout %s", timeout)
err = errors.Join(err, app.BaseApp.Close())
msg := "Application gracefully shutdown"
if err == nil {
app.Logger().Info(msg)
} else {
app.Logger().Error(msg, "error", err)
}

return err
}

Expand Down
26 changes: 22 additions & 4 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"

ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"
Expand All @@ -29,7 +30,10 @@ import (

var _ sdkmempool.ExtMempool = &ExperimentalEVMMempool{}

const SubscriberName = "evm"
const (
SubscriberName = "evm"
defaultCloseTimeout = 5 * time.Second
)

type (
// ExperimentalEVMMempool is a unified mempool that manages both EVM and Cosmos SDK transactions.
Expand Down Expand Up @@ -392,12 +396,26 @@ func (m *ExperimentalEVMMempool) SetEventBus(eventBus *cmttypes.EventBus) {
}()
}

// Close unsubscribes from the CometBFT event bus.
// Close unsubscribes from the CometBFT event bus and shuts down the mempool.
func (m *ExperimentalEVMMempool) Close() error {
return m.CloseWithTimeout(defaultCloseTimeout)
}

// CloseWithTimeout shuts down the mempool with a timeout.
// If timeout is 0, it forces immediate shutdown without waiting.
func (m *ExperimentalEVMMempool) CloseWithTimeout(timeout time.Duration) error {
var errs []error
if m.eventBus != nil {
return m.eventBus.Unsubscribe(context.Background(), SubscriberName, stream.NewBlockHeaderEvents)
if err := m.eventBus.Unsubscribe(context.Background(), SubscriberName, stream.NewBlockHeaderEvents); err != nil {
errs = append(errs, fmt.Errorf("failed to unsubscribe from event bus: %w", err))
}
}
return nil

if err := m.txPool.CloseWithTimeout(timeout); err != nil {
errs = append(errs, fmt.Errorf("failed to close txpool: %w", err))
}

return errors.Join(errs...)
}

// getEVMMessage validates that the transaction contains exactly one message and returns it if it's an EVM message.
Expand Down
32 changes: 30 additions & 2 deletions mempool/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package legacypool

import (
"context"
"errors"
"maps"
"math/big"
Expand Down Expand Up @@ -255,6 +256,10 @@

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

// Shutdown context for immediate termination
shutdownCtx context.Context
shutdownCancel context.CancelFunc

BroadcastTxFn func(txs []*types.Transaction) error
}

Expand All @@ -269,6 +274,7 @@
config = (&config).sanitize()

// Create the transaction pool with its initial settings
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
pool := &LegacyPool{
config: config,
chain: chain,
Expand All @@ -284,6 +290,8 @@
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
shutdownCtx: shutdownCtx,
shutdownCancel: shutdownCancel,
}
pool.priced = newPricedList(pool.all)

Expand Down Expand Up @@ -389,9 +397,29 @@

// Close terminates the transaction pool.
func (pool *LegacyPool) Close() error {
// Terminate the pool reorger and return
return pool.CloseWithTimeout(5 * time.Second)
}

// CloseWithTimeout terminates the transaction pool with a timeout.
// If timeout is 0, it forces immediate shutdown without waiting.
func (pool *LegacyPool) CloseWithTimeout(timeout time.Duration) error {
pool.shutdownCancel()
close(pool.reorgShutdownCh)
pool.wg.Wait()

// wait for wg with timeout
if timeout > 0 {
done := make(chan struct{})
go func() {
pool.wg.Wait()
close(done)
}()

select {
case <-done:
case <-time.After(timeout):
log.Warn("Transaction pool shutdown timeout, some goroutines may still be running")
}
}

log.Info("Transaction pool stopped")
return nil
Expand Down
68 changes: 52 additions & 16 deletions mempool/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package txpool

import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
Expand All @@ -40,6 +42,8 @@ const (
TxStatusQueued
TxStatusPending
TxStatusIncluded

defaultCloseTimeout = 5 * time.Second
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -76,6 +80,10 @@ type TxPool struct {
term chan struct{} // Termination channel to detect a closed pool

sync chan chan error // Testing / simulator channel to block until internal reset is done

// Shutdown context for immediate termination
shutdownCtx context.Context
shutdownCancel context.CancelFunc
}

// New creates a new transaction pool to gather, sort and filter inbound
Expand All @@ -96,14 +104,17 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
if err != nil {
return nil, err
}
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
pool := &TxPool{
Subpools: subpools,
chain: chain,
signer: types.LatestSigner(chain.Config()),
state: statedb,
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
Subpools: subpools,
chain: chain,
signer: types.LatestSigner(chain.Config()),
state: statedb,
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
shutdownCtx: shutdownCtx,
shutdownCancel: shutdownCancel,
}
reserver := NewReservationTracker()
for i, subpool := range subpools {
Expand All @@ -120,14 +131,36 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {

// Close terminates the transaction pool and all its Subpools.
func (p *TxPool) Close() error {
return p.CloseWithTimeout(defaultCloseTimeout)
}

// CloseWithTimeout terminates the transaction pool and all its Subpools with a timeout.
// If timeout is 0, it forces immediate shutdown without waiting.
func (p *TxPool) CloseWithTimeout(timeout time.Duration) error {
var errs []error

// Terminate the reset loop and wait for it to finish
errc := make(chan error)
p.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
if timeout > 0 {
errc := make(chan error, 1)

select {
case p.quit <- errc:
// Wait for worker response or timeout
select {
case err := <-errc:
if err != nil {
errs = append(errs, err)
}
case <-time.After(timeout):
errs = append(errs, errors.New("timeout waiting for pool loop to terminate"))
}
case <-time.After(timeout):
errs = append(errs, errors.New("timeout sending quit signal to pool loop"))
}
}

// Cancel context as fallback after graceful shutdown attempt
p.shutdownCancel()

// Terminate each subpool
for _, subpool := range p.Subpools {
if err := subpool.Close(); err != nil {
Expand All @@ -137,10 +170,7 @@ func (p *TxPool) Close() error {
// Unsubscribe anyone still listening for tx events
p.subs.Close()

if len(errs) > 0 {
return fmt.Errorf("subpool close errors: %v", errs)
}
return nil
return errors.Join(errs...)
}

// loop is the transaction pool's main event loop, waiting for and reacting to
Expand Down Expand Up @@ -251,6 +281,12 @@ func (p *TxPool) loop(head *types.Header) {
// queue waiting for a reset.
resetForced = true
resetWaiter = syncc

case <-p.shutdownCtx.Done():
// Immediate shutdown requested, break out immediately
log.Info("Transaction pool loop terminating due to shutdown context")
errc = make(chan error, 1)
errc <- errors.New("pool shutdown requested")
}
}
// Notify the closer of termination (no error possible for now)
Expand Down
2 changes: 1 addition & 1 deletion server/config/migration/v0.50-app.toml
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ stop-node-on-err = true
#
# Note, this configuration only applies to SDK built-in app-side mempool
# implementations.
max-txs = -1
max-txs = 10000

###############################################################################
### EVM Configuration ###
Expand Down
Loading