diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 5f8dd4fac889..44cca09d31d8 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -559,19 +559,98 @@ func (pool *LegacyPool) ValidateTxBasics(tx *types.Transaction) error { return txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts) } +// batchTxValidationState tracks the queue cost baseline for a batch addition so +// later transactions can account for the net queue growth caused by earlier ones. +// It also tracks queued nonces introduced by the current batch so replacement +// cost lookups only see queue state created within that batch. +type batchTxValidationState struct { + queueCostSnapshot map[common.Address]*big.Int + batchQueuedNonces map[common.Address]map[uint64]struct{} +} + +func newBatchTxValidationState() *batchTxValidationState { + return &batchTxValidationState{ + queueCostSnapshot: make(map[common.Address]*big.Int), + batchQueuedNonces: make(map[common.Address]map[uint64]struct{}), + } +} + +func (batch *batchTxValidationState) queueSnapshot(pool *LegacyPool, addr common.Address) *big.Int { + if batch == nil { + return new(big.Int) + } + if snap, ok := batch.queueCostSnapshot[addr]; ok { + return snap + } + snap := new(big.Int) + if list, _ := pool.queue.get(addr); list != nil { + snap = list.totalcost.ToBig() + } + batch.queueCostSnapshot[addr] = snap + return snap +} + +func (batch *batchTxValidationState) queueCostDelta(pool *LegacyPool, addr common.Address) *big.Int { + if batch == nil { + return new(big.Int) + } + current := new(big.Int) + if list, _ := pool.queue.get(addr); list != nil { + current = list.totalcost.ToBig() + } + snapshot := batch.queueSnapshot(pool, addr) + + // Clamp negative deltas so queued cost reductions can't credit new batch additions. + if current.Cmp(snapshot) < 0 { + return new(big.Int) + } + return current.Sub(current, snapshot) +} + +func (batch *batchTxValidationState) trackQueuedNonce(addr common.Address, nonce uint64) { + if batch == nil { + return + } + if batch.batchQueuedNonces[addr] == nil { + batch.batchQueuedNonces[addr] = make(map[uint64]struct{}) + } + batch.batchQueuedNonces[addr][nonce] = struct{}{} +} + +func (batch *batchTxValidationState) batchQueuedCost(pool *LegacyPool, addr common.Address, nonce uint64) *big.Int { + if batch == nil { + return nil + } + nonces := batch.batchQueuedNonces[addr] + if _, ok := nonces[nonce]; !ok { + return nil + } + if list, ok := pool.queue.get(addr); ok { + if tx := list.txs.Get(nonce); tx != nil { + return tx.Cost() + } + } + return nil +} + // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (pool *LegacyPool) validateTx(tx *types.Transaction) error { + return pool.validateTxWithBatch(tx, nil) +} + +func (pool *LegacyPool) validateTxWithBatch(tx *types.Transaction, batch *batchTxValidationState) error { opts := &txpool.ValidationOptionsWithState{ State: pool.currentState, FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps UsedAndLeftSlots: nil, // Pool has own mechanism to limit the number of transactions ExistingExpenditure: func(addr common.Address) *big.Int { + spent := new(big.Int) if list := pool.pending[addr]; list != nil { - return list.totalcost.ToBig() + spent = list.totalcost.ToBig() } - return new(big.Int) + return spent.Add(spent, batch.queueCostDelta(pool, addr)) }, ExistingCost: func(addr common.Address, nonce uint64) *big.Int { if list := pool.pending[addr]; list != nil { @@ -579,6 +658,9 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error { return tx.Cost() } } + if cost := batch.batchQueuedCost(pool, addr, nonce); cost != nil { + return cost + } return nil }, } @@ -656,6 +738,10 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error { // pending promotion and execution. If the transaction is a replacement for an already // pending or queued one, it overwrites the previous transaction if its price is higher. func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { + return pool.addWithBatch(tx, nil) +} + +func (pool *LegacyPool) addWithBatch(tx *types.Transaction, batch *batchTxValidationState) (replaced bool, err error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all.Get(hash) != nil { @@ -665,7 +751,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { } // If the transaction fails basic validation, discard it - if err := pool.validateTx(tx); err != nil { + if err := pool.validateTxWithBatch(tx, batch); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxMeter.Mark(1) return false, err @@ -784,6 +870,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { if err != nil { return false, err } + batch.trackQueuedNonce(from, tx.Nonce()) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replaced, nil @@ -958,10 +1045,11 @@ func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accoun var ( dirty = newAccountSet(pool.signer) errs = make([]error, len(txs)) + batch = newBatchTxValidationState() valid int64 ) for i, tx := range txs { - replaced, err := pool.add(tx) + replaced, err := pool.addWithBatch(tx, batch) errs[i] = err if err == nil { if !replaced { diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index fb994d82086d..01d21ff5019c 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -395,7 +395,11 @@ func TestStateChangeDuringReset(t *testing.T) { func testAddBalance(pool *LegacyPool, addr common.Address, amount *big.Int) { pool.mu.Lock() - pool.currentState.AddBalance(addr, uint256.MustFromBig(amount), tracing.BalanceChangeUnspecified) + if amount.Sign() < 0 { + pool.currentState.SubBalance(addr, uint256.MustFromBig(new(big.Int).Neg(amount)), tracing.BalanceChangeUnspecified) + } else { + pool.currentState.AddBalance(addr, uint256.MustFromBig(amount), tracing.BalanceChangeUnspecified) + } pool.mu.Unlock() } @@ -797,12 +801,13 @@ func TestPostponing(t *testing.T) { for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() accs[i] = crypto.PubkeyToAddress(keys[i].PublicKey) - - testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(50100)) } - // Add a batch consecutive pending transactions for validation + // Build a batch of consecutive pending transactions and fund each account + // for the full batch so strict balance checks still admit the entire set. txs := []*types.Transaction{} + initialBalances := make([]*big.Int, len(keys)) for i, key := range keys { + initialBalances[i] = new(big.Int) for j := 0; j < 100; j++ { var tx *types.Transaction if (i+j)%2 == 0 { @@ -811,8 +816,12 @@ func TestPostponing(t *testing.T) { tx = transaction(uint64(j), 50000, key) } txs = append(txs, tx) + initialBalances[i].Add(initialBalances[i], tx.Cost()) } } + for i, addr := range accs { + testAddBalance(pool, addr, initialBalances[i]) + } for i, err := range pool.addRemotesSync(txs) { if err != nil { t.Fatalf("tx %d: failed to add transactions: %v", i, err) @@ -838,9 +847,11 @@ func TestPostponing(t *testing.T) { if pool.all.Count() != len(txs) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) } - // Reduce the balance of the account, and check that transactions are reorganised - for _, addr := range accs { - testAddBalance(pool, addr, big.NewInt(-1)) + // Reduce the balance to just below the expensive tx cost so those txs are + // dropped while the cheaper ones are postponed into the queue. + reducedBalance := new(big.Int).Sub(txs[1].Cost(), big.NewInt(1)) + for i, addr := range accs { + testAddBalance(pool, addr, new(big.Int).Sub(reducedBalance, initialBalances[i])) } <-pool.requestReset(nil, nil) @@ -1352,7 +1363,7 @@ func TestPendingMinimumAllowance(t *testing.T) { keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(10000000)) } // Generate and queue a batch of transactions nonces := make(map[common.Address]uint64) @@ -1745,7 +1756,7 @@ func TestStableUnderpricing(t *testing.T) { keys := make([]*ecdsa.PrivateKey, 2) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(100000000)) } // Fill up the entire queue with the same transaction price points txs := types.Transactions{}