From 5c7a1821e07767d80f9e5654f28f4447dd1058c6 Mon Sep 17 00:00:00 2001 From: Jerry Date: Fri, 16 Jan 2026 14:10:32 -0800 Subject: [PATCH 1/5] txpool: add stuck transaction rebroadcast mechanism When transactions are broadcast to peers, they're marked as "known" in each peer's knownTxs cache. If the network send fails or the peer fails to request transaction body, the peer will never receive the transaction but the hash remains in knownTxs, preventing future rebroadcasts and causing transactions to get stuck. This change adds a periodic rebroadcast mechanism that: - Identifies immediately executable transactions older than RebroadcastInterval but younger than RebroadcastMaxAge - Clears their hashes from peers' knownTxs caches - Rebroadcasts them to the network --- core/events.go | 3 + core/txpool/blobpool/blobpool.go | 9 + core/txpool/legacypool/legacypool.go | 145 ++++++++++++ core/txpool/legacypool/legacypool_test.go | 262 ++++++++++++++++++++++ core/txpool/subpool.go | 3 + core/txpool/txpool.go | 16 ++ eth/handler.go | 48 +++- eth/handler_test.go | 8 + eth/peerset.go | 11 + eth/protocols/eth/peer.go | 13 ++ internal/cli/server/config.go | 48 +++- internal/cli/server/flags.go | 28 +++ 12 files changed, 582 insertions(+), 12 deletions(-) diff --git a/core/events.go b/core/events.go index 3b5539e751..b8611daff5 100644 --- a/core/events.go +++ b/core/events.go @@ -24,6 +24,9 @@ import ( // NewTxsEvent is posted when a batch of transactions enter the transaction pool. type NewTxsEvent struct{ Txs []*types.Transaction } +// StuckTxsEvent is posted when stuck transactions need rebroadcast. +type StuckTxsEvent struct{ Txs []*types.Transaction } + // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct { Block *types.Block diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index c7268e6ce2..3f603f23d4 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1748,6 +1748,15 @@ func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool } } +// SubscribeRebroadcastTransactions returns a no-op subscription. Blob pool does not +// support stuck transaction rebroadcast since blobs are handled differently. +func (p *BlobPool) SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + <-quit + return nil + }) +} + // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. func (p *BlobPool) Nonce(addr common.Address) uint64 { diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 5aebd5b171..d9d3e36867 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -150,6 +150,11 @@ var ( // misc metrics for functions using global lock reportTimer = metrics.NewRegisteredTimer("txpool/misc/report", nil) evictTimer = metrics.NewRegisteredTimer("txpool/misc/evict", nil) + + // rebroadcast metrics + rebroadcastTxMeter = metrics.NewRegisteredMeter("txpool/rebroadcast", nil) // Transactions identified for rebroadcast + rebroadcastIdentifyTimer = metrics.NewRegisteredTimer("txpool/rebroadcast/identify", nil) // Time to identify stuck transactions + rebroadcastTrackingGauge = metrics.NewRegisteredGauge("txpool/rebroadcast/tracking", nil) // Transactions being tracked for rebroadcast ) // BlockChain defines the minimal set of methods needed to back a tx pool with @@ -188,6 +193,12 @@ type Config struct { // Transaction filtering configuration FilteredAddresses map[common.Address]struct{} // Pre-loaded filtered addresses (populated by config) + + // Rebroadcast configuration for stuck transactions + Rebroadcast bool // Enable stuck transaction rebroadcast + RebroadcastInterval time.Duration // Interval between rebroadcast checks + RebroadcastMaxAge time.Duration // Max age for rebroadcast eligibility + RebroadcastBatchSize int // Max transactions per rebroadcast cycle } // DefaultConfig contains the default configurations for the transaction pool. @@ -205,6 +216,11 @@ var DefaultConfig = Config{ Lifetime: 3 * time.Hour, AllowUnprotectedTxs: false, + + Rebroadcast: true, + RebroadcastInterval: 5 * time.Second, + RebroadcastMaxAge: 10 * time.Minute, + RebroadcastBatchSize: 200, } // sanitize checks the provided user configurations and changes anything that's @@ -240,6 +256,19 @@ func (config *Config) sanitize() Config { log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultConfig.Lifetime) conf.Lifetime = DefaultConfig.Lifetime } + // Sanitize rebroadcast configuration + if conf.RebroadcastInterval < 1*time.Second { + log.Warn("Sanitizing invalid txpool rebroadcast interval", "provided", conf.RebroadcastInterval, "updated", DefaultConfig.RebroadcastInterval) + conf.RebroadcastInterval = DefaultConfig.RebroadcastInterval + } + if conf.RebroadcastMaxAge < conf.RebroadcastInterval { + log.Warn("Sanitizing invalid txpool rebroadcast max age", "provided", conf.RebroadcastMaxAge, "updated", DefaultConfig.RebroadcastMaxAge) + conf.RebroadcastMaxAge = DefaultConfig.RebroadcastMaxAge + } + if conf.RebroadcastBatchSize < 1 { + log.Warn("Sanitizing invalid txpool rebroadcast batch size", "provided", conf.RebroadcastBatchSize, "updated", DefaultConfig.RebroadcastBatchSize) + conf.RebroadcastBatchSize = DefaultConfig.RebroadcastBatchSize + } return conf } @@ -297,6 +326,10 @@ type LegacyPool struct { promoteTxCh chan struct{} // should be used only for tests filteredAddrs map[common.Address]struct{} // Map of addresses to filter + + // Rebroadcast tracking + rebroadcastTxFeed event.Feed // Feed for stuck transaction events + lastRebroadcast map[common.Hash]time.Time // Track last rebroadcast time per tx hash } type txpoolResetRequest struct { @@ -326,6 +359,7 @@ func New(config Config, chain BlockChain, options ...func(pool *LegacyPool)) *Le reorgShutdownCh: make(chan struct{}), initDoneCh: make(chan struct{}), filteredAddrs: make(map[common.Address]struct{}), + lastRebroadcast: make(map[common.Hash]time.Time), } pool.priced = newPricedList(pool.all) @@ -404,9 +438,22 @@ func (pool *LegacyPool) loop() { defer report.Stop() defer evict.Stop() + // Start the rebroadcast ticker if enabled + var rebroadcast *time.Ticker + if pool.config.Rebroadcast { + rebroadcast = time.NewTicker(pool.config.RebroadcastInterval) + defer rebroadcast.Stop() + } + // Notify tests that the init phase is done close(pool.initDoneCh) for { + // Use a nil channel for rebroadcast if disabled + var rebroadcastC <-chan time.Time + if rebroadcast != nil { + rebroadcastC = rebroadcast.C + } + select { // Handle pool shutdown case <-pool.reorgShutdownCh: @@ -426,6 +473,30 @@ func (pool *LegacyPool) loop() { prevPending, prevQueued, prevStales = pending, queued, stales } + // Handle stuck transaction rebroadcast + case <-rebroadcastC: + // Use RLock for reading to minimize contention with add()/reorg() + identifyStart := time.Now() + pool.mu.RLock() + stuckTxs := pool.identifyStuckTransactions() + pool.mu.RUnlock() + rebroadcastIdentifyTimer.Update(time.Since(identifyStart)) + + if len(stuckTxs) > 0 { + // Brief Lock only to update lastRebroadcast timestamps + now := time.Now() + pool.mu.Lock() + for _, tx := range stuckTxs { + pool.lastRebroadcast[tx.Hash()] = now + } + rebroadcastTrackingGauge.Update(int64(len(pool.lastRebroadcast))) + pool.mu.Unlock() + + pool.rebroadcastTxFeed.Send(core.StuckTxsEvent{Txs: stuckTxs}) + rebroadcastTxMeter.Mark(int64(len(stuckTxs))) + log.Debug("Identified stuck transactions for rebroadcast", "count", len(stuckTxs)) + } + // Handle inactive account transaction eviction case <-evict.C: pool.mu.Lock() @@ -473,6 +544,78 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs return pool.txFeed.Subscribe(ch) } +// SubscribeRebroadcastTransactions registers a subscription for stuck transaction +// rebroadcast events. +func (pool *LegacyPool) SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription { + return pool.rebroadcastTxFeed.Subscribe(ch) +} + +// identifyStuckTransactions identifies pending transactions that may be stuck +// and need rebroadcasting. It returns transactions that: +// - Have been pending longer than RebroadcastInterval but less than RebroadcastMaxAge +// - Have not been rebroadcast recently (within RebroadcastInterval) +// - Are immediately executable (gas price meets current requirements) +// +// identifyStuckTransactions finds pending transactions that may be stuck and need rebroadcast. +// Must be called with pool.mu.RLock held (read lock only - does not modify pool state). +func (pool *LegacyPool) identifyStuckTransactions() []*types.Transaction { + now := time.Now() + head := pool.currentHead.Load() + if head == nil { + return nil + } + + // Calculate base fee only if London is enabled and header has base fee + var baseFee *big.Int + if pool.chainconfig.IsLondon(head.Number) && head.BaseFee != nil { + baseFee = eip1559.CalcBaseFee(pool.chainconfig, head) + } + minTip := pool.gasTip.Load().ToBig() + + var stuckTxs []*types.Transaction + + for _, list := range pool.pending { + for _, tx := range list.Flatten() { + hash := tx.Hash() + age := now.Sub(tx.Time()) + + // Skip if too young (hasn't had time to propagate yet) + if age < pool.config.RebroadcastInterval { + continue + } + // Skip if too old (beyond max age, stop trying) + if age > pool.config.RebroadcastMaxAge { + continue + } + + // Skip if recently rebroadcast + if lastTime, ok := pool.lastRebroadcast[hash]; ok { + if now.Sub(lastTime) < pool.config.RebroadcastInterval { + continue + } + } + + // Skip if not immediately executable (gas price too low for current conditions) + // For EIP-1559 transactions, check gas fee cap against base fee + if baseFee != nil && tx.GasFeeCap().Cmp(baseFee) < 0 { + continue + } + if tx.GasTipCap().Cmp(minTip) < 0 { + continue + } + + stuckTxs = append(stuckTxs, tx) + + // Enforce batch limit + if len(stuckTxs) >= pool.config.RebroadcastBatchSize { + return stuckTxs + } + } + } + + return stuckTxs +} + // SetGasTip updates the minimum gas tip required by the transaction pool for a // new transaction, and drops all transactions below this threshold. func (pool *LegacyPool) SetGasTip(tip *big.Int) { @@ -1351,6 +1494,8 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo if outofbound { pool.priced.Removed(1) } + // Clean up rebroadcast tracking + delete(pool.lastRebroadcast, hash) // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { if removed, invalids := pending.Remove(tx); removed { diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 36b90fecfd..5d7c8c254f 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -5019,3 +5019,265 @@ func TestTxFiltering(t *testing.T) { } }) } + +// TestIdentifyStuckTransactions tests the identifyStuckTransactions method +func TestIdentifyStuckTransactions(t *testing.T) { + t.Parallel() + + // Create a test account + key, _ := crypto.GenerateKey() + + t.Run("TooYoungTransactions", func(t *testing.T) { + // Transactions younger than RebroadcastInterval should not be identified as stuck + pool, _ := setupPoolWithConfig(params.TestChainConfig) + defer pool.Close() + + // Add a new transaction (will have current timestamp) + from := crypto.PubkeyToAddress(key.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), key) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + pool.mu.Lock() + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + if len(stuckTxs) != 0 { + t.Errorf("expected 0 stuck transactions for young tx, got %d", len(stuckTxs)) + } + }) + + t.Run("TooOldTransactions", func(t *testing.T) { + // Transactions older than RebroadcastMaxAge should not be identified as stuck + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 5 * time.Second + config.RebroadcastBatchSize = 100 + + pool, testKey := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(testKey.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), testKey) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + // Simulate an old transaction by modifying the Time directly + pool.mu.Lock() + if list := pool.pending[from]; list != nil { + for _, pendingTx := range list.Flatten() { + // Set time to 10 seconds ago (older than maxAge of 5s) + pendingTx.SetTime(time.Now().Add(-10 * time.Second)) + } + } + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + if len(stuckTxs) != 0 { + t.Errorf("expected 0 stuck transactions for old tx, got %d", len(stuckTxs)) + } + }) + + t.Run("EligibleTransactions", func(t *testing.T) { + // Transactions within the age range should be identified as stuck + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 100 + + pool, testKey := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(testKey.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), testKey) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + // Simulate a moderately old transaction (within range) + pool.mu.Lock() + if list := pool.pending[from]; list != nil { + for _, pendingTx := range list.Flatten() { + // Set time to 3 seconds ago (between interval and maxAge) + pendingTx.SetTime(time.Now().Add(-3 * time.Second)) + } + } + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + if len(stuckTxs) != 1 { + t.Errorf("expected 1 stuck transaction, got %d", len(stuckTxs)) + } + }) + + t.Run("RecentlyRebroadcast", func(t *testing.T) { + // Transactions that were recently rebroadcast should not be identified again + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 100 + + pool, testKey := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(testKey.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), testKey) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + // First identification should find the tx + pool.mu.Lock() + if list := pool.pending[from]; list != nil { + for _, pendingTx := range list.Flatten() { + pendingTx.SetTime(time.Now().Add(-3 * time.Second)) + } + } + stuckTxs1 := pool.identifyStuckTransactions() + + if len(stuckTxs1) != 1 { + pool.mu.Unlock() + t.Fatalf("expected 1 stuck transaction on first call, got %d", len(stuckTxs1)) + } + + // Simulate what the caller does: update lastRebroadcast for identified txs + now := time.Now() + for _, tx := range stuckTxs1 { + pool.lastRebroadcast[tx.Hash()] = now + } + pool.mu.Unlock() + + // Immediately calling again should not find it (recently rebroadcast) + pool.mu.RLock() + stuckTxs2 := pool.identifyStuckTransactions() + pool.mu.RUnlock() + + if len(stuckTxs2) != 0 { + t.Errorf("expected 0 stuck transactions after recent rebroadcast, got %d", len(stuckTxs2)) + } + }) + + t.Run("BatchSizeLimit", func(t *testing.T) { + // Only RebroadcastBatchSize transactions should be returned + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 3 + + pool, testKey := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(testKey.PublicKey) + balance, _ := new(big.Int).SetString("100000000000000000000", 10) // 100 ETH + testAddBalance(pool, from, balance) + + // Add 5 transactions + for i := 0; i < 5; i++ { + tx := pricedTransaction(uint64(i), 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), testKey) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction %d: %v", i, err) + } + } + + // Make them eligible for rebroadcast + pool.mu.Lock() + if list := pool.pending[from]; list != nil { + for _, pendingTx := range list.Flatten() { + pendingTx.SetTime(time.Now().Add(-3 * time.Second)) + } + } + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + if len(stuckTxs) != 3 { + t.Errorf("expected batch size limit of 3, got %d", len(stuckTxs)) + } + }) +} + +// TestRebroadcastCleanup tests that lastRebroadcast entries are cleaned up when txs are removed +func TestRebroadcastCleanup(t *testing.T) { + t.Parallel() + + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 100 + + pool, key := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(key.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), key) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + hash := tx.Hash() + + // Make it eligible and identify it + pool.mu.Lock() + if list := pool.pending[from]; list != nil { + for _, pendingTx := range list.Flatten() { + pendingTx.SetTime(time.Now().Add(-3 * time.Second)) + } + } + stuckTxs := pool.identifyStuckTransactions() + + // Simulate what the caller does: update lastRebroadcast for identified txs + now := time.Now() + for _, stuckTx := range stuckTxs { + pool.lastRebroadcast[stuckTx.Hash()] = now + } + + // Verify it's tracked + if _, ok := pool.lastRebroadcast[hash]; !ok { + pool.mu.Unlock() + t.Fatal("transaction should be tracked in lastRebroadcast") + } + + // Remove the transaction + pool.removeTx(hash, true, true) + + // Verify cleanup + _, stillTracked := pool.lastRebroadcast[hash] + pool.mu.Unlock() + + if stillTracked { + t.Error("transaction should be removed from lastRebroadcast after removal") + } +} + +// TestSubscribeRebroadcastTransactions tests the subscription mechanism +func TestSubscribeRebroadcastTransactions(t *testing.T) { + t.Parallel() + + pool, _ := setupPoolWithConfig(params.TestChainConfig) + defer pool.Close() + + ch := make(chan core.StuckTxsEvent, 1) + sub := pool.SubscribeRebroadcastTransactions(ch) + defer sub.Unsubscribe() + + // The subscription should be valid + if sub == nil { + t.Error("subscription should not be nil") + } +} diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 9233653822..9122ab195c 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -158,6 +158,9 @@ type SubPool interface { // or also for reorged out ones. SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + // SubscribeRebroadcastTransactions subscribes to stuck transaction rebroadcast events. + SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription + // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. Nonce(addr common.Address) uint64 diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index e8d3bd581d..19741662ea 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -413,6 +413,22 @@ func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) return p.subs.Track(event.JoinSubscriptions(subs...)) } +// SubscribeRebroadcastTransactions registers a subscription for stuck transaction +// rebroadcast events from all subpools. +func (p *TxPool) SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription { + if p == nil { + return event.NewSubscription(func(quit <-chan struct{}) error { + <-quit + return nil + }) + } + subs := make([]event.Subscription, len(p.subpools)) + for i, subpool := range p.subpools { + subs[i] = subpool.SubscribeRebroadcastTransactions(ch) + } + return p.subs.Track(event.JoinSubscriptions(subs...)) +} + // PoolNonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. func (p *TxPool) PoolNonce(addr common.Address) uint64 { diff --git a/eth/handler.go b/eth/handler.go index 50bc52f161..2324756443 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -96,6 +96,9 @@ type txPool interface { // can decide whether to receive notifications only for newly seen transactions // or also for reorged out ones. SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + + // SubscribeRebroadcastTransactions subscribes to stuck transaction rebroadcast events. + SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription } // handlerConfig is the collection of initialization parameters to create a full @@ -144,6 +147,8 @@ type handler struct { eventMux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription + stuckTxsCh chan core.StuckTxsEvent + stuckTxsSub event.Subscription minedBlockSub *event.TypeMuxSubscription blockRange *blockRangeState @@ -556,6 +561,12 @@ func (h *handler) Start(maxPeers int) { h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false) go h.txBroadcastLoop() + // rebroadcast stuck transactions + h.wg.Add(1) + h.stuckTxsCh = make(chan core.StuckTxsEvent, txChanSize) + h.stuckTxsSub = h.txpool.SubscribeRebroadcastTransactions(h.stuckTxsCh) + go h.stuckTxBroadcastLoop() + // broadcast mined blocks h.wg.Add(1) h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{}) @@ -575,7 +586,8 @@ func (h *handler) Start(maxPeers int) { } func (h *handler) Stop() { - h.txsSub.Unsubscribe() // quits txBroadcastLoop + h.txsSub.Unsubscribe() // quits txBroadcastLoop + h.stuckTxsSub.Unsubscribe() // quits stuckTxBroadcastLoop h.minedBlockSub.Unsubscribe() h.blockRange.stop() @@ -790,6 +802,40 @@ func (h *handler) txBroadcastLoop() { } } +// stuckTxBroadcastLoop handles rebroadcasting of stuck transactions. +// It clears the transaction hashes from peers' knownTxs caches and +// rebroadcasts them to the network. +func (h *handler) stuckTxBroadcastLoop() { + defer h.wg.Done() + + for { + select { + case event := <-h.stuckTxsCh: + // Only rebroadcast when synced + if !h.synced.Load() { + continue + } + + // Collect hashes to clear from knownTxs + hashes := make([]common.Hash, len(event.Txs)) + for i, tx := range event.Txs { + hashes[i] = tx.Hash() + } + + // Clear from all peers' knownTxs + h.peers.ForgetTransactions(hashes) + + // Rebroadcast + h.BroadcastTransactions(event.Txs) + + log.Debug("Rebroadcast stuck transactions", "count", len(event.Txs)) + + case <-h.stuckTxsSub.Err(): + return + } + } +} + // enableSyncedFeatures enables the post-sync functionalities when the initial // sync is finished. func (h *handler) enableSyncedFeatures() { diff --git a/eth/handler_test.go b/eth/handler_test.go index 79a648eea8..f2385db920 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -159,6 +159,14 @@ func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bo return p.txFeed.Subscribe(ch) } +// SubscribeRebroadcastTransactions returns a no-op subscription for tests. +func (p *testTxPool) SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + <-quit + return nil + }) +} + // testHandler is a live implementation of the Ethereum protocol handler, just // preinitialized with some sane testing defaults and the transaction pool mocked // out. diff --git a/eth/peerset.go b/eth/peerset.go index a7be681828..a869823861 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -412,3 +412,14 @@ func (ps *peerSet) close() { } ps.closed = true } + +// ForgetTransactions removes the given transaction hashes from all peers' +// known transaction sets, allowing them to be re-broadcast. +func (ps *peerSet) ForgetTransactions(hashes []common.Hash) { + ps.lock.RLock() + defer ps.lock.RUnlock() + + for _, p := range ps.peers { + p.Peer.ForgetTransactions(hashes) + } +} diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 2a2f89c3a9..e01423193a 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -520,3 +520,16 @@ func (k *knownCache) Contains(hash common.Hash) bool { func (k *knownCache) Cardinality() int { return k.hashes.Cardinality() } + +// Remove removes a list of elements from the set. +func (k *knownCache) Remove(hashes ...common.Hash) { + for _, hash := range hashes { + k.hashes.Remove(hash) + } +} + +// ForgetTransactions removes the given transaction hashes from the peer's +// known transaction set, allowing them to be re-broadcast to this peer. +func (p *Peer) ForgetTransactions(hashes []common.Hash) { + p.knownTxs.Remove(hashes...) +} diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 4d27dcd850..4891d4cac2 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -365,6 +365,20 @@ type TxPoolConfig struct { // FilteredAddressesFile is the path to newline-separated list of addresses whose transactions will be filtered FilteredAddressesFile string `hcl:"filtered-addresses,optional" toml:"filtered-addresses,optional"` + + // Rebroadcast enables the stuck transaction rebroadcast mechanism + Rebroadcast bool `hcl:"rebroadcast,optional" toml:"rebroadcast,optional"` + + // RebroadcastInterval is the interval between rebroadcast checks + RebroadcastInterval time.Duration `hcl:"-,optional" toml:"-"` + RebroadcastIntervalRaw string `hcl:"rebroadcast-interval,optional" toml:"rebroadcast-interval,optional"` + + // RebroadcastMaxAge is the maximum age for rebroadcast eligibility + RebroadcastMaxAge time.Duration `hcl:"-,optional" toml:"-"` + RebroadcastMaxAgeRaw string `hcl:"rebroadcastmaxage,optional" toml:"rebroadcastmaxage,optional"` + + // RebroadcastBatchSize is the maximum number of transactions per rebroadcast cycle + RebroadcastBatchSize int `hcl:"rebroadcastbatch,optional" toml:"rebroadcastbatch,optional"` } type SealerConfig struct { @@ -772,17 +786,21 @@ func DefaultConfig() *Config { BorLogs: false, TxPool: &TxPoolConfig{ - Locals: []string{}, - NoLocals: false, - Journal: "transactions.rlp", - Rejournal: 1 * time.Hour, - PriceLimit: params.BorDefaultTxPoolPriceLimit, // bor's default - PriceBump: 10, - AccountSlots: 16, - GlobalSlots: 131072, - AccountQueue: 64, - GlobalQueue: 131072, - LifeTime: 3 * time.Hour, + Locals: []string{}, + NoLocals: false, + Journal: "transactions.rlp", + Rejournal: 1 * time.Hour, + PriceLimit: params.BorDefaultTxPoolPriceLimit, // bor's default + PriceBump: 10, + AccountSlots: 16, + GlobalSlots: 131072, + AccountQueue: 64, + GlobalQueue: 131072, + LifeTime: 3 * time.Hour, + Rebroadcast: true, + RebroadcastInterval: 5 * time.Second, + RebroadcastMaxAge: 10 * time.Minute, + RebroadcastBatchSize: 200, }, Sealer: &SealerConfig{ Enabled: false, @@ -1002,6 +1020,8 @@ func (c *Config) fillTimeDurations() error { {"jsonrpc.http.ep-requesttimeout", &c.JsonRPC.Http.ExecutionPoolRequestTimeout, &c.JsonRPC.Http.ExecutionPoolRequestTimeoutRaw}, {"txpool.lifetime", &c.TxPool.LifeTime, &c.TxPool.LifeTimeRaw}, {"txpool.rejournal", &c.TxPool.Rejournal, &c.TxPool.RejournalRaw}, + {"txpool.rebroadcast-interval", &c.TxPool.RebroadcastInterval, &c.TxPool.RebroadcastIntervalRaw}, + {"txpool.rebroadcastmaxage", &c.TxPool.RebroadcastMaxAge, &c.TxPool.RebroadcastMaxAgeRaw}, {"cache.timeout", &c.Cache.TrieTimeout, &c.Cache.TrieTimeoutRaw}, {"p2p.txarrivalwait", &c.P2P.TxArrivalWait, &c.P2P.TxArrivalWaitRaw}, } @@ -1126,6 +1146,12 @@ func (c *Config) buildEth(stack *node.Node, accountManager *accounts.Manager) (* } else { n.TxPool.FilteredAddresses = filteredAddrs } + + // Rebroadcast options + n.TxPool.Rebroadcast = c.TxPool.Rebroadcast + n.TxPool.RebroadcastInterval = c.TxPool.RebroadcastInterval + n.TxPool.RebroadcastMaxAge = c.TxPool.RebroadcastMaxAge + n.TxPool.RebroadcastBatchSize = c.TxPool.RebroadcastBatchSize } // miner options diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index 356b3d971e..ccc821988b 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -313,6 +313,34 @@ func (c *Command) Flags(config *Config) *flagset.Flagset { Default: c.cliConfig.TxPool.FilteredAddressesFile, Group: "Transaction Pool", }) + f.BoolFlag(&flagset.BoolFlag{ + Name: "txpool.rebroadcast", + Usage: "Enable stuck transaction rebroadcast mechanism", + Value: &c.cliConfig.TxPool.Rebroadcast, + Default: c.cliConfig.TxPool.Rebroadcast, + Group: "Transaction Pool", + }) + f.DurationFlag(&flagset.DurationFlag{ + Name: "txpool.rebroadcast-interval", + Usage: "Interval between rebroadcast checks for stuck transactions", + Value: &c.cliConfig.TxPool.RebroadcastInterval, + Default: c.cliConfig.TxPool.RebroadcastInterval, + Group: "Transaction Pool", + }) + f.DurationFlag(&flagset.DurationFlag{ + Name: "txpool.rebroadcastmaxage", + Usage: "Maximum age for a transaction to be eligible for rebroadcast", + Value: &c.cliConfig.TxPool.RebroadcastMaxAge, + Default: c.cliConfig.TxPool.RebroadcastMaxAge, + Group: "Transaction Pool", + }) + f.IntFlag(&flagset.IntFlag{ + Name: "txpool.rebroadcastbatch", + Usage: "Maximum number of transactions to rebroadcast per cycle", + Value: &c.cliConfig.TxPool.RebroadcastBatchSize, + Default: c.cliConfig.TxPool.RebroadcastBatchSize, + Group: "Transaction Pool", + }) // sealer options f.BoolFlag(&flagset.BoolFlag{ From 83130029948107a9b5268490b35d1dafd2c680ed Mon Sep 17 00:00:00 2001 From: Jerry Date: Fri, 16 Jan 2026 14:59:21 -0800 Subject: [PATCH 2/5] minor fix --- core/txpool/legacypool/legacypool.go | 18 ++- core/txpool/legacypool/legacypool_test.go | 189 +++++++++------------- 2 files changed, 86 insertions(+), 121 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index d9d3e36867..e55bcfca4d 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -218,7 +218,7 @@ var DefaultConfig = Config{ AllowUnprotectedTxs: false, Rebroadcast: true, - RebroadcastInterval: 5 * time.Second, + RebroadcastInterval: 30 * time.Second, RebroadcastMaxAge: 10 * time.Minute, RebroadcastBatchSize: 200, } @@ -583,16 +583,20 @@ func (pool *LegacyPool) identifyStuckTransactions() []*types.Transaction { if age < pool.config.RebroadcastInterval { continue } - // Skip if too old (beyond max age, stop trying) - if age > pool.config.RebroadcastMaxAge { - continue - } - // Skip if recently rebroadcast - if lastTime, ok := pool.lastRebroadcast[hash]; ok { + // Check rebroadcast history + lastTime, wasRebroadcast := pool.lastRebroadcast[hash] + if wasRebroadcast { + // Skip if recently rebroadcast if now.Sub(lastTime) < pool.config.RebroadcastInterval { continue } + // Skip if we've been rebroadcasting this tx for too long (max age applies + // only to previously rebroadcast txs - new txs that just became executable + // after a base fee drop should still be considered) + if pool.config.RebroadcastMaxAge > 0 && age > pool.config.RebroadcastMaxAge { + continue + } } // Skip if not immediately executable (gas price too low for current conditions) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 5d7c8c254f..3f14a8aa6a 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -5020,26 +5020,50 @@ func TestTxFiltering(t *testing.T) { }) } +// setupRebroadcastTest creates a pool with rebroadcast config and adds a transaction. +// Returns the pool, key, sender address, and the added transaction. +func setupRebroadcastTest(t *testing.T, interval, maxAge time.Duration, batchSize int) (*LegacyPool, *ecdsa.PrivateKey, common.Address, *types.Transaction) { + t.Helper() + config := testTxPoolConfig + config.RebroadcastInterval = interval + config.RebroadcastMaxAge = maxAge + config.RebroadcastBatchSize = batchSize + + pool, key := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool.config = config + }) + + from := crypto.PubkeyToAddress(key.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), key) + if err := pool.addRemoteSync(tx); err != nil { + pool.Close() + t.Fatalf("failed to add transaction: %v", err) + } + + return pool, key, from, tx +} + +// setTxAge sets the time of all pending transactions for the given address. +// Must be called with pool.mu held. +func setTxAge(pool *LegacyPool, from common.Address, age time.Duration) { + if list := pool.pending[from]; list != nil { + for _, tx := range list.Flatten() { + tx.SetTime(time.Now().Add(-age)) + } + } +} + // TestIdentifyStuckTransactions tests the identifyStuckTransactions method func TestIdentifyStuckTransactions(t *testing.T) { t.Parallel() - // Create a test account - key, _ := crypto.GenerateKey() - t.Run("TooYoungTransactions", func(t *testing.T) { // Transactions younger than RebroadcastInterval should not be identified as stuck - pool, _ := setupPoolWithConfig(params.TestChainConfig) + pool, _, _, _ := setupRebroadcastTest(t, 1*time.Second, 10*time.Second, 100) defer pool.Close() - // Add a new transaction (will have current timestamp) - from := crypto.PubkeyToAddress(key.PublicKey) - testAddBalance(pool, from, big.NewInt(1000000000000000000)) - tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), key) - if err := pool.addRemoteSync(tx); err != nil { - t.Fatalf("failed to add transaction: %v", err) - } - + // Transaction has current timestamp, so it's too young pool.mu.Lock() stuckTxs := pool.identifyStuckTransactions() pool.mu.Unlock() @@ -5050,67 +5074,48 @@ func TestIdentifyStuckTransactions(t *testing.T) { }) t.Run("TooOldTransactions", func(t *testing.T) { - // Transactions older than RebroadcastMaxAge should not be identified as stuck - config := testTxPoolConfig - config.RebroadcastInterval = 1 * time.Second - config.RebroadcastMaxAge = 5 * time.Second - config.RebroadcastBatchSize = 100 - - pool, testKey := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { - pool.config = config - }) + // Transactions older than RebroadcastMaxAge that have been rebroadcast before + // should not be identified again (to prevent infinite rebroadcasting) + pool, _, from, _ := setupRebroadcastTest(t, 1*time.Second, 5*time.Second, 100) defer pool.Close() - from := crypto.PubkeyToAddress(testKey.PublicKey) - testAddBalance(pool, from, big.NewInt(1000000000000000000)) - tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), testKey) - if err := pool.addRemoteSync(tx); err != nil { - t.Fatalf("failed to add transaction: %v", err) - } - - // Simulate an old transaction by modifying the Time directly pool.mu.Lock() - if list := pool.pending[from]; list != nil { - for _, pendingTx := range list.Flatten() { - // Set time to 10 seconds ago (older than maxAge of 5s) - pendingTx.SetTime(time.Now().Add(-10 * time.Second)) - } + setTxAge(pool, from, 10*time.Second) // older than maxAge of 5s + // Mark as previously rebroadcast + for _, tx := range pool.pending[from].Flatten() { + pool.lastRebroadcast[tx.Hash()] = time.Now().Add(-6 * time.Second) } stuckTxs := pool.identifyStuckTransactions() pool.mu.Unlock() if len(stuckTxs) != 0 { - t.Errorf("expected 0 stuck transactions for old tx, got %d", len(stuckTxs)) + t.Errorf("expected 0 stuck transactions for old previously-rebroadcast tx, got %d", len(stuckTxs)) } }) - t.Run("EligibleTransactions", func(t *testing.T) { - // Transactions within the age range should be identified as stuck - config := testTxPoolConfig - config.RebroadcastInterval = 1 * time.Second - config.RebroadcastMaxAge = 10 * time.Second - config.RebroadcastBatchSize = 100 - - pool, testKey := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { - pool.config = config - }) + t.Run("OldTransactionsNeverRebroadcast", func(t *testing.T) { + // Old transactions that have NEVER been rebroadcast should still be identified + // (e.g., a transaction that just became executable after base fee dropped) + pool, _, from, _ := setupRebroadcastTest(t, 1*time.Second, 5*time.Second, 100) defer pool.Close() - from := crypto.PubkeyToAddress(testKey.PublicKey) - testAddBalance(pool, from, big.NewInt(1000000000000000000)) - tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), testKey) - if err := pool.addRemoteSync(tx); err != nil { - t.Fatalf("failed to add transaction: %v", err) + pool.mu.Lock() + setTxAge(pool, from, 10*time.Second) // older than maxAge, but never rebroadcast + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + if len(stuckTxs) != 1 { + t.Errorf("expected 1 stuck transaction for old never-rebroadcast tx, got %d", len(stuckTxs)) } + }) + + t.Run("EligibleTransactions", func(t *testing.T) { + // Transactions within the age range should be identified as stuck + pool, _, from, _ := setupRebroadcastTest(t, 1*time.Second, 10*time.Second, 100) + defer pool.Close() - // Simulate a moderately old transaction (within range) pool.mu.Lock() - if list := pool.pending[from]; list != nil { - for _, pendingTx := range list.Flatten() { - // Set time to 3 seconds ago (between interval and maxAge) - pendingTx.SetTime(time.Now().Add(-3 * time.Second)) - } - } + setTxAge(pool, from, 3*time.Second) // between interval and maxAge stuckTxs := pool.identifyStuckTransactions() pool.mu.Unlock() @@ -5121,41 +5126,21 @@ func TestIdentifyStuckTransactions(t *testing.T) { t.Run("RecentlyRebroadcast", func(t *testing.T) { // Transactions that were recently rebroadcast should not be identified again - config := testTxPoolConfig - config.RebroadcastInterval = 1 * time.Second - config.RebroadcastMaxAge = 10 * time.Second - config.RebroadcastBatchSize = 100 - - pool, testKey := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { - pool.config = config - }) + pool, _, from, _ := setupRebroadcastTest(t, 1*time.Second, 10*time.Second, 100) defer pool.Close() - from := crypto.PubkeyToAddress(testKey.PublicKey) - testAddBalance(pool, from, big.NewInt(1000000000000000000)) - tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), testKey) - if err := pool.addRemoteSync(tx); err != nil { - t.Fatalf("failed to add transaction: %v", err) - } - // First identification should find the tx pool.mu.Lock() - if list := pool.pending[from]; list != nil { - for _, pendingTx := range list.Flatten() { - pendingTx.SetTime(time.Now().Add(-3 * time.Second)) - } - } + setTxAge(pool, from, 3*time.Second) stuckTxs1 := pool.identifyStuckTransactions() - if len(stuckTxs1) != 1 { pool.mu.Unlock() t.Fatalf("expected 1 stuck transaction on first call, got %d", len(stuckTxs1)) } - // Simulate what the caller does: update lastRebroadcast for identified txs - now := time.Now() + // Mark as rebroadcast for _, tx := range stuckTxs1 { - pool.lastRebroadcast[tx.Hash()] = now + pool.lastRebroadcast[tx.Hash()] = time.Now() } pool.mu.Unlock() @@ -5176,30 +5161,25 @@ func TestIdentifyStuckTransactions(t *testing.T) { config.RebroadcastMaxAge = 10 * time.Second config.RebroadcastBatchSize = 3 - pool, testKey := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool, key := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { pool.config = config }) defer pool.Close() - from := crypto.PubkeyToAddress(testKey.PublicKey) + from := crypto.PubkeyToAddress(key.PublicKey) balance, _ := new(big.Int).SetString("100000000000000000000", 10) // 100 ETH testAddBalance(pool, from, balance) // Add 5 transactions - for i := 0; i < 5; i++ { - tx := pricedTransaction(uint64(i), 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), testKey) + for i := range 5 { + tx := pricedTransaction(uint64(i), 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), key) if err := pool.addRemoteSync(tx); err != nil { t.Fatalf("failed to add transaction %d: %v", i, err) } } - // Make them eligible for rebroadcast pool.mu.Lock() - if list := pool.pending[from]; list != nil { - for _, pendingTx := range list.Flatten() { - pendingTx.SetTime(time.Now().Add(-3 * time.Second)) - } - } + setTxAge(pool, from, 3*time.Second) stuckTxs := pool.identifyStuckTransactions() pool.mu.Unlock() @@ -5213,38 +5193,19 @@ func TestIdentifyStuckTransactions(t *testing.T) { func TestRebroadcastCleanup(t *testing.T) { t.Parallel() - config := testTxPoolConfig - config.RebroadcastInterval = 1 * time.Second - config.RebroadcastMaxAge = 10 * time.Second - config.RebroadcastBatchSize = 100 - - pool, key := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { - pool.config = config - }) + pool, _, from, tx := setupRebroadcastTest(t, 1*time.Second, 10*time.Second, 100) defer pool.Close() - from := crypto.PubkeyToAddress(key.PublicKey) - testAddBalance(pool, from, big.NewInt(1000000000000000000)) - tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), key) - if err := pool.addRemoteSync(tx); err != nil { - t.Fatalf("failed to add transaction: %v", err) - } - hash := tx.Hash() // Make it eligible and identify it pool.mu.Lock() - if list := pool.pending[from]; list != nil { - for _, pendingTx := range list.Flatten() { - pendingTx.SetTime(time.Now().Add(-3 * time.Second)) - } - } + setTxAge(pool, from, 3*time.Second) stuckTxs := pool.identifyStuckTransactions() - // Simulate what the caller does: update lastRebroadcast for identified txs - now := time.Now() + // Mark as rebroadcast for _, stuckTx := range stuckTxs { - pool.lastRebroadcast[stuckTx.Hash()] = now + pool.lastRebroadcast[stuckTx.Hash()] = time.Now() } // Verify it's tracked From aa5128706f7874b3f777a6846c4b26f4d094181b Mon Sep 17 00:00:00 2001 From: Jerry Date: Fri, 16 Jan 2026 15:22:15 -0800 Subject: [PATCH 3/5] Update default rebroadcast interval to 30s --- eth/handler.go | 4 ++-- internal/cli/server/config.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 2324756443..0790bee6b3 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -586,8 +586,8 @@ func (h *handler) Start(maxPeers int) { } func (h *handler) Stop() { - h.txsSub.Unsubscribe() // quits txBroadcastLoop - h.stuckTxsSub.Unsubscribe() // quits stuckTxBroadcastLoop + h.txsSub.Unsubscribe() // quits txBroadcastLoop + h.stuckTxsSub.Unsubscribe() // quits stuckTxBroadcastLoop h.minedBlockSub.Unsubscribe() h.blockRange.stop() diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 4891d4cac2..3a448260bf 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -798,7 +798,7 @@ func DefaultConfig() *Config { GlobalQueue: 131072, LifeTime: 3 * time.Hour, Rebroadcast: true, - RebroadcastInterval: 5 * time.Second, + RebroadcastInterval: 30 * time.Second, RebroadcastMaxAge: 10 * time.Minute, RebroadcastBatchSize: 200, }, From ce34b3d0c1b3ec8131b52f0013386f52db44b216 Mon Sep 17 00:00:00 2001 From: Jerry Date: Fri, 16 Jan 2026 19:00:30 -0800 Subject: [PATCH 4/5] add tests --- core/txpool/blobpool/blobpool_test.go | 46 +++++ core/txpool/legacypool/legacypool_test.go | 203 ++++++++++++++++++++++ core/txpool/txpool_test.go | 33 ++++ eth/handler_test.go | 64 ++++++- eth/peerset_test.go | 63 +++++++ eth/protocols/eth/peer_test.go | 78 +++++++++ 6 files changed, 480 insertions(+), 7 deletions(-) create mode 100644 core/txpool/txpool_test.go create mode 100644 eth/peerset_test.go diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 984c27c600..c9e9d61004 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -1783,3 +1783,49 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { } } } + +// TestSubscribeRebroadcastTransactions tests that the blob pool returns a no-op +// subscription for rebroadcast events since blobs are not rebroadcast. +func TestSubscribeRebroadcastTransactions(t *testing.T) { + // Create a temporary directory for the pool data + storage := t.TempDir() + + // Create a minimal state database + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) + statedb.Commit(0, true, false) + + // Create a test blockchain with minimal config + chain := &testBlockChain{ + config: testChainConfig, + basefee: uint256.NewInt(params.InitialBaseFee), + blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice), + statedb: statedb, + } + + // Create the blob pool + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { + t.Fatalf("failed to create blob pool: %v", err) + } + defer pool.Close() + + // Create a channel and subscribe to rebroadcast events + ch := make(chan core.StuckTxsEvent, 1) + sub := pool.SubscribeRebroadcastTransactions(ch) + + // Verify the subscription is valid + if sub == nil { + t.Fatal("expected non-nil subscription") + } + + // Unsubscribe should work without issues + sub.Unsubscribe() + + // Channel should be empty (no events should be sent) + select { + case event := <-ch: + t.Fatalf("unexpected event: %v", event) + default: + // Expected - no events + } +} diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 3f14a8aa6a..9472a8f1b2 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -5242,3 +5242,206 @@ func TestSubscribeRebroadcastTransactions(t *testing.T) { t.Error("subscription should not be nil") } } + +// TestIdentifyStuckTransactionsWithBaseFee tests rebroadcast with EIP-1559 base fee filtering +func TestIdentifyStuckTransactionsWithBaseFee(t *testing.T) { + t.Parallel() + + t.Run("NilHead", func(t *testing.T) { + // When currentHead is nil, should return nil + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 100 + + pool, key := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(key.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + + tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), key) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + pool.mu.Lock() + setTxAge(pool, from, 3*time.Second) + // Set currentHead to nil + pool.currentHead.Store(nil) + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + // Should return nil when head is nil + if stuckTxs != nil { + t.Errorf("expected nil for nil head, got %d transactions", len(stuckTxs)) + } + }) + + t.Run("LowGasFeeCap", func(t *testing.T) { + // Transactions with gas fee cap below base fee should not be identified + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 100 + + pool, key := setupPoolWithConfig(eip1559Config, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(key.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + + // Add a dynamic fee tx with low gas fee cap + tx := dynamicFeeTx(0, 100000, big.NewInt(100), big.NewInt(1), key) // fee cap = 100 wei + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + // Set the current head to have a high base fee to trigger filtering + pool.mu.Lock() + setTxAge(pool, from, 3*time.Second) + // Inject a header with BaseFee higher than the tx's gas fee cap + pool.currentHead.Store(&types.Header{ + Number: big.NewInt(1), + BaseFee: big.NewInt(1000000000), // 1 gwei, higher than tx's 100 wei fee cap + }) + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + // Should NOT be identified because gas fee cap is below base fee + if len(stuckTxs) != 0 { + t.Errorf("expected 0 stuck transactions for low fee cap tx, got %d", len(stuckTxs)) + } + }) + + t.Run("LowGasTip", func(t *testing.T) { + // Transactions with gas tip below minimum should not be identified + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 100 + + pool, key := setupPoolWithConfig(eip1559Config, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(key.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + + // Add a dynamic fee tx with low tip (tip = 1 wei) + tx := dynamicFeeTx(0, 100000, big.NewInt(10000000000), big.NewInt(1), key) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + pool.mu.Lock() + setTxAge(pool, from, 3*time.Second) + // Directly set a high gasTip without calling SetGasTip (which would remove the tx) + pool.gasTip.Store(uint256.NewInt(1000000000)) // 1 gwei, higher than tx's 1 wei tip + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + // Should NOT be identified because gas tip is below minimum + if len(stuckTxs) != 0 { + t.Errorf("expected 0 stuck transactions for low tip tx, got %d", len(stuckTxs)) + } + }) + + t.Run("ValidEIP1559Transaction", func(t *testing.T) { + // Valid EIP-1559 transactions with sufficient fee cap and tip should be identified + config := testTxPoolConfig + config.RebroadcastInterval = 1 * time.Second + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 100 + + pool, key := setupPoolWithConfig(eip1559Config, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + from := crypto.PubkeyToAddress(key.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + + // Add a dynamic fee tx with sufficient fee cap and tip + tx := dynamicFeeTx(0, 100000, big.NewInt(10000000000), big.NewInt(1000000000), key) // 10 gwei fee cap, 1 gwei tip + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + pool.mu.Lock() + setTxAge(pool, from, 3*time.Second) + // Inject a header with BaseFee lower than the tx's gas fee cap + pool.currentHead.Store(&types.Header{ + Number: big.NewInt(1), + BaseFee: big.NewInt(1000000000), // 1 gwei, lower than tx's 10 gwei fee cap + }) + stuckTxs := pool.identifyStuckTransactions() + pool.mu.Unlock() + + // Should be identified because fee cap and tip are sufficient + if len(stuckTxs) != 1 { + t.Errorf("expected 1 stuck transaction for valid EIP-1559 tx, got %d", len(stuckTxs)) + } + }) +} + +// TestRebroadcastLoopIntegration tests the actual rebroadcast loop sending events +func TestRebroadcastLoopIntegration(t *testing.T) { + t.Parallel() + + config := testTxPoolConfig + config.RebroadcastInterval = 100 * time.Millisecond // Short interval for testing + config.RebroadcastMaxAge = 10 * time.Second + config.RebroadcastBatchSize = 100 + + pool, key := setupPoolWithConfig(params.TestChainConfig, func(pool *LegacyPool) { + pool.config = config + }) + defer pool.Close() + + // Subscribe to rebroadcast events + ch := make(chan core.StuckTxsEvent, 10) + sub := pool.SubscribeRebroadcastTransactions(ch) + defer sub.Unsubscribe() + + from := crypto.PubkeyToAddress(key.PublicKey) + testAddBalance(pool, from, big.NewInt(1000000000000000000)) + + // Add a transaction + tx := pricedTransaction(0, 100000, big.NewInt(params.BorDefaultTxPoolPriceLimit), key) + if err := pool.addRemoteSync(tx); err != nil { + t.Fatalf("failed to add transaction: %v", err) + } + + // Manually set the transaction time to make it eligible for rebroadcast + pool.mu.Lock() + setTxAge(pool, from, 200*time.Millisecond) + pool.mu.Unlock() + + // Wait for the rebroadcast loop to trigger + select { + case event := <-ch: + if len(event.Txs) != 1 { + t.Errorf("expected 1 transaction in rebroadcast event, got %d", len(event.Txs)) + } + if event.Txs[0].Hash() != tx.Hash() { + t.Errorf("wrong transaction hash in rebroadcast event") + } + case <-time.After(500 * time.Millisecond): + t.Error("timeout waiting for rebroadcast event") + } + + // Verify lastRebroadcast was updated + pool.mu.RLock() + _, tracked := pool.lastRebroadcast[tx.Hash()] + pool.mu.RUnlock() + + if !tracked { + t.Error("transaction should be tracked in lastRebroadcast after rebroadcast") + } +} diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go new file mode 100644 index 0000000000..354365ccee --- /dev/null +++ b/core/txpool/txpool_test.go @@ -0,0 +1,33 @@ +package txpool + +import ( + "testing" + + "github.com/ethereum/go-ethereum/core" +) + +// TestSubscribeRebroadcastTransactionsNilPool tests that calling +// SubscribeRebroadcastTransactions on a nil TxPool returns a valid no-op +// subscription. +func TestSubscribeRebroadcastTransactionsNilPool(t *testing.T) { + var pool *TxPool // nil pool + + ch := make(chan core.StuckTxsEvent, 1) + sub := pool.SubscribeRebroadcastTransactions(ch) + + // Verify the subscription is valid even for nil pool + if sub == nil { + t.Fatal("expected non-nil subscription") + } + + // Unsubscribe should work without issues + sub.Unsubscribe() + + // Channel should be empty (no events should be sent) + select { + case event := <-ch: + t.Fatalf("unexpected event: %v", event) + default: + // Expected - no events + } +} diff --git a/eth/handler_test.go b/eth/handler_test.go index f2385db920..20bde0a185 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -21,6 +21,8 @@ import ( "sort" "sync" "sync/atomic" + "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -51,8 +53,9 @@ var ( type testTxPool struct { pool map[common.Hash]*types.Transaction // Hash map of collected transactions - txFeed event.Feed // Notification feed to allow waiting for inclusion - lock sync.RWMutex // Protects the transaction pool + txFeed event.Feed // Notification feed to allow waiting for inclusion + rebroadcastFeed event.Feed // Notification feed for stuck tx rebroadcast + lock sync.RWMutex // Protects the transaction pool } // newTestTxPool creates a mock transaction pool. @@ -159,12 +162,14 @@ func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bo return p.txFeed.Subscribe(ch) } -// SubscribeRebroadcastTransactions returns a no-op subscription for tests. +// SubscribeRebroadcastTransactions returns a subscription to the rebroadcast feed. func (p *testTxPool) SubscribeRebroadcastTransactions(ch chan<- core.StuckTxsEvent) event.Subscription { - return event.NewSubscription(func(quit <-chan struct{}) error { - <-quit - return nil - }) + return p.rebroadcastFeed.Subscribe(ch) +} + +// SendStuckTxs sends stuck transactions to the rebroadcast feed for testing. +func (p *testTxPool) SendStuckTxs(txs []*types.Transaction) { + p.rebroadcastFeed.Send(core.StuckTxsEvent{Txs: txs}) } // testHandler is a live implementation of the Ethereum protocol handler, just @@ -223,3 +228,48 @@ func (b *testHandler) close() { b.handler.Stop() b.chain.Stop() } + +func TestStuckTxBroadcastLoop(t *testing.T) { + t.Parallel() + + handler := newTestHandler() + defer handler.close() + + // Mark handler as synced so it processes stuck transactions + handler.handler.synced.Store(true) + + // Create a test transaction + tx := types.NewTransaction(0, testAddr, big.NewInt(100), 21000, big.NewInt(1000000000), nil) + signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testKey) + if err != nil { + t.Fatalf("failed to sign tx: %v", err) + } + + // Send stuck transaction event + handler.txpool.SendStuckTxs([]*types.Transaction{signedTx}) + + // Give the loop time to process + time.Sleep(100 * time.Millisecond) +} + +func TestStuckTxBroadcastLoopNotSynced(t *testing.T) { + t.Parallel() + + handler := newTestHandler() + defer handler.close() + + // Handler is not synced by default (synced.Load() == false) + + // Create a test transaction + tx := types.NewTransaction(0, testAddr, big.NewInt(100), 21000, big.NewInt(1000000000), nil) + signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testKey) + if err != nil { + t.Fatalf("failed to sign tx: %v", err) + } + + // Send stuck transaction event - should be ignored since not synced + handler.txpool.SendStuckTxs([]*types.Transaction{signedTx}) + + // Give the loop time to process (or ignore) + time.Sleep(100 * time.Millisecond) +} diff --git a/eth/peerset_test.go b/eth/peerset_test.go new file mode 100644 index 0000000000..16d4d9dc1b --- /dev/null +++ b/eth/peerset_test.go @@ -0,0 +1,63 @@ +package eth + +import ( + "crypto/rand" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +func TestPeerSetForgetTransactions(t *testing.T) { + t.Parallel() + + ps := newPeerSet() + defer ps.close() + + // Create multiple test peers + apps := make([]*p2p.MsgPipeRW, 3) + + for i := 0; i < 3; i++ { + app, net := p2p.MsgPipe() + apps[i] = app + + var id enode.ID + rand.Read(id[:]) + + peer := eth.NewPeer(eth.ETH68, p2p.NewPeer(id, "test", nil), net, nil) + + // Register the peer + if err := ps.registerPeer(peer, nil, nil); err != nil { + t.Fatalf("failed to register peer %d: %v", i, err) + } + } + + // Clean up + defer func() { + for _, app := range apps { + app.Close() + } + }() + + // Verify we have 3 peers + if ps.len() != 3 { + t.Fatalf("expected 3 peers, got %d", ps.len()) + } + + // ForgetTransactions should not panic with registered peers + // (the actual forgetting logic is tested in eth/protocols/eth/peer_test.go) + hashes := []common.Hash{{1}, {2}, {3}} + ps.ForgetTransactions(hashes) +} + +func TestPeerSetForgetTransactionsEmpty(t *testing.T) { + t.Parallel() + + ps := newPeerSet() + defer ps.close() + + // ForgetTransactions should not panic with no peers + ps.ForgetTransactions([]common.Hash{{1}, {2}, {3}}) +} diff --git a/eth/protocols/eth/peer_test.go b/eth/protocols/eth/peer_test.go index 9d02f29a4d..b8c9fb2d4d 100644 --- a/eth/protocols/eth/peer_test.go +++ b/eth/protocols/eth/peer_test.go @@ -92,3 +92,81 @@ func TestPeerSet(t *testing.T) { t.Fatalf("bad size") } } + +func TestKnownCacheRemove(t *testing.T) { + size := 10 + s := newKnownCache(size) + + // Add some items + hashes := make([]common.Hash, 5) + for i := 0; i < 5; i++ { + hashes[i] = common.Hash{byte(i)} + s.Add(hashes[i]) + } + + if s.Cardinality() != 5 { + t.Fatalf("wrong size after add, expected 5 but found %d", s.Cardinality()) + } + + // Remove some items + s.Remove(hashes[0], hashes[2], hashes[4]) + + if s.Cardinality() != 2 { + t.Fatalf("wrong size after remove, expected 2 but found %d", s.Cardinality()) + } + + // Verify the correct items were removed + if s.Contains(hashes[0]) { + t.Error("hash[0] should have been removed") + } + if !s.Contains(hashes[1]) { + t.Error("hash[1] should still be present") + } + if s.Contains(hashes[2]) { + t.Error("hash[2] should have been removed") + } + if !s.Contains(hashes[3]) { + t.Error("hash[3] should still be present") + } + if s.Contains(hashes[4]) { + t.Error("hash[4] should have been removed") + } +} + +func TestPeerForgetTransactions(t *testing.T) { + // Create a peer with a known tx cache + app, _ := p2p.MsgPipe() + defer app.Close() + + var id enode.ID + rand.Read(id[:]) + + peer := NewPeer(ETH68, p2p.NewPeer(id, "test", nil), app, nil) + defer peer.Close() + + // Add some transaction hashes to the known set + hashes := make([]common.Hash, 5) + for i := 0; i < 5; i++ { + hashes[i] = common.Hash{byte(i + 100)} + peer.knownTxs.Add(hashes[i]) + } + + if peer.knownTxs.Cardinality() != 5 { + t.Fatalf("wrong size after add, expected 5 but found %d", peer.knownTxs.Cardinality()) + } + + // Forget some transactions + peer.ForgetTransactions([]common.Hash{hashes[0], hashes[2], hashes[4]}) + + if peer.knownTxs.Cardinality() != 2 { + t.Fatalf("wrong size after forget, expected 2 but found %d", peer.knownTxs.Cardinality()) + } + + // Verify the transactions were forgotten + if peer.knownTxs.Contains(hashes[0]) { + t.Error("hash[0] should have been forgotten") + } + if !peer.knownTxs.Contains(hashes[1]) { + t.Error("hash[1] should still be known") + } +} From 9ff33db5970854866da829a78f57dc6138335b8d Mon Sep 17 00:00:00 2001 From: Jerry Date: Tue, 20 Jan 2026 12:53:01 -0800 Subject: [PATCH 5/5] Address comments --- core/txpool/legacypool/legacypool.go | 1 - core/txpool/legacypool/legacypool_test.go | 4 ++-- internal/cli/server/config.go | 6 +++--- internal/cli/server/flags.go | 4 ++-- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index e55bcfca4d..e938ee7035 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -556,7 +556,6 @@ func (pool *LegacyPool) SubscribeRebroadcastTransactions(ch chan<- core.StuckTxs // - Have not been rebroadcast recently (within RebroadcastInterval) // - Are immediately executable (gas price meets current requirements) // -// identifyStuckTransactions finds pending transactions that may be stuck and need rebroadcast. // Must be called with pool.mu.RLock held (read lock only - does not modify pool state). func (pool *LegacyPool) identifyStuckTransactions() []*types.Transaction { now := time.Now() diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 9472a8f1b2..01ae93d01f 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -5064,9 +5064,9 @@ func TestIdentifyStuckTransactions(t *testing.T) { defer pool.Close() // Transaction has current timestamp, so it's too young - pool.mu.Lock() + pool.mu.RLock() stuckTxs := pool.identifyStuckTransactions() - pool.mu.Unlock() + pool.mu.RUnlock() if len(stuckTxs) != 0 { t.Errorf("expected 0 stuck transactions for young tx, got %d", len(stuckTxs)) diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 3a448260bf..a688a37805 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -375,10 +375,10 @@ type TxPoolConfig struct { // RebroadcastMaxAge is the maximum age for rebroadcast eligibility RebroadcastMaxAge time.Duration `hcl:"-,optional" toml:"-"` - RebroadcastMaxAgeRaw string `hcl:"rebroadcastmaxage,optional" toml:"rebroadcastmaxage,optional"` + RebroadcastMaxAgeRaw string `hcl:"rebroadcast-max-age,optional" toml:"rebroadcast-max-age,optional"` // RebroadcastBatchSize is the maximum number of transactions per rebroadcast cycle - RebroadcastBatchSize int `hcl:"rebroadcastbatch,optional" toml:"rebroadcastbatch,optional"` + RebroadcastBatchSize int `hcl:"rebroadcast-batch-size,optional" toml:"rebroadcast-batch-size,optional"` } type SealerConfig struct { @@ -1021,7 +1021,7 @@ func (c *Config) fillTimeDurations() error { {"txpool.lifetime", &c.TxPool.LifeTime, &c.TxPool.LifeTimeRaw}, {"txpool.rejournal", &c.TxPool.Rejournal, &c.TxPool.RejournalRaw}, {"txpool.rebroadcast-interval", &c.TxPool.RebroadcastInterval, &c.TxPool.RebroadcastIntervalRaw}, - {"txpool.rebroadcastmaxage", &c.TxPool.RebroadcastMaxAge, &c.TxPool.RebroadcastMaxAgeRaw}, + {"txpool.rebroadcast-max-age", &c.TxPool.RebroadcastMaxAge, &c.TxPool.RebroadcastMaxAgeRaw}, {"cache.timeout", &c.Cache.TrieTimeout, &c.Cache.TrieTimeoutRaw}, {"p2p.txarrivalwait", &c.P2P.TxArrivalWait, &c.P2P.TxArrivalWaitRaw}, } diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index ccc821988b..73f38fd90d 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -328,14 +328,14 @@ func (c *Command) Flags(config *Config) *flagset.Flagset { Group: "Transaction Pool", }) f.DurationFlag(&flagset.DurationFlag{ - Name: "txpool.rebroadcastmaxage", + Name: "txpool.rebroadcast-max-age", Usage: "Maximum age for a transaction to be eligible for rebroadcast", Value: &c.cliConfig.TxPool.RebroadcastMaxAge, Default: c.cliConfig.TxPool.RebroadcastMaxAge, Group: "Transaction Pool", }) f.IntFlag(&flagset.IntFlag{ - Name: "txpool.rebroadcastbatch", + Name: "txpool.rebroadcast-batch-size", Usage: "Maximum number of transactions to rebroadcast per cycle", Value: &c.cliConfig.TxPool.RebroadcastBatchSize, Default: c.cliConfig.TxPool.RebroadcastBatchSize,