Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// StuckTxsEvent is posted when stuck transactions need rebroadcast.
type StuckTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct {
Block *types.Block
Expand Down
9 changes: 9 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,15 @@ func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool
}
}

// SubscribeRebroadcastTransactions returns a no-op subscription. Blob pool does not
// support stuck transaction rebroadcast since blobs are handled differently.
func (p *BlobPool) SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
}

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
func (p *BlobPool) Nonce(addr common.Address) uint64 {
Expand Down
46 changes: 46 additions & 0 deletions core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1783,3 +1783,49 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
}
}
}

// TestSubscribeRebroadcastTransactions tests that the blob pool returns a no-op
// subscription for rebroadcast events since blobs are not rebroadcast.
func TestSubscribeRebroadcastTransactions(t *testing.T) {
// Create a temporary directory for the pool data
storage := t.TempDir()

// Create a minimal state database
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
statedb.Commit(0, true, false)

// Create a test blockchain with minimal config
chain := &testBlockChain{
config: testChainConfig,
basefee: uint256.NewInt(params.InitialBaseFee),
blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice),
statedb: statedb,
}

// Create the blob pool
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
defer pool.Close()

// Create a channel and subscribe to rebroadcast events
ch := make(chan core.StuckTxsEvent, 1)
sub := pool.SubscribeRebroadcastTransactions(ch)

// Verify the subscription is valid
if sub == nil {
t.Fatal("expected non-nil subscription")
}

// Unsubscribe should work without issues
sub.Unsubscribe()

// Channel should be empty (no events should be sent)
select {
case event := <-ch:
t.Fatalf("unexpected event: %v", event)
default:
// Expected - no events
}
}
148 changes: 148 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ var (
// misc metrics for functions using global lock
reportTimer = metrics.NewRegisteredTimer("txpool/misc/report", nil)
evictTimer = metrics.NewRegisteredTimer("txpool/misc/evict", nil)

// rebroadcast metrics
rebroadcastTxMeter = metrics.NewRegisteredMeter("txpool/rebroadcast", nil) // Transactions identified for rebroadcast
rebroadcastIdentifyTimer = metrics.NewRegisteredTimer("txpool/rebroadcast/identify", nil) // Time to identify stuck transactions
rebroadcastTrackingGauge = metrics.NewRegisteredGauge("txpool/rebroadcast/tracking", nil) // Transactions being tracked for rebroadcast
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -188,6 +193,12 @@ type Config struct {

// Transaction filtering configuration
FilteredAddresses map[common.Address]struct{} // Pre-loaded filtered addresses (populated by config)

// Rebroadcast configuration for stuck transactions
Rebroadcast bool // Enable stuck transaction rebroadcast
RebroadcastInterval time.Duration // Interval between rebroadcast checks
RebroadcastMaxAge time.Duration // Max age for rebroadcast eligibility
RebroadcastBatchSize int // Max transactions per rebroadcast cycle
}

// DefaultConfig contains the default configurations for the transaction pool.
Expand All @@ -205,6 +216,11 @@ var DefaultConfig = Config{

Lifetime: 3 * time.Hour,
AllowUnprotectedTxs: false,

Rebroadcast: true,
RebroadcastInterval: 30 * time.Second,
RebroadcastMaxAge: 10 * time.Minute,
RebroadcastBatchSize: 200,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -240,6 +256,19 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultConfig.Lifetime)
conf.Lifetime = DefaultConfig.Lifetime
}
// Sanitize rebroadcast configuration
if conf.RebroadcastInterval < 1*time.Second {
log.Warn("Sanitizing invalid txpool rebroadcast interval", "provided", conf.RebroadcastInterval, "updated", DefaultConfig.RebroadcastInterval)
conf.RebroadcastInterval = DefaultConfig.RebroadcastInterval
}
if conf.RebroadcastMaxAge < conf.RebroadcastInterval {
log.Warn("Sanitizing invalid txpool rebroadcast max age", "provided", conf.RebroadcastMaxAge, "updated", DefaultConfig.RebroadcastMaxAge)
conf.RebroadcastMaxAge = DefaultConfig.RebroadcastMaxAge
}
if conf.RebroadcastBatchSize < 1 {
log.Warn("Sanitizing invalid txpool rebroadcast batch size", "provided", conf.RebroadcastBatchSize, "updated", DefaultConfig.RebroadcastBatchSize)
conf.RebroadcastBatchSize = DefaultConfig.RebroadcastBatchSize
}
return conf
}

Expand Down Expand Up @@ -297,6 +326,10 @@ type LegacyPool struct {
promoteTxCh chan struct{} // should be used only for tests

filteredAddrs map[common.Address]struct{} // Map of addresses to filter

// Rebroadcast tracking
rebroadcastTxFeed event.Feed // Feed for stuck transaction events
lastRebroadcast map[common.Hash]time.Time // Track last rebroadcast time per tx hash
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -326,6 +359,7 @@ func New(config Config, chain BlockChain, options ...func(pool *LegacyPool)) *Le
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
filteredAddrs: make(map[common.Address]struct{}),
lastRebroadcast: make(map[common.Hash]time.Time),
}
pool.priced = newPricedList(pool.all)

