Merged

Changes from 5 commits
7 changes: 0 additions & 7 deletions core/txpool/errors.go
@@ -17,10 +17,6 @@ var (
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")

// ErrTxPoolOverflow is returned if the transaction pool is full and can't accpet
// another remote transaction.
ErrTxPoolOverflow = errors.New("txpool is full")

// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
@@ -47,9 +43,6 @@ var (

// ErrAddressBlacklisted is returned if a transaction is sent to blacklisted address
ErrAddressBlacklisted = errors.New("address is blacklisted")
// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")

// ErrAlreadyReserved is returned if the sender address has a pending transaction
// in a different subpool. For example, this error is returned in response to any
205 changes: 178 additions & 27 deletions core/txpool/legacypool/legacypool.go
@@ -18,8 +18,10 @@ package legacypool

import (
"container/heap"
"errors"
"math"
"math/big"
"slices"
"sort"
"sync"
"sync/atomic"
@@ -53,6 +55,25 @@ const (
txMaxSize = 4 * txSlotSize // 128KB
)

var (
// ErrTxPoolOverflow is returned if the transaction pool is full and can't accept
// another remote transaction.
ErrTxPoolOverflow = errors.New("txpool is full")

// ErrInflightTxLimitReached is returned when the maximum number of in-flight
// transactions is reached for delegated accounts or accounts with a pending authorization.
ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts")

// ErrAuthorityReserved is returned if a transaction has an authorization
// signed by an address which already has in-flight transactions known to the
// pool.
ErrAuthorityReserved = errors.New("authority already reserved")

// ErrFutureReplacePending is returned if a future transaction replaces a pending
// one. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")
)
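
These sentinel errors move from core/txpool into the legacypool package in this change. As a minimal sketch of what that means for callers, the hypothetical helper below (written as if it lived inside the legacypool package, so no import path has to be assumed; it is not part of this diff) classifies the relocated errors with errors.Is.

// reportAddError is a hypothetical helper (not part of this diff) showing how
// the relocated sentinel errors can be matched with errors.Is.
func reportAddError(err error) string {
	switch {
	case err == nil:
		return "accepted"
	case errors.Is(err, ErrTxPoolOverflow):
		return "pool is full, retry later"
	case errors.Is(err, ErrInflightTxLimitReached):
		return "delegated account already has an in-flight transaction"
	case errors.Is(err, ErrAuthorityReserved):
		return "authority address already has pooled transactions"
	case errors.Is(err, ErrFutureReplacePending):
		return "future transaction may not replace a pending one"
	default:
		return "rejected: " + err.Error()
	}
}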

var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
@@ -186,6 +207,20 @@ func (config *Config) sanitize() Config {
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
//
// In addition to tracking transactions, the pool also tracks a set of pending SetCode
// authorizations (EIP-7702). This helps minimize the number of transactions that can be
// trivially churned in the pool. As a standard rule, any account with a deployed
// delegation or an in-flight authorization to deploy a delegation is allowed only a
// single transaction slot instead of the standard number. This is due to the possibility
// of the account being swept by an unrelated account.
//
// Because SetCode transactions can include many authorizations, we avoid explicitly
// checking their validity to save the state lookup. So long as the encompassing
// transaction is valid, the authorization will be accepted and tracked by the pool. If
// the pool is already tracking a pending / queued transaction from a specific account,
// it will reject new transactions carrying an authorization from that account.
type LegacyPool struct {
config Config
chainconfig *params.ChainConfig
@@ -270,7 +305,7 @@ func New(config Config, chainconfig *params.ChainConfig, chain blockChain) *Lega
// pool, specifically, whether it is a Legacy, AccessList, Dynamic or Sponsored transaction.
func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
switch tx.Type() {
case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.SponsoredTxType:
case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.SponsoredTxType, types.SetCodeTxType:
return true
default:
return false
@@ -594,9 +629,10 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro
opts := &txpool.ValidationOptions{
Config: pool.chainconfig,
Accept: 0 |
1<<types.LegacyTxType |
1<<types.AccessListTxType |
1<<types.DynamicFeeTxType,
1<<types.LegacyTxType |
1<<types.AccessListTxType |
1<<types.DynamicFeeTxType |
1<<types.SetCodeTxType,
MaxSize: txMaxSize,
MinTip: pool.gasTip.Load(),
AcceptSponsoredTx: true,
@@ -614,21 +650,11 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
opts := &txpool.ValidationOptionsWithState{
Config: pool.chainconfig,
State: pool.currentState,
Head: pool.currentHead.Load(),
FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps
// The global and account slot and queue are checked later
UsedAndLeftSlots: func(addr common.Address) (int, int) {
var have int
if list := pool.pending[addr]; list != nil {
have += list.Len()
}
if list := pool.queue[addr]; list != nil {
have += list.Len()
}
return have, math.MaxInt
},
Config: pool.chainconfig,
State: pool.currentState,
Head: pool.currentHead.Load(),
FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps
UsedAndLeftSlots: nil, // Pool has own mechanism to limit the number of transactions
ExistingExpenditure: func(addr common.Address) *big.Int {
return pool.getAccountPendingCost(addr)
},
@@ -645,6 +671,56 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
return err
}

return pool.validateAuth(tx)
}

// validateAuth verifies that the transaction complies with the code authorization
// restrictions introduced by the SetCode transaction type.
func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
from, _ := types.Sender(pool.signer, tx) // validated

// Allow at most one in-flight tx for delegated accounts or those with a
// pending authorization.
if pool.currentState.GetCodeHash(from) != types.EmptyCodeHash || len(pool.all.auths[from]) != 0 {
var (
count int
exists bool
)
pending := pool.pending[from]
if pending != nil {
count += pending.Len()
exists = pending.Contains(tx.Nonce())
}
queue := pool.queue[from]
if queue != nil {
count += queue.Len()
exists = exists || queue.Contains(tx.Nonce())
}
// Replacing the existing in-flight transaction of a delegated account
// is still supported.
if count >= 1 && !exists {
return ErrInflightTxLimitReached
}
}
// In case of a sponsored tx, also reject it if the payer is a delegated
// account or has a pending authorization.
if tx.Type() == types.SponsoredTxType {
payer, err := types.Payer(pool.signer, tx)
if err != nil {
return err
}
if pool.currentState.GetCodeHash(payer) != types.EmptyCodeHash || len(pool.all.auths[payer]) != 0 {
return ErrInflightTxLimitReached
}
}
// Authorities cannot conflict with any pending or queued transactions.
if auths := tx.SetCodeAuthorities(); len(auths) > 0 {
for _, auth := range auths {
if pool.pending[auth] != nil || pool.queue[auth] != nil {
return ErrAuthorityReserved
}
}
}
return nil
}

@@ -712,7 +788,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// replacements to 25% of the slots
if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) {
throttleTxMeter.Mark(1)
return false, txpool.ErrTxPoolOverflow
return false, ErrTxPoolOverflow
}

// New transaction is better than our worse ones, make room for it.
@@ -724,7 +800,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
if !local && !success {
log.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1)
return false, txpool.ErrTxPoolOverflow
return false, ErrTxPoolOverflow
}

// If the new transaction is a future transaction it should never churn pending transactions
@@ -743,7 +819,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
heap.Push(&pool.priced.urgent, dropTx)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
return false, txpool.ErrFutureReplacePending
return false, ErrFutureReplacePending
}
}

@@ -1415,8 +1491,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
pool.all.Remove(tx.Hash())
}
log.Trace("Removed old queued transactions", "count", len(forwards))
payers := list.Payers()
@@ -1430,8 +1505,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
maxGas := txpool.CurrentBlockMaxGas(pool.chainconfig, head)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), maxGas, payerCostLimit, head.Time)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
pool.all.Remove(tx.Hash())
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
@@ -1639,8 +1713,8 @@ func (pool *LegacyPool) demoteUnexecutables() {
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), maxGas, payerCostLimit, head.Time)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
pool.all.Remove(hash)
log.Trace("Removed unpayable pending transaction", "hash", hash)
}
pendingNofundsMeter.Mark(int64(len(drops)))

@@ -1677,6 +1751,41 @@
}
}

// Clear removes all tracked transactions from the pool
// and rotates the journal.
func (pool *LegacyPool) Clear() {
pool.mu.Lock()
defer pool.mu.Unlock()

// Unreserve each tracked account. Ideally, we could just clear the
// reservation map in the parent txpool context. However, if we clear in
// parent context, to avoid exposing the subpool lock, we have to lock the
// reservations and then lock each subpool.
//
// This creates the potential for a deadlock situation:
//
// * TxPool.Clear locks the reservations
// * a new transaction is received which locks the subpool mutex
// * TxPool.Clear attempts to lock subpool mutex
//
// The transaction addition may attempt to reserve the sender addr which
// can't happen until Clear releases the reservation lock. Clear cannot
// acquire the subpool lock until the transaction addition is completed.
for _, tx := range pool.all.locals {
senderAddr, _ := types.Sender(pool.signer, tx)
pool.reserve(senderAddr, false)
}
for _, tx := range pool.all.remotes {
senderAddr, _ := types.Sender(pool.signer, tx)
pool.reserve(senderAddr, false)
}
pool.all = newLookup()
pool.priced = newPricedList(pool.all)
pool.pending = make(map[common.Address]*list)
pool.queue = make(map[common.Address]*list)
pool.pendingNonces = newNoncer(pool.currentState)
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
@@ -1776,13 +1885,15 @@ type lookup struct {
lock sync.RWMutex
locals map[common.Hash]*types.Transaction
remotes map[common.Hash]*types.Transaction
auths map[common.Address][]common.Hash // All accounts with a pooled authorization
Contributor

Just my opinion, no need to change since I see go-ethereum implements the same logic, but I feel we could make this a map[common.Address]int (something like a reference count to keep track of auths per account). I think []common.Hash is only useful for detecting a bug in the implementation.

Contributor Author

> only useful for detecting a bug in the implementation.

Hmm, I cannot be sure that no bug exists.

}

// newLookup returns a new lookup structure.
func newLookup() *lookup {
return &lookup{
locals: make(map[common.Hash]*types.Transaction),
remotes: make(map[common.Hash]*types.Transaction),
auths: make(map[common.Address][]common.Hash),
}
}
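
The review thread above suggests replacing the []common.Hash slice with a plain per-account reference count. Purely as an illustration of that suggestion (not part of this change, and written as if it lived inside the legacypool package), a sketch of the counter-based variant is shown below; note that it loses the duplicate-add and untracked-removal checks that the hash list makes possible.

// lookupAlt is a hypothetical counter-based variant of the auths index from
// the review discussion above. It cannot detect a tx being added twice or
// removed while untracked, which is exactly the debugging value the hash
// slice provides.
type lookupAlt struct {
	auths map[common.Address]int // number of pooled txs carrying an authorization from each account
}

func (t *lookupAlt) addAuthorities(tx *types.Transaction) {
	for _, addr := range tx.SetCodeAuthorities() {
		t.auths[addr]++
	}
}

func (t *lookupAlt) removeAuthorities(tx *types.Transaction) {
	for _, addr := range tx.SetCodeAuthorities() {
		t.auths[addr]--
		if t.auths[addr] <= 0 {
			delete(t.auths, addr)
		}
	}
}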

Expand Down Expand Up @@ -1873,6 +1984,7 @@ func (t *lookup) Add(tx *types.Transaction, local bool) {
t.lock.Lock()
defer t.lock.Unlock()

t.addAuthorities(tx)
t.slots += numSlots(tx)
slotsGauge.Update(int64(t.slots))

Expand All @@ -1896,6 +2008,7 @@ func (t *lookup) Remove(hash common.Hash) {
log.Error("No transaction found to be deleted", "hash", hash)
return
}
t.removeAuthorities(tx)
t.slots -= numSlots(tx)
slotsGauge.Update(int64(t.slots))

@@ -1939,6 +2052,44 @@ func (t *lookup) RemotesBelowTip(threshold *big.Int, isVenoki bool) types.Transa
return found
}

// addAuthorities tracks the supplied tx in relation to each authority it
// specifies.
func (t *lookup) addAuthorities(tx *types.Transaction) {
for _, addr := range tx.SetCodeAuthorities() {
list, ok := t.auths[addr]
if !ok {
list = []common.Hash{}
}
if slices.Contains(list, tx.Hash()) {
// Don't add duplicates.
continue
}
list = append(list, tx.Hash())
t.auths[addr] = list
}
}

// removeAuthorities stops tracking the supplied tx in relation to its
// authorities.
func (t *lookup) removeAuthorities(tx *types.Transaction) {
hash := tx.Hash()
for _, addr := range tx.SetCodeAuthorities() {
list := t.auths[addr]
// Remove tx from tracker.
if i := slices.Index(list, hash); i >= 0 {
list = append(list[:i], list[i+1:]...)
} else {
log.Error("Authority with untracked tx", "addr", addr, "hash", hash)
}
if len(list) == 0 {
// If list is newly empty, delete it entirely.
delete(t.auths, addr)
continue
}
t.auths[addr] = list
}
}

// numSlots calculates the number of slots needed for a single transaction.
func numSlots(tx *types.Transaction) int {
return int((tx.Size() + txSlotSize - 1) / txSlotSize)