Skip to content

Commit c63ea2c

Browse files
Matt Acciaimattac21
authored andcommitted
fix race condition between RemoveTx and runReorg loop
make public RemoveTx be thread safe by acquiring the pool lock and private removeTx does not
1 parent db40651 commit c63ea2c

File tree

3 files changed

+79
-6
lines changed

3 files changed

+79
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
- [\#591](https://github.com/cosmos/evm/pull/591) CheckTxHandler should handle "invalid nonce" tx
1717
- [\#643](https://github.com/cosmos/evm/pull/643) Support for mnemonic source (file, stdin,etc) flag in key add command.
1818
- [\#645](https://github.com/cosmos/evm/pull/645) Align precise bank keeper for correct decimal conversion in evmd.
19+
* [\#658](https://github.com/cosmos/evm/pull/658) Fix race condition between legacypool's RemoveTx and runReorg.
1920

2021
### IMPROVEMENTS
2122

mempool/txpool/legacypool/legacypool.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ func (pool *LegacyPool) loop() {
377377
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
378378
list := pool.queue[addr].Flatten()
379379
for _, tx := range list {
380-
pool.RemoveTx(tx.Hash(), true, true)
380+
pool.removeTx(tx.Hash(), true, true)
381381
}
382382
queuedEvictionMeter.Mark(int64(len(list)))
383383
}
@@ -430,7 +430,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) {
430430
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
431431
drop := pool.all.TxsBelowTip(tip)
432432
for _, tx := range drop {
433-
pool.RemoveTx(tx.Hash(), false, true)
433+
pool.removeTx(tx.Hash(), false, true)
434434
}
435435
pool.priced.Removed(len(drop))
436436
}
@@ -770,7 +770,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
770770
underpricedTxMeter.Mark(1)
771771

772772
sender, _ := types.Sender(pool.signer, tx)
773-
dropped := pool.RemoveTx(tx.Hash(), false, sender != from) // Don't unreserve the sender of the tx being added if last from the acc
773+
dropped := pool.removeTx(tx.Hash(), false, sender != from) // Don't unreserve the sender of the tx being added if last from the acc
774774

775775
pool.changesSinceReorg += dropped
776776
}
@@ -1087,6 +1087,23 @@ func (pool *LegacyPool) Has(hash common.Hash) bool {
10871087
//
10881088
// Returns the number of transactions removed from the pending queue.
10891089
func (pool *LegacyPool) RemoveTx(hash common.Hash, outofbound bool, unreserve bool) int {
1090+
pool.mu.Lock()
1091+
defer pool.mu.Unlock()
1092+
return pool.removeTx(hash, outofbound, unreserve)
1093+
}
1094+
1095+
// removeTx removes a single transaction from the queue, moving all subsequent
1096+
// transactions back to the future queue.
1097+
//
1098+
// If unreserve is false, the account will not be relinquished to the main txpool
1099+
// even if there are no more references to it. This is used to handle a race when
1100+
// a tx being added, and it evicts a previously scheduled tx from the same account,
1101+
// which could lead to a premature release of the lock.
1102+
//
1103+
// Returns the number of transactions removed from the pending queue.
1104+
//
1105+
// The transaction pool lock must be held.
1106+
func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bool) int {
10901107
// Fetch the transaction we wish to delete
10911108
tx := pool.all.Get(hash)
10921109
if tx == nil {
@@ -1533,7 +1550,7 @@ func (pool *LegacyPool) truncateQueue() {
15331550
// Drop all transactions if they are less than the overflow
15341551
if size := uint64(list.Len()); size <= drop {
15351552
for _, tx := range list.Flatten() {
1536-
pool.RemoveTx(tx.Hash(), true, true)
1553+
pool.removeTx(tx.Hash(), true, true)
15371554
}
15381555
drop -= size
15391556
queuedRateLimitMeter.Mark(int64(size))
@@ -1542,7 +1559,7 @@ func (pool *LegacyPool) truncateQueue() {
15421559
// Otherwise drop only last few transactions
15431560
txs := list.Flatten()
15441561
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
1545-
pool.RemoveTx(txs[i].Hash(), true, true)
1562+
pool.removeTx(txs[i].Hash(), true, true)
15461563
drop--
15471564
queuedRateLimitMeter.Mark(1)
15481565
}

mempool/txpool/legacypool/legacypool_test.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ func TestChainFork(t *testing.T) {
543543
if _, err := pool.add(tx); err != nil {
544544
t.Error("didn't expect error", err)
545545
}
546-
pool.RemoveTx(tx.Hash(), true, true)
546+
pool.removeTx(tx.Hash(), true, true)
547547

548548
// reset the pool's internal state
549549
resetState()
@@ -2591,6 +2591,61 @@ func TestSetCodeTransactionsReorg(t *testing.T) {
25912591
}
25922592
}
25932593

2594+
// TestRemoveTxTruncatePoolRace is a regression test for a race condition
2595+
// between removing txs and runReorg loop. Run this with the -race flag to
2596+
// ensure that there is no race condition between the two functions.
2597+
func TestRemoveTxTruncatePoolRace(t *testing.T) {
2598+
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
2599+
blockchain := newTestBlockChain(params.MergedTestChainConfig, 1000000, statedb, new(event.Feed))
2600+
2601+
pool := New(testTxPoolConfig, blockchain)
2602+
err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
2603+
if err != nil {
2604+
t.Fatalf("failed to init pool: %v", err)
2605+
}
2606+
2607+
// fill the pool with txs
2608+
fillPool(t, pool)
2609+
2610+
// make a copy of all hashes in the pool so that we do not have to iterate
2611+
// over pending and queue while we call RemoveTx, potentially triggering
2612+
// the race condition ourselves
2613+
var hashes []common.Hash
2614+
for _, txs := range pool.pending {
2615+
for _, tx := range txs.Flatten() {
2616+
hashes = append(hashes, tx.Hash())
2617+
}
2618+
}
2619+
for _, txs := range pool.queue {
2620+
for _, tx := range txs.Flatten() {
2621+
hashes = append(hashes, tx.Hash())
2622+
}
2623+
}
2624+
2625+
var wg sync.WaitGroup
2626+
2627+
// manually trigger the reorg loop to run (5 times just to ensure that we
2628+
// will trigger the race condition)
2629+
wg.Add(1)
2630+
go func() {
2631+
defer wg.Done()
2632+
for range 5 {
2633+
pool.runReorg(make(chan struct{}), nil, nil, nil)
2634+
}
2635+
}()
2636+
2637+
// call RemoveTx on every tx in the pool
2638+
wg.Add(1)
2639+
go func() {
2640+
defer wg.Done()
2641+
for _, hash := range hashes {
2642+
_ = pool.RemoveTx(hash, false, true)
2643+
}
2644+
}()
2645+
2646+
wg.Wait()
2647+
}
2648+
25942649
// Benchmarks the speed of validating the contents of the pending queue of the
25952650
// transaction pool.
25962651
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }

0 commit comments

Comments
 (0)