Skip to content

Commit

Permalink
mempool: Invert reorg transaction handling.
Browse files Browse the repository at this point in the history
This commit inverts the order that transactions
are added to the mempool during a reorg so that
they are added in reverse block order.
  • Loading branch information
sefbkn committed Jun 12, 2022
1 parent 9f8114e commit 27b90e0
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 31 deletions.
76 changes: 76 additions & 0 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ type TxPool struct {
staged map[chainhash.Hash]*TxDesc
stagedOutpoints map[wire.OutPoint]*dcrutil.Tx

transient map[chainhash.Hash]*dcrutil.Tx

// Votes on blocks.
votesMtx sync.RWMutex
votes map[chainhash.Hash][]mining.VoteDesc
Expand Down Expand Up @@ -1094,6 +1096,13 @@ func (mp *TxPool) fetchInputUtxos(tx *dcrutil.Tx, isTreasuryEnabled bool) (*bloc
utxoView.AddTxOut(stagedTxDesc.Tx, prevOut.Index, mining.UnminedHeight,
wire.NullBlockIndex, isTreasuryEnabled)
}

if transientTx, exists := mp.transient[prevOut.Hash]; exists {
// AddTxOut ignores out of range index values, so it is safe to call
// without bounds checking here.
utxoView.AddTxOut(transientTx, prevOut.Index, mining.UnminedHeight,
wire.NullBlockIndex, isTreasuryEnabled)
}
}

return utxoView, nil
Expand Down Expand Up @@ -1885,6 +1894,72 @@ func (mp *TxPool) MaybeAcceptTransaction(tx *dcrutil.Tx, isNew, rateLimit bool)
return hashes, err
}

// isDoubleSpendOrDuplicateError returns whether or not the passed error, which
// is expected to have come from mempool, indicates a transaction was rejected
// either due to containing a double spend or already existing in the pool.
func isDoubleSpendOrDuplicateError(err error) bool {
switch {
case errors.Is(err, ErrDuplicate):
return true
case errors.Is(err, ErrAlreadyExists):
return true
case errors.Is(err, blockchain.ErrMissingTxOut):
return true
}

return false
}

// MaybeAcceptTransactions handles the insertion of a set of not new
// transactions that may have dependencies within the set. Transactions MUST be
// provided in block order, meaning that any transaction that depends on another
// within the provided set must come after its dependency in the array. Using
// this function is preferable when inserting a batch of not new transactions
// into the mempool to ensure that transactions that have existing mempool
// inputs are correctly added in reverse order. Preference should be given to
// use other mempool functions when adding new transactions to the mempool.
//
// This function is safe for concurrent access.
func (mp *TxPool) MaybeAcceptTransactions(txns []*dcrutil.Tx, rateLimit bool) error {
// Create agenda flags for checking transactions based on which ones are
// active or should otherwise always be enforced.
checkTxFlags, err := mp.determineCheckTxFlags()
if err != nil {
return err
}

var errors []error
mp.mtx.Lock()
transientPool := mp.transient
for i := 0; i < len(txns)-1; i++ {
tx := txns[i]
transientPool[*tx.Hash()] = tx
}
for i := len(txns) - 1; i >= 0; i-- {
tx := txns[i]
delete(transientPool, *tx.Hash())
_, err := mp.maybeAcceptTransaction(tx, false, rateLimit, true, true,
checkTxFlags)
if err != nil && !isDoubleSpendOrDuplicateError(err) {
mp.removeTransaction(tx, true)
continue
}
if err != nil {
errors = append(errors, err)
}
}
mp.mtx.Unlock()

var finalErr error
switch {
case len(errors) == 1:
finalErr = errors[0]
case len(errors) > 1:
finalErr = blockchain.MultiError(errors)
}
return finalErr
}

// processOrphans is the internal function which implements the public
// ProcessOrphans. See the comment for ProcessOrphans for more details.
//
Expand Down Expand Up @@ -2329,6 +2404,7 @@ func New(cfg *Config) *TxPool {
nextExpireScan: time.Now().Add(orphanExpireScanInterval),
staged: make(map[chainhash.Hash]*TxDesc),
stagedOutpoints: make(map[wire.OutPoint]*dcrutil.Tx),
transient: make(map[chainhash.Hash]*dcrutil.Tx),
}

// for a given transaction, scan the mempool to find which transactions
Expand Down
75 changes: 73 additions & 2 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,8 +1059,8 @@ func TestTicketPurchaseOrphan(t *testing.T) {
harness.chain.utxos.LookupEntry(outpoint).Spend()
_, err = harness.txPool.MaybeAcceptTransaction(tx, false, false)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid transaction %v",
err)
t.Fatalf("MaybeAcceptTransaction: failed to accept valid transaction "+
"%v", err)
}

testPoolMembership(tc, tx, false, true)
Expand Down Expand Up @@ -3250,3 +3250,74 @@ func TestSubsidySplitSemantics(t *testing.T) {
}
testPoolMembership(tc, postDCP0010Vote, false, true)
}

