diff --git a/blockchain/tx_list.go b/blockchain/tx_list.go index 4f02a2665..7eec634b9 100644 --- a/blockchain/tx_list.go +++ b/blockchain/tx_list.go @@ -658,7 +658,8 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo // Discard finds a number of most underpriced transactions, removes them from the // priced list and returns them for further removal from the entire pool. -func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions { +// If noPending is set to true, we will only consider the floating list +func (l *txPricedList) Discard(slots int, local *accountSet) (types.Transactions, bool) { drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep @@ -680,5 +681,11 @@ func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions for _, tx := range save { heap.Push(l.items, tx) } - return drop + if slots > 0 { + for _, tx := range drop { + heap.Push(l.items, tx) + } + return nil, false + } + return drop, true } diff --git a/blockchain/tx_pool.go b/blockchain/tx_pool.go index 3f8f3df3b..d9cf24547 100644 --- a/blockchain/tx_pool.go +++ b/blockchain/tx_pool.go @@ -112,8 +112,13 @@ var ( // General tx metrics invalidTxCounter = metrics.NewRegisteredCounter("txpool/invalid", nil) underpricedTxCounter = metrics.NewRegisteredCounter("txpool/underpriced", nil) + overflowedTxMeter = metrics.NewRegisteredMeter("txpool/overflowed", nil) refusedTxCounter = metrics.NewRegisteredCounter("txpool/refuse", nil) slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) + + // throttleTxMeter counts how many transactions are rejected due to too-many-changes between + // txpool reorgs. + throttleTxMeter = metrics.NewRegisteredMeter("txpool/throttle", nil) ) // TxStatus is the current status of a transaction as seen by the pool. @@ -238,6 +243,8 @@ type TxPool struct { wg sync.WaitGroup // for shutdown sync + changesSinceReorg int // A counter for how many drops we've performed in-between reorg. + txMsgCh chan types.Transactions // A buffer for async tx intake via AddRemotes txFeedCh chan types.Transactions // A buffer for async tx event emission via txFeed @@ -519,6 +526,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { pool.setPendingNonce(addr, txs[len(txs)-1].Nonce()+1) } } + pool.changesSinceReorg = 0 pool.txMu.Unlock() // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid @@ -979,22 +987,28 @@ func (pool *TxPool) validateAuth(tx *types.Transaction) error { return nil } -// getMaxTxFromQueueWhenNonceIsMissing finds and returns a trasaction with max nonce in queue when a given Tx has missing nonce. -// Otherwise it returns a given Tx itself. -func (pool *TxPool) getMaxTxFromQueueWhenNonceIsMissing(tx *types.Transaction, from *common.Address) *types.Transaction { - txs := pool.queue[*from].txs - - maxTx := tx - if txs.Get(tx.Nonce()) != nil { - return maxTx +// isGapped reports whether the given transaction is immediately executable. +func (pool *TxPool) isGapped(tx *types.Transaction, from common.Address) bool { + // Short circuit if transaction falls within the scope of the pending list + // or matches the next pending nonce which can be promoted as an executable + // transaction afterwards. Note, the tx staleness is already checked in + // 'validateTx' function previously. + next := pool.getPendingNonce(from) + if tx.Nonce() <= next { + return false } - - for _, t := range txs.items { - if maxTx.Nonce() < t.Nonce() { - maxTx = t + // The transaction has a nonce gap with pending list, it's only considered + // as executable if transactions in queue can fill up the nonce gap. + queue, ok := pool.queue[from] + if !ok { + return true + } + for nonce := next; nonce < tx.Nonce(); nonce++ { + if !queue.Contains(nonce) { + return true // txs in queue can't fill up the nonce gap } } - return maxTx + return false } // add validates a transaction and inserts it into the non-executable queue for @@ -1029,49 +1043,66 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { return false, err } - // If the transaction pool is full and new Tx is valid, - // (1) discard a new Tx if there is no room for the account of the Tx - // (2) remove an old Tx with the largest nonce from queue to make a room for a new Tx with missing nonce - // (3) discard a new Tx if the new Tx does not have a missing nonce - // (4) discard underpriced transactions - if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll { - // (1) discard a new Tx if there is no room for the account of the Tx - from, _ := types.Sender(pool.signer, tx) - if pool.queue[from] == nil { - logger.Trace("Rejecting a new Tx, because TxPool is full and there is no room for the account", "hash", tx.Hash(), "account", from) - refusedTxCounter.Inc(1) - return false, fmt.Errorf("txpool is full: %d", uint64(pool.all.Count())) - } - - maxTx := pool.getMaxTxFromQueueWhenNonceIsMissing(tx, &from) - if maxTx != tx { - // (2) remove an old Tx with the largest nonce from queue to make a room for a new Tx with missing nonce - pool.removeTx(maxTx.Hash(), true) - logger.Trace("Removing an old Tx with the max nonce to insert a new Tx with missing nonce, because TxPool is full", "account", from, "new nonce(previously missing)", tx.Nonce(), "removed max nonce", maxTx.Nonce()) - } else { - // (3) discard a new Tx if the new Tx does not have a missing nonce - logger.Trace("Rejecting a new Tx, because TxPool is full and a new TX does not have missing nonce", "hash", tx.Hash()) - refusedTxCounter.Inc(1) - return false, fmt.Errorf("txpool is full and the new tx does not have missing nonce: %d", uint64(pool.all.Count())) - } + from, _ := types.Sender(pool.signer, tx) - // (4) discard underpriced transactions + // If the transaction pool is full, discard underpriced transactions + if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll { // If the new transaction is underpriced, don't accept it if !local && pool.priced.Underpriced(tx, pool.locals) { logger.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) underpricedTxCounter.Inc(1) return false, ErrUnderpriced } + + // We're about to replace a transaction. The reorg does a more thorough + // analysis of what to remove and how, but it runs async. We don't want to + // do too many replacements between reorg-runs, so we cap the number of + // replacements to 25% of the slots + if pool.changesSinceReorg > int(pool.config.ExecSlotsAll/4) { + throttleTxMeter.Mark(1) + return false, ErrTxPoolOverflow + } + // New transaction is better than our worse ones, make room for it - drop := pool.priced.Discard(pool.all.Slots()-int(pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll)+numSlots(tx), pool.locals) + drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll)+numSlots(tx), pool.locals) + // Special case, we still can't make the room for the new remote one. + if !local && !success { + logger.Trace("Discarding overflown transaction", "hash", hash) + overflowedTxMeter.Mark(1) + return false, ErrTxPoolOverflow + } + + // If the new transaction is a future transaction it should never churn pending transactions + if !local && pool.isGapped(tx, from) { + var replacesPending bool + for _, dropTx := range drop { + dropSender, _ := types.Sender(pool.signer, dropTx) + if list := pool.pending[dropSender]; list != nil && list.Contains(dropTx.Nonce()) { + replacesPending = true + break + } + } + // Add all transactions back to the priced queue + if replacesPending { + for _, dropTx := range drop { + pool.priced.Put(dropTx) + } + logger.Trace("Discarding future transaction replacing pending tx", "hash", hash) + refusedTxCounter.Inc(1) + return false, ErrFutureReplacePending + } + } + + // Kick out the underpriced remote transactions. for _, tx := range drop { logger.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) underpricedTxCounter.Inc(1) - pool.removeTx(tx.Hash(), false) + dropped := pool.removeTx(tx.Hash(), false) + + pool.changesSinceReorg += dropped } } // If the transaction is replacing an already pending one, do directly - from, _ := types.Sender(pool.signer, tx) // already validated if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met inserted, old := list.Add(tx, pool.config.PriceBump, pool.rules.IsMagma) @@ -1343,12 +1374,6 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error { return errNotAllowedAnchoringTx } - pool.mu.RLock() - poolSize := uint64(pool.all.Count()) - pool.mu.RUnlock() - if poolSize >= pool.config.ExecSlotsAll+pool.config.NonExecSlotsAll { - return fmt.Errorf("txpool is full: %d", poolSize) - } return pool.addTx(tx, !pool.config.NoLocals) } @@ -1363,39 +1388,14 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // marking the senders as a local ones in the mean time, ensuring they go around // the local pricing constraints. func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { - return pool.checkAndAddTxs(txs, !pool.config.NoLocals) + return pool.addTxs(txs, !pool.config.NoLocals) } // AddRemotes enqueues a batch of transactions into the pool if they are valid. // If the senders are not among the locally tracked ones, full pricing constraints // will apply. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { - return pool.checkAndAddTxs(txs, false) -} - -// checkAndAddTxs compares the size of given transactions and the capacity of TxPool. -// If given transactions exceed the capacity of TxPool, it slices the given transactions -// so it can fit into TxPool's capacity. -func (pool *TxPool) checkAndAddTxs(txs []*types.Transaction, local bool) []error { - pool.mu.RLock() - poolSize := uint64(pool.all.Count()) - pool.mu.RUnlock() - poolCapacity := int(pool.config.ExecSlotsAll + pool.config.NonExecSlotsAll - poolSize) - numTxs := len(txs) - - if poolCapacity < numTxs { - txs = txs[:poolCapacity] - } - - errs := pool.addTxs(txs, local) - - if poolCapacity < numTxs { - for i := 0; i < numTxs-poolCapacity; i++ { - errs = append(errs, ErrTxPoolOverflow) - } - } - - return errs + return pool.addTxs(txs, false) } // addTx enqueues a single transaction into the pool if it is valid. @@ -1493,11 +1493,13 @@ func (pool *TxPool) checkAndSetBeat(addr common.Address) { // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. -func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { +// +// Returns the number of transactions removed from the pending queue. +func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) int { // Fetch the transaction we wish to delete tx := pool.all.Get(hash) if tx == nil { - return + return 0 } addr, _ := types.Sender(pool.signer, tx) // already validated during insertion @@ -1518,7 +1520,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { pool.enqueueTx(tx.Hash(), tx) } pool.updatePendingNonce(addr, tx.Nonce()) - return + return 1 + len(invalids) } } // Transaction is in the future queue @@ -1526,8 +1528,10 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { future.Remove(tx) if future.Empty() { delete(pool.queue, addr) + delete(pool.beats, addr) } } + return 0 } // promoteExecutables moves transactions that have become processable from the @@ -1598,6 +1602,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) + delete(pool.beats, addr) } } // Notify subsystem for new promoted transactions.