Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions blockchain/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,8 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo

// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions {
// If noPending is set to true, we will only consider the floating list
func (l *txPricedList) Discard(slots int, local *accountSet) (types.Transactions, bool) {
Comment thread
yoomee1313 marked this conversation as resolved.
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep

Expand All @@ -680,5 +681,11 @@ func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions
for _, tx := range save {
heap.Push(l.items, tx)
}
return drop
if slots > 0 {
for _, tx := range drop {
heap.Push(l.items, tx)
}
return nil, false
}
return drop, true
}
161 changes: 83 additions & 78 deletions blockchain/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,13 @@ var (
// General tx metrics
invalidTxCounter = metrics.NewRegisteredCounter("txpool/invalid", nil)
underpricedTxCounter = metrics.NewRegisteredCounter("txpool/underpriced", nil)
overflowedTxMeter = metrics.NewRegisteredMeter("txpool/overflowed", nil)
refusedTxCounter = metrics.NewRegisteredCounter("txpool/refuse", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)

// throttleTxMeter counts how many transactions are rejected due to too-many-changes between
// txpool reorgs.
throttleTxMeter = metrics.NewRegisteredMeter("txpool/throttle", nil)
)

// TxStatus is the current status of a transaction as seen by the pool.
Expand Down Expand Up @@ -238,6 +243,8 @@ type TxPool struct {

wg sync.WaitGroup // for shutdown sync

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

txMsgCh chan types.Transactions // A buffer for async tx intake via AddRemotes
txFeedCh chan types.Transactions // A buffer for async tx event emission via txFeed

Expand Down Expand Up @@ -519,6 +526,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
pool.setPendingNonce(addr, txs[len(txs)-1].Nonce()+1)
}
}
pool.changesSinceReorg = 0
pool.txMu.Unlock()
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
Expand Down Expand Up @@ -979,22 +987,28 @@ func (pool *TxPool) validateAuth(tx *types.Transaction) error {
return nil
}

// getMaxTxFromQueueWhenNonceIsMissing finds and returns a trasaction with max nonce in queue when a given Tx has missing nonce.
// Otherwise it returns a given Tx itself.
func (pool *TxPool) getMaxTxFromQueueWhenNonceIsMissing(tx *types.Transaction, from *common.Address) *types.Transaction {
txs := pool.queue[*from].txs

maxTx := tx
if txs.Get(tx.Nonce()) != nil {
return maxTx
// isGapped reports whether the given transaction is immediately executable.
func (pool *TxPool) isGapped(tx *types.Transaction, from common.Address) bool {
// Short circuit if transaction falls within the scope of the pending list
// or matches the next pending nonce which can be promoted as an executable
// transaction afterwards. Note, the tx staleness is already checked in
// 'validateTx' function previously.
next := pool.getPendingNonce(from)
if tx.Nonce() <= next {
return false
}

for _, t := range txs.items {
if maxTx.Nonce() < t.Nonce() {
maxTx = t
// The transaction has a nonce gap with pending list, it's only considered
// as executable if transactions in queue can fill up the nonce gap.
queue, ok := pool.queue[from]
if !ok {
return true
}
for nonce := next; nonce < tx.Nonce(); nonce++ {
if !queue.Contains(nonce) {
return true // txs in queue can't fill up the nonce gap
}
}
return maxTx
return false
}

// add validates a transaction and inserts it into the non-executable queue for
Expand Down Expand Up @@ -1029,49 +1043,66 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
return false, err
}

// If the transaction pool is full and new Tx is valid,
// (1) discard a new Tx if there is no room for the account of the Tx
// (2) remove an old Tx with the largest nonce from queue to make a room for a new Tx with missing nonce
// (3) discard a new Tx if the new Tx does not have a missing nonce
// (4) discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll {
// (1) discard a new Tx if there is no room for the account of the Tx
from, _ := types.Sender(pool.signer, tx)
if pool.queue[from] == nil {
logger.Trace("Rejecting a new Tx, because TxPool is full and there is no room for the account", "hash", tx.Hash(), "account", from)
refusedTxCounter.Inc(1)
return false, fmt.Errorf("txpool is full: %d", uint64(pool.all.Count()))
}

maxTx := pool.getMaxTxFromQueueWhenNonceIsMissing(tx, &from)
if maxTx != tx {
// (2) remove an old Tx with the largest nonce from queue to make a room for a new Tx with missing nonce
pool.removeTx(maxTx.Hash(), true)
logger.Trace("Removing an old Tx with the max nonce to insert a new Tx with missing nonce, because TxPool is full", "account", from, "new nonce(previously missing)", tx.Nonce(), "removed max nonce", maxTx.Nonce())
} else {
// (3) discard a new Tx if the new Tx does not have a missing nonce
logger.Trace("Rejecting a new Tx, because TxPool is full and a new TX does not have missing nonce", "hash", tx.Hash())
refusedTxCounter.Inc(1)
return false, fmt.Errorf("txpool is full and the new tx does not have missing nonce: %d", uint64(pool.all.Count()))
}
from, _ := types.Sender(pool.signer, tx)

// (4) discard underpriced transactions
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll {
// If the new transaction is underpriced, don't accept it
if !local && pool.priced.Underpriced(tx, pool.locals) {
Comment thread
hyeonLewis marked this conversation as resolved.
logger.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
return false, ErrUnderpriced
}

// We're about to replace a transaction. The reorg does a more thorough
// analysis of what to remove and how, but it runs async. We don't want to
// do too many replacements between reorg-runs, so we cap the number of
// replacements to 25% of the slots
if pool.changesSinceReorg > int(pool.config.ExecSlotsAll/4) {
throttleTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}

// New transaction is better than our worse ones, make room for it
drop := pool.priced.Discard(pool.all.Slots()-int(pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll)+numSlots(tx), pool.locals)
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll)+numSlots(tx), pool.locals)
// Special case, we still can't make the room for the new remote one.
if !local && !success {
logger.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}

// If the new transaction is a future transaction it should never churn pending transactions
if !local && pool.isGapped(tx, from) {
var replacesPending bool
for _, dropTx := range drop {
dropSender, _ := types.Sender(pool.signer, dropTx)
if list := pool.pending[dropSender]; list != nil && list.Contains(dropTx.Nonce()) {
replacesPending = true
break
}
}
// Add all transactions back to the priced queue
if replacesPending {
for _, dropTx := range drop {
pool.priced.Put(dropTx)
}
logger.Trace("Discarding future transaction replacing pending tx", "hash", hash)
refusedTxCounter.Inc(1)
return false, ErrFutureReplacePending
}
}

// Kick out the underpriced remote transactions.
for _, tx := range drop {
logger.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
pool.removeTx(tx.Hash(), false)
dropped := pool.removeTx(tx.Hash(), false)

pool.changesSinceReorg += dropped
}
}
// If the transaction is replacing an already pending one, do directly
from, _ := types.Sender(pool.signer, tx) // already validated
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump, pool.rules.IsMagma)
Expand Down Expand Up @@ -1343,12 +1374,6 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return errNotAllowedAnchoringTx
}