Expand Down Expand Up @@ -404,9 +438,22 @@ func (pool *LegacyPool) loop() {
defer report.Stop()
defer evict.Stop()

// Start the rebroadcast ticker if enabled
var rebroadcast *time.Ticker
if pool.config.Rebroadcast {
rebroadcast = time.NewTicker(pool.config.RebroadcastInterval)
defer rebroadcast.Stop()
}

// Notify tests that the init phase is done
close(pool.initDoneCh)
for {
// Use a nil channel for rebroadcast if disabled
var rebroadcastC <-chan time.Time
if rebroadcast != nil {
rebroadcastC = rebroadcast.C
}

select {
// Handle pool shutdown
case <-pool.reorgShutdownCh:
Expand All @@ -426,6 +473,30 @@ func (pool *LegacyPool) loop() {
prevPending, prevQueued, prevStales = pending, queued, stales
}

// Handle stuck transaction rebroadcast
case <-rebroadcastC:
// Use RLock for reading to minimize contention with add()/reorg()
identifyStart := time.Now()
pool.mu.RLock()
stuckTxs := pool.identifyStuckTransactions()
pool.mu.RUnlock()
rebroadcastIdentifyTimer.Update(time.Since(identifyStart))

if len(stuckTxs) > 0 {
// Brief Lock only to update lastRebroadcast timestamps
now := time.Now()
pool.mu.Lock()
for _, tx := range stuckTxs {
pool.lastRebroadcast[tx.Hash()] = now
}
rebroadcastTrackingGauge.Update(int64(len(pool.lastRebroadcast)))
pool.mu.Unlock()

pool.rebroadcastTxFeed.Send(core.StuckTxsEvent{Txs: stuckTxs})
rebroadcastTxMeter.Mark(int64(len(stuckTxs)))
log.Debug("Identified stuck transactions for rebroadcast", "count", len(stuckTxs))
}

// Handle inactive account transaction eviction
case <-evict.C:
pool.mu.Lock()
Expand Down Expand Up @@ -473,6 +544,81 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs
return pool.txFeed.Subscribe(ch)
}

// SubscribeRebroadcastTransactions registers a subscription for stuck transaction
// rebroadcast events.
func (pool *LegacyPool) SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription {
return pool.rebroadcastTxFeed.Subscribe(ch)
}

// identifyStuckTransactions identifies pending transactions that may be stuck
// and need rebroadcasting. It returns transactions that:
// - Have been pending longer than RebroadcastInterval but less than RebroadcastMaxAge
// - Have not been rebroadcast recently (within RebroadcastInterval)
// - Are immediately executable (gas price meets current requirements)
//
// Must be called with pool.mu.RLock held (read lock only - does not modify pool state).
func (pool *LegacyPool) identifyStuckTransactions() []*types.Transaction {
now := time.Now()
head := pool.currentHead.Load()
if head == nil {
return nil
}

// Calculate base fee only if London is enabled and header has base fee
var baseFee *big.Int
if pool.chainconfig.IsLondon(head.Number) && head.BaseFee != nil {
baseFee = eip1559.CalcBaseFee(pool.chainconfig, head)
}
minTip := pool.gasTip.Load().ToBig()

var stuckTxs []*types.Transaction

for _, list := range pool.pending {
for _, tx := range list.Flatten() {
hash := tx.Hash()
age := now.Sub(tx.Time())

// Skip if too young (hasn't had time to propagate yet)
if age < pool.config.RebroadcastInterval {
continue
}

// Check rebroadcast history
lastTime, wasRebroadcast := pool.lastRebroadcast[hash]
if wasRebroadcast {
// Skip if recently rebroadcast
if now.Sub(lastTime) < pool.config.RebroadcastInterval {
continue
}
// Skip if we've been rebroadcasting this tx for too long (max age applies
// only to previously rebroadcast txs - new txs that just became executable
// after a base fee drop should still be considered)
if pool.config.RebroadcastMaxAge > 0 && age > pool.config.RebroadcastMaxAge {
continue
}
}

// Skip if not immediately executable (gas price too low for current conditions)
// For EIP-1559 transactions, check gas fee cap against base fee
if baseFee != nil && tx.GasFeeCap().Cmp(baseFee) < 0 {
continue
}
if tx.GasTipCap().Cmp(minTip) < 0 {
continue
}

stuckTxs = append(stuckTxs, tx)

// Enforce batch limit
if len(stuckTxs) >= pool.config.RebroadcastBatchSize {
return stuckTxs
}
}
}

return stuckTxs
}

// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (pool *LegacyPool) SetGasTip(tip *big.Int) {
Expand Down Expand Up @@ -1351,6 +1497,8 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
if outofbound {
pool.priced.Removed(1)
}
// Clean up rebroadcast tracking
delete(pool.lastRebroadcast, hash)
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
Expand Down
Loading
Loading