Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 92 additions & 4 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,26 +559,108 @@ func (pool *LegacyPool) ValidateTxBasics(tx *types.Transaction) error {
return txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts)
}

// batchTxValidationState tracks the queue cost baseline for a batch addition so
// later transactions can account for the net queue growth caused by earlier ones.
// It also tracks queued nonces introduced by the current batch so replacement
// cost lookups only see queue state created within that batch.
type batchTxValidationState struct {
queueCostSnapshot map[common.Address]*big.Int
batchQueuedNonces map[common.Address]map[uint64]struct{}
}

func newBatchTxValidationState() *batchTxValidationState {
return &batchTxValidationState{
queueCostSnapshot: make(map[common.Address]*big.Int),
batchQueuedNonces: make(map[common.Address]map[uint64]struct{}),
}
}

func (batch *batchTxValidationState) queueSnapshot(pool *LegacyPool, addr common.Address) *big.Int {
if batch == nil {
return new(big.Int)
}
if snap, ok := batch.queueCostSnapshot[addr]; ok {
return snap
}
snap := new(big.Int)
if list, _ := pool.queue.get(addr); list != nil {
snap = list.totalcost.ToBig()
}
batch.queueCostSnapshot[addr] = snap
return snap
}

func (batch *batchTxValidationState) queueCostDelta(pool *LegacyPool, addr common.Address) *big.Int {
if batch == nil {
return new(big.Int)
}
current := new(big.Int)
if list, _ := pool.queue.get(addr); list != nil {
current = list.totalcost.ToBig()
}
snapshot := batch.queueSnapshot(pool, addr)

// Clamp negative deltas so queued cost reductions can't credit new batch additions.
if current.Cmp(snapshot) < 0 {
return new(big.Int)
}
return current.Sub(current, snapshot)
}

func (batch *batchTxValidationState) trackQueuedNonce(addr common.Address, nonce uint64) {
if batch == nil {
return
}
if batch.batchQueuedNonces[addr] == nil {
batch.batchQueuedNonces[addr] = make(map[uint64]struct{})
}
batch.batchQueuedNonces[addr][nonce] = struct{}{}
}

func (batch *batchTxValidationState) batchQueuedCost(pool *LegacyPool, addr common.Address, nonce uint64) *big.Int {
if batch == nil {
return nil
}
nonces := batch.batchQueuedNonces[addr]
if _, ok := nonces[nonce]; !ok {
return nil
}
if list, ok := pool.queue.get(addr); ok {
if tx := list.txs.Get(nonce); tx != nil {
return tx.Cost()
}
}
return nil
}

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *LegacyPool) validateTx(tx *types.Transaction) error {
return pool.validateTxWithBatch(tx, nil)
}

func (pool *LegacyPool) validateTxWithBatch(tx *types.Transaction, batch *batchTxValidationState) error {
opts := &txpool.ValidationOptionsWithState{
State: pool.currentState,

FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps
UsedAndLeftSlots: nil, // Pool has own mechanism to limit the number of transactions
ExistingExpenditure: func(addr common.Address) *big.Int {
spent := new(big.Int)
if list := pool.pending[addr]; list != nil {
return list.totalcost.ToBig()
spent = list.totalcost.ToBig()
}
return new(big.Int)
return spent.Add(spent, batch.queueCostDelta(pool, addr))
},
ExistingCost: func(addr common.Address, nonce uint64) *big.Int {
if list := pool.pending[addr]; list != nil {
if tx := list.txs.Get(nonce); tx != nil {
return tx.Cost()
}
}
if cost := batch.batchQueuedCost(pool, addr, nonce); cost != nil {
return cost
}
return nil
},
}
Expand Down Expand Up @@ -656,6 +738,10 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
// pending promotion and execution. If the transaction is a replacement for an already
// pending or queued one, it overwrites the previous transaction if its price is higher.
func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
return pool.addWithBatch(tx, nil)
}