// TestMaybeAcceptTransactions attempts to add a collection of transactions
// provided in block order and accepted into the mempool in reverse block order.
// It uses the mining view side effects to verify that transactions were added
// in the correct order.
func TestMaybeAcceptTransactions(t *testing.T) {
harness, spendableOuts, err := newPoolHarness(chaincfg.MainNetParams())
if err != nil {
t.Fatalf("unable to create mining harness: %v", err)
}

applyTxFee := func(fee int64) func(*wire.MsgTx) {
return func(tx *wire.MsgTx) {
tx.TxOut[0].Value -= fee
}
}

txA, _ := harness.CreateSignedTx([]spendableOutput{
spendableOuts[0],
}, 1, applyTxFee(1000))

txB, _ := harness.CreateSignedTx([]spendableOutput{
txOutToSpendableOut(txA, 0, wire.TxTreeRegular),
}, 1, applyTxFee(1000))

txC, _ := harness.CreateSignedTx([]spendableOutput{
txOutToSpendableOut(txB, 0, wire.TxTreeRegular),
}, 1, applyTxFee(1000))

// Add transactions to mempool in block order.
txPool := harness.txPool
for _, tx := range []*dcrutil.Tx{txA, txB, txC} {
_, err := txPool.ProcessTransaction(tx, false, true, true, 0)
if err != nil {
t.Fatalf("failed to accept valid transaction: %v", err)
return
}
}

testExpectedAncestorFee := func(tx *dcrutil.Tx, expectedFee int64) {
txHash := tx.Hash()
miningView := txPool.MiningView()
ancestorStats, exists := miningView.AncestorStats(txHash)
if !exists {
t.Fatalf("expected ancestor stats for transaction %v", txHash)
return
}
if ancestorStats.Fees != expectedFee {
t.Fatalf("unexpected ancestor fees for transaction %v -- "+
"got %v, want %v", txHash, ancestorStats.Fees, expectedFee)
return
}
}

testExpectedAncestorFee(txC, 2000)

// Remove leading transactions from mempool.
txPool.RemoveTransaction(txA, false)
txPool.RemoveTransaction(txB, false)

testExpectedAncestorFee(txC, 0)

// Accept transactions provided in block order.
err = txPool.MaybeAcceptTransactions([]*dcrutil.Tx{txA, txB}, true)
if err != nil {
t.Fatalf("failed to accept valid transaction: %v", err)
return
}

testExpectedAncestorFee(txC, 2000)
}
33 changes: 4 additions & 29 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2473,22 +2473,6 @@ func headerApprovesParent(header *wire.BlockHeader) bool {
return dcrutil.IsFlagSet16(header.VoteBits, dcrutil.BlockValid)
}

// isDoubleSpendOrDuplicateError returns whether or not the passed error, which
// is expected to have come from mempool, indicates a transaction was rejected
// either due to containing a double spend or already existing in the pool.
func isDoubleSpendOrDuplicateError(err error) bool {
switch {
case errors.Is(err, mempool.ErrDuplicate):
return true
case errors.Is(err, mempool.ErrAlreadyExists):
return true
case errors.Is(err, blockchain.ErrMissingTxOut):
return true
}

return false
}

// proactivelyEvictSigCacheEntries fetches the block that is
// txscript.ProactiveEvictionDepth levels deep from bestHeight and passes it to
// SigCache to evict the entries associated with the transactions in that block.
Expand Down Expand Up @@ -2724,14 +2708,9 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
// case, anything that happens to be in the pool which depends on the
// transaction is still valid.
if !headerApprovesParent(&block.MsgBlock().Header) {
for _, tx := range parentBlock.Transactions()[1:] {
_, err := txMemPool.MaybeAcceptTransaction(tx, false, true)
if err != nil && !isDoubleSpendOrDuplicateError(err) {
txMemPool.RemoveTransaction(tx, true)
}
}
txns := parentBlock.Transactions()[1:]
txMemPool.MaybeAcceptTransactions(txns, true)
}

if r := s.rpcServer; r != nil {
// Filter and update the rebroadcast inventory.
s.PruneRebroadcastInventory()
Expand Down Expand Up @@ -2777,6 +2756,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat

// A block has been disconnected from the main block chain.
case blockchain.NTBlockDisconnected:
// NOTE: The chain lock is released for this notification.
ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData)
if !ok {
syncLog.Warnf("Block disconnected notification is not " +
Expand Down Expand Up @@ -2829,12 +2809,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
// those outputs, and, in that case, anything that happens to be in the
// pool which depends on the transaction is still valid.
handleDisconnectedBlockTxns := func(txns []*dcrutil.Tx) {
for _, tx := range txns {
_, err := txMemPool.MaybeAcceptTransaction(tx, false, true)
if err != nil && !isDoubleSpendOrDuplicateError(err) {
txMemPool.RemoveTransaction(tx, true)
}
}
txMemPool.MaybeAcceptTransactions(txns, true)
}
handleDisconnectedBlockTxns(block.Transactions()[1:])

Expand Down

0 comments on commit 27b90e0

Please sign in to comment.