From 27b90e0d7e2096807adc3de2bd3ab4589cf2c8b6 Mon Sep 17 00:00:00 2001 From: Sef Boukenken Date: Tue, 22 Mar 2022 00:25:18 -0400 Subject: [PATCH] mempool: Invert reorg transaction handling. This commit inverts the order that transactions are added to the mempool during a reorg so that they are added in reverse block order. --- internal/mempool/mempool.go | 76 ++++++++++++++++++++++++++++++++ internal/mempool/mempool_test.go | 75 ++++++++++++++++++++++++++++++- server.go | 33 ++------------ 3 files changed, 153 insertions(+), 31 deletions(-) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 0923c47ac3..138d579a28 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -277,6 +277,8 @@ type TxPool struct { staged map[chainhash.Hash]*TxDesc stagedOutpoints map[wire.OutPoint]*dcrutil.Tx + transient map[chainhash.Hash]*dcrutil.Tx + // Votes on blocks. votesMtx sync.RWMutex votes map[chainhash.Hash][]mining.VoteDesc @@ -1094,6 +1096,13 @@ func (mp *TxPool) fetchInputUtxos(tx *dcrutil.Tx, isTreasuryEnabled bool) (*bloc utxoView.AddTxOut(stagedTxDesc.Tx, prevOut.Index, mining.UnminedHeight, wire.NullBlockIndex, isTreasuryEnabled) } + + if transientTx, exists := mp.transient[prevOut.Hash]; exists { + // AddTxOut ignores out of range index values, so it is safe to call + // without bounds checking here. + utxoView.AddTxOut(transientTx, prevOut.Index, mining.UnminedHeight, + wire.NullBlockIndex, isTreasuryEnabled) + } } return utxoView, nil @@ -1885,6 +1894,72 @@ func (mp *TxPool) MaybeAcceptTransaction(tx *dcrutil.Tx, isNew, rateLimit bool) return hashes, err } +// isDoubleSpendOrDuplicateError returns whether or not the passed error, which +// is expected to have come from mempool, indicates a transaction was rejected +// either due to containing a double spend or already existing in the pool. +func isDoubleSpendOrDuplicateError(err error) bool { + switch { + case errors.Is(err, ErrDuplicate): + return true + case errors.Is(err, ErrAlreadyExists): + return true + case errors.Is(err, blockchain.ErrMissingTxOut): + return true + } + + return false +} + +// MaybeAcceptTransactions handles the insertion of a set of not new +// transactions that may have dependencies within the set. Transactions MUST be +// provided in block order, meaning that any transaction that depends on another +// within the provided set must come after its dependency in the array. Using +// this function is preferable when inserting a batch of not new transactions +// into the mempool to ensure that transactions that have existing mempool +// inputs are correctly added in reverse order. Preference should be given to +// use other mempool functions when adding new transactions to the mempool. +// +// This function is safe for concurrent access. +func (mp *TxPool) MaybeAcceptTransactions(txns []*dcrutil.Tx, rateLimit bool) error { + // Create agenda flags for checking transactions based on which ones are + // active or should otherwise always be enforced. + checkTxFlags, err := mp.determineCheckTxFlags() + if err != nil { + return err + } + + var errors []error + mp.mtx.Lock() + transientPool := mp.transient + for i := 0; i < len(txns)-1; i++ { + tx := txns[i] + transientPool[*tx.Hash()] = tx + } + for i := len(txns) - 1; i >= 0; i-- { + tx := txns[i] + delete(transientPool, *tx.Hash()) + _, err := mp.maybeAcceptTransaction(tx, false, rateLimit, true, true, + checkTxFlags) + if err != nil && !isDoubleSpendOrDuplicateError(err) { + mp.removeTransaction(tx, true) + continue + } + if err != nil { + errors = append(errors, err) + } + } + mp.mtx.Unlock() + + var finalErr error + switch { + case len(errors) == 1: + finalErr = errors[0] + case len(errors) > 1: + finalErr = blockchain.MultiError(errors) + } + return finalErr +} + // processOrphans is the internal function which implements the public // ProcessOrphans. See the comment for ProcessOrphans for more details. // @@ -2329,6 +2404,7 @@ func New(cfg *Config) *TxPool { nextExpireScan: time.Now().Add(orphanExpireScanInterval), staged: make(map[chainhash.Hash]*TxDesc), stagedOutpoints: make(map[wire.OutPoint]*dcrutil.Tx), + transient: make(map[chainhash.Hash]*dcrutil.Tx), } // for a given transaction, scan the mempool to find which transactions diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 101fea73ec..a2dd2abe3d 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -1059,8 +1059,8 @@ func TestTicketPurchaseOrphan(t *testing.T) { harness.chain.utxos.LookupEntry(outpoint).Spend() _, err = harness.txPool.MaybeAcceptTransaction(tx, false, false) if err != nil { - t.Fatalf("ProcessTransaction: failed to accept valid transaction %v", - err) + t.Fatalf("MaybeAcceptTransaction: failed to accept valid transaction "+ + "%v", err) } testPoolMembership(tc, tx, false, true) @@ -3250,3 +3250,74 @@ func TestSubsidySplitSemantics(t *testing.T) { } testPoolMembership(tc, postDCP0010Vote, false, true) } + +// TestMaybeAcceptTransactions attempts to add a collection of transactions +// provided in block order and accepted into the mempool in reverse block order. +// It uses the mining view side effects to verify that transactions were added +// in the correct order. +func TestMaybeAcceptTransactions(t *testing.T) { + harness, spendableOuts, err := newPoolHarness(chaincfg.MainNetParams()) + if err != nil { + t.Fatalf("unable to create mining harness: %v", err) + } + + applyTxFee := func(fee int64) func(*wire.MsgTx) { + return func(tx *wire.MsgTx) { + tx.TxOut[0].Value -= fee + } + } + + txA, _ := harness.CreateSignedTx([]spendableOutput{ + spendableOuts[0], + }, 1, applyTxFee(1000)) + + txB, _ := harness.CreateSignedTx([]spendableOutput{ + txOutToSpendableOut(txA, 0, wire.TxTreeRegular), + }, 1, applyTxFee(1000)) + + txC, _ := harness.CreateSignedTx([]spendableOutput{ + txOutToSpendableOut(txB, 0, wire.TxTreeRegular), + }, 1, applyTxFee(1000)) + + // Add transactions to mempool in block order. + txPool := harness.txPool + for _, tx := range []*dcrutil.Tx{txA, txB, txC} { + _, err := txPool.ProcessTransaction(tx, false, true, true, 0) + if err != nil { + t.Fatalf("failed to accept valid transaction: %v", err) + return + } + } + + testExpectedAncestorFee := func(tx *dcrutil.Tx, expectedFee int64) { + txHash := tx.Hash() + miningView := txPool.MiningView() + ancestorStats, exists := miningView.AncestorStats(txHash) + if !exists { + t.Fatalf("expected ancestor stats for transaction %v", txHash) + return + } + if ancestorStats.Fees != expectedFee { + t.Fatalf("unexpected ancestor fees for transaction %v -- "+ + "got %v, want %v", txHash, ancestorStats.Fees, expectedFee) + return + } + } + + testExpectedAncestorFee(txC, 2000) + + // Remove leading transactions from mempool. + txPool.RemoveTransaction(txA, false) + txPool.RemoveTransaction(txB, false) + + testExpectedAncestorFee(txC, 0) + + // Accept transactions provided in block order. + err = txPool.MaybeAcceptTransactions([]*dcrutil.Tx{txA, txB}, true) + if err != nil { + t.Fatalf("failed to accept valid transaction: %v", err) + return + } + + testExpectedAncestorFee(txC, 2000) +} diff --git a/server.go b/server.go index a12997a479..4c378eed14 100644 --- a/server.go +++ b/server.go @@ -2473,22 +2473,6 @@ func headerApprovesParent(header *wire.BlockHeader) bool { return dcrutil.IsFlagSet16(header.VoteBits, dcrutil.BlockValid) } -// isDoubleSpendOrDuplicateError returns whether or not the passed error, which -// is expected to have come from mempool, indicates a transaction was rejected -// either due to containing a double spend or already existing in the pool. -func isDoubleSpendOrDuplicateError(err error) bool { - switch { - case errors.Is(err, mempool.ErrDuplicate): - return true - case errors.Is(err, mempool.ErrAlreadyExists): - return true - case errors.Is(err, blockchain.ErrMissingTxOut): - return true - } - - return false -} - // proactivelyEvictSigCacheEntries fetches the block that is // txscript.ProactiveEvictionDepth levels deep from bestHeight and passes it to // SigCache to evict the entries associated with the transactions in that block. @@ -2724,14 +2708,9 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // case, anything that happens to be in the pool which depends on the // transaction is still valid. if !headerApprovesParent(&block.MsgBlock().Header) { - for _, tx := range parentBlock.Transactions()[1:] { - _, err := txMemPool.MaybeAcceptTransaction(tx, false, true) - if err != nil && !isDoubleSpendOrDuplicateError(err) { - txMemPool.RemoveTransaction(tx, true) - } - } + txns := parentBlock.Transactions()[1:] + txMemPool.MaybeAcceptTransactions(txns, true) } - if r := s.rpcServer; r != nil { // Filter and update the rebroadcast inventory. s.PruneRebroadcastInventory() @@ -2777,6 +2756,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // A block has been disconnected from the main block chain. case blockchain.NTBlockDisconnected: + // NOTE: The chain lock is released for this notification. ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData) if !ok { syncLog.Warnf("Block disconnected notification is not " + @@ -2829,12 +2809,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat // those outputs, and, in that case, anything that happens to be in the // pool which depends on the transaction is still valid. handleDisconnectedBlockTxns := func(txns []*dcrutil.Tx) { - for _, tx := range txns { - _, err := txMemPool.MaybeAcceptTransaction(tx, false, true) - if err != nil && !isDoubleSpendOrDuplicateError(err) { - txMemPool.RemoveTransaction(tx, true) - } - } + txMemPool.MaybeAcceptTransactions(txns, true) } handleDisconnectedBlockTxns(block.Transactions()[1:])