func (pool *LegacyPool) addWithBatch(tx *types.Transaction, batch *batchTxValidationState) (replaced bool, err error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
Expand All @@ -665,7 +751,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
}

// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx); err != nil {
if err := pool.validateTxWithBatch(tx, batch); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxMeter.Mark(1)
return false, err
Expand Down Expand Up @@ -784,6 +870,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
if err != nil {
return false, err
}
batch.trackQueuedNonce(from, tx.Nonce())

log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil
Expand Down Expand Up @@ -958,10 +1045,11 @@ func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accoun
var (
dirty = newAccountSet(pool.signer)
errs = make([]error, len(txs))
batch = newBatchTxValidationState()
valid int64
)
for i, tx := range txs {
replaced, err := pool.add(tx)
replaced, err := pool.addWithBatch(tx, batch)
errs[i] = err
if err == nil {
if !replaced {
Expand Down
29 changes: 20 additions & 9 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ func TestStateChangeDuringReset(t *testing.T) {

func testAddBalance(pool *LegacyPool, addr common.Address, amount *big.Int) {
pool.mu.Lock()
pool.currentState.AddBalance(addr, uint256.MustFromBig(amount), tracing.BalanceChangeUnspecified)
if amount.Sign() < 0 {
pool.currentState.SubBalance(addr, uint256.MustFromBig(new(big.Int).Neg(amount)), tracing.BalanceChangeUnspecified)
} else {
pool.currentState.AddBalance(addr, uint256.MustFromBig(amount), tracing.BalanceChangeUnspecified)
}
pool.mu.Unlock()
}

Expand Down Expand Up @@ -797,12 +801,13 @@ func TestPostponing(t *testing.T) {
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
accs[i] = crypto.PubkeyToAddress(keys[i].PublicKey)

testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(50100))
}
// Add a batch consecutive pending transactions for validation
// Build a batch of consecutive pending transactions and fund each account
// for the full batch so strict balance checks still admit the entire set.
txs := []*types.Transaction{}
initialBalances := make([]*big.Int, len(keys))
for i, key := range keys {
initialBalances[i] = new(big.Int)
for j := 0; j < 100; j++ {
var tx *types.Transaction
if (i+j)%2 == 0 {
Expand All @@ -811,8 +816,12 @@ func TestPostponing(t *testing.T) {
tx = transaction(uint64(j), 50000, key)
}
txs = append(txs, tx)
initialBalances[i].Add(initialBalances[i], tx.Cost())
}
}
for i, addr := range accs {
testAddBalance(pool, addr, initialBalances[i])
}
for i, err := range pool.addRemotesSync(txs) {
if err != nil {
t.Fatalf("tx %d: failed to add transactions: %v", i, err)
Expand All @@ -838,9 +847,11 @@ func TestPostponing(t *testing.T) {
if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
}
// Reduce the balance of the account, and check that transactions are reorganised
for _, addr := range accs {
testAddBalance(pool, addr, big.NewInt(-1))
// Reduce the balance to just below the expensive tx cost so those txs are
// dropped while the cheaper ones are postponed into the queue.
reducedBalance := new(big.Int).Sub(txs[1].Cost(), big.NewInt(1))
for i, addr := range accs {
testAddBalance(pool, addr, new(big.Int).Sub(reducedBalance, initialBalances[i]))
}
<-pool.requestReset(nil, nil)

Expand Down Expand Up @@ -1352,7 +1363,7 @@ func TestPendingMinimumAllowance(t *testing.T) {
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(10000000))
}
// Generate and queue a batch of transactions
nonces := make(map[common.Address]uint64)
Expand Down Expand Up @@ -1745,7 +1756,7 @@ func TestStableUnderpricing(t *testing.T) {
keys := make([]*ecdsa.PrivateKey, 2)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(100000000))
}
// Fill up the entire queue with the same transaction price points
txs := types.Transactions{}
Expand Down