pool.mu.RLock()
poolSize := uint64(pool.all.Count())
pool.mu.RUnlock()
if poolSize >= pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll {
return fmt.Errorf("txpool is full: %d", poolSize)
}
return pool.addTx(tx, !pool.config.NoLocals)
}

Expand All @@ -1363,39 +1388,14 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
// marking the senders as a local ones in the mean time, ensuring they go around
// the local pricing constraints.
func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
return pool.checkAndAddTxs(txs, !pool.config.NoLocals)
return pool.addTxs(txs, !pool.config.NoLocals)
}

// AddRemotes enqueues a batch of transactions into the pool if they are valid.
// If the senders are not among the locally tracked ones, full pricing constraints
// will apply.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
return pool.checkAndAddTxs(txs, false)
}

// checkAndAddTxs compares the size of given transactions and the capacity of TxPool.
// If given transactions exceed the capacity of TxPool, it slices the given transactions
// so it can fit into TxPool's capacity.
func (pool *TxPool) checkAndAddTxs(txs []*types.Transaction, local bool) []error {
pool.mu.RLock()
poolSize := uint64(pool.all.Count())
pool.mu.RUnlock()
poolCapacity := int(pool.config.ExecSlotsAll + pool.config.NonExecSlotsAll - poolSize)
numTxs := len(txs)

if poolCapacity < numTxs {
txs = txs[:poolCapacity]
}

errs := pool.addTxs(txs, local)

if poolCapacity < numTxs {
for i := 0; i < numTxs-poolCapacity; i++ {
errs = append(errs, ErrTxPoolOverflow)
}
}

return errs
return pool.addTxs(txs, false)
}

// addTx enqueues a single transaction into the pool if it is valid.
Expand Down Expand Up @@ -1493,11 +1493,13 @@ func (pool *TxPool) checkAndSetBeat(addr common.Address) {

// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
//
// Returns the number of transactions removed from the pending queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) int {
Comment thread
tnasu marked this conversation as resolved.
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
return
return 0
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion

Expand All @@ -1518,16 +1520,18 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
pool.enqueueTx(tx.Hash(), tx)
}
pool.updatePendingNonce(addr, tx.Nonce())
return
return 1 + len(invalids)
}
}
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
future.Remove(tx)
if future.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
}
}
return 0
}

// promoteExecutables moves transactions that have become processable from the
Expand Down Expand Up @@ -1598,6 +1602,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
}
}
// Notify subsystem for new promoted transactions.
Expand Down