Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mempool: Invert reorg transaction handling. #2956

Merged
merged 1 commit into from
Jun 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ type TxPool struct {
staged map[chainhash.Hash]*TxDesc
stagedOutpoints map[wire.OutPoint]*dcrutil.Tx

transient map[chainhash.Hash]*dcrutil.Tx

// Votes on blocks.
votesMtx sync.RWMutex
votes map[chainhash.Hash][]mining.VoteDesc
Expand Down Expand Up @@ -1094,6 +1096,13 @@ func (mp *TxPool) fetchInputUtxos(tx *dcrutil.Tx, isTreasuryEnabled bool) (*bloc
utxoView.AddTxOut(stagedTxDesc.Tx, prevOut.Index, mining.UnminedHeight,
wire.NullBlockIndex, isTreasuryEnabled)
}

if transientTx, exists := mp.transient[prevOut.Hash]; exists {
// AddTxOut ignores out of range index values, so it is safe to call
// without bounds checking here.
utxoView.AddTxOut(transientTx, prevOut.Index, mining.UnminedHeight,
wire.NullBlockIndex, isTreasuryEnabled)
}
}

return utxoView, nil
Expand Down Expand Up @@ -1885,6 +1894,72 @@ func (mp *TxPool) MaybeAcceptTransaction(tx *dcrutil.Tx, isNew, rateLimit bool)
return hashes, err
}

// isDoubleSpendOrDuplicateError returns whether or not the passed error, which
// is expected to have come from mempool, indicates a transaction was rejected
// either due to containing a double spend or already existing in the pool.
func isDoubleSpendOrDuplicateError(err error) bool {
switch {
case errors.Is(err, ErrDuplicate):
return true
case errors.Is(err, ErrAlreadyExists):
return true
case errors.Is(err, blockchain.ErrMissingTxOut):
return true
}

return false
}

// MaybeAcceptTransactions handles the insertion of a set of not new
// transactions that may have dependencies within the set. Transactions MUST be
// provided in block order, meaning that any transaction that depends on another
// within the provided set must come after its dependency in the array. Using
// this function is preferable when inserting a batch of not new transactions
// into the mempool to ensure that transactions that have existing mempool
// inputs are correctly added in reverse order. Preference should be given to
// use other mempool functions when adding new transactions to the mempool.
//
// This function is safe for concurrent access.
func (mp *TxPool) MaybeAcceptTransactions(txns []*dcrutil.Tx, rateLimit bool) error {
// Create agenda flags for checking transactions based on which ones are
// active or should otherwise always be enforced.
checkTxFlags, err := mp.determineCheckTxFlags()
if err != nil {
return err
}

var errors []error
mp.mtx.Lock()
transientPool := mp.transient
for i := 0; i < len(txns)-1; i++ {
tx := txns[i]
transientPool[*tx.Hash()] = tx
}
for i := len(txns) - 1; i >= 0; i-- {
tx := txns[i]
delete(transientPool, *tx.Hash())
_, err := mp.maybeAcceptTransaction(tx, false, rateLimit, true, true,
checkTxFlags)
if err != nil && !isDoubleSpendOrDuplicateError(err) {
mp.removeTransaction(tx, true)
continue
}
if err != nil {
errors = append(errors, err)
}
}
mp.mtx.Unlock()

var finalErr error
switch {
case len(errors) == 1:
finalErr = errors[0]
case len(errors) > 1:
finalErr = blockchain.MultiError(errors)
}
return finalErr
}

// processOrphans is the internal function which implements the public
// ProcessOrphans. See the comment for ProcessOrphans for more details.
//
Expand Down Expand Up @@ -2329,6 +2404,7 @@ func New(cfg *Config) *TxPool {
nextExpireScan: time.Now().Add(orphanExpireScanInterval),
staged: make(map[chainhash.Hash]*TxDesc),
stagedOutpoints: make(map[wire.OutPoint]*dcrutil.Tx),
transient: make(map[chainhash.Hash]*dcrutil.Tx),
}

// for a given transaction, scan the mempool to find which transactions
Expand Down
75 changes: 73 additions & 2 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,8 +1059,8 @@ func TestTicketPurchaseOrphan(t *testing.T) {
harness.chain.utxos.LookupEntry(outpoint).Spend()
_, err = harness.txPool.MaybeAcceptTransaction(tx, false, false)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid transaction %v",
err)
t.Fatalf("MaybeAcceptTransaction: failed to accept valid transaction "+
"%v", err)
}

testPoolMembership(tc, tx, false, true)
Expand Down Expand Up @@ -3250,3 +3250,74 @@ func TestSubsidySplitSemantics(t *testing.T) {
}
testPoolMembership(tc, postDCP0010Vote, false, true)
}

// TestMaybeAcceptTransactions attempts to add a collection of transactions
// provided in block order and accepted into the mempool in reverse block order.
// It uses the mining view side effects to verify that transactions were added
// in the correct order.
func TestMaybeAcceptTransactions(t *testing.T) {
harness, spendableOuts, err := newPoolHarness(chaincfg.MainNetParams())
if err != nil {
t.Fatalf("unable to create mining harness: %v", err)
}

applyTxFee := func(fee int64) func(*wire.MsgTx) {
return func(tx *wire.MsgTx) {
tx.TxOut[0].Value -= fee
}
}

txA, _ := harness.CreateSignedTx([]spendableOutput{
spendableOuts[0],
}, 1, applyTxFee(1000))

txB, _ := harness.CreateSignedTx([]spendableOutput{
txOutToSpendableOut(txA, 0, wire.TxTreeRegular),
}, 1, applyTxFee(1000))

txC, _ := harness.CreateSignedTx([]spendableOutput{
txOutToSpendableOut(txB, 0, wire.TxTreeRegular),
}, 1, applyTxFee(1000))

// Add transactions to mempool in block order.
txPool := harness.txPool
for _, tx := range []*dcrutil.Tx{txA, txB, txC} {
_, err := txPool.ProcessTransaction(tx, false, true, true, 0)
if err != nil {
t.Fatalf("failed to accept valid transaction: %v", err)
return
}
}

testExpectedAncestorFee := func(tx *dcrutil.Tx, expectedFee int64) {
txHash := tx.Hash()
miningView := txPool.MiningView()
ancestorStats, exists := miningView.AncestorStats(txHash)
if !exists {
t.Fatalf("expected ancestor stats for transaction %v", txHash)
return
}
if ancestorStats.Fees != expectedFee {
t.Fatalf("unexpected ancestor fees for transaction %v -- "+
"got %v, want %v", txHash, ancestorStats.Fees, expectedFee)
return
}
}

testExpectedAncestorFee(txC, 2000)

// Remove leading transactions from mempool.
txPool.RemoveTransaction(txA, false)
txPool.RemoveTransaction(txB, false)

testExpectedAncestorFee(txC, 0)

// Accept transactions provided in block order.
err = txPool.MaybeAcceptTransactions([]*dcrutil.Tx{txA, txB}, true)
if err != nil {
t.Fatalf("failed to accept valid transaction: %v", err)
return
}

testExpectedAncestorFee(txC, 2000)
}
33 changes: 4 additions & 29 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2473,22 +2473,6 @@ func headerApprovesParent(header *wire.BlockHeader) bool {
return dcrutil.IsFlagSet16(header.VoteBits, dcrutil.BlockValid)
}

// isDoubleSpendOrDuplicateError returns whether or not the passed error, which
// is expected to have come from mempool, indicates a transaction was rejected
// either due to containing a double spend or already existing in the pool.
func isDoubleSpendOrDuplicateError(err error) bool {
switch {
case errors.Is(err, mempool.ErrDuplicate):
return true
case errors.Is(err, mempool.ErrAlreadyExists):
return true
case errors.Is(err, blockchain.ErrMissingTxOut):
return true
}

return false
}

// proactivelyEvictSigCacheEntries fetches the block that is
// txscript.ProactiveEvictionDepth levels deep from bestHeight and passes it to
// SigCache to evict the entries associated with the transactions in that block.
Expand Down Expand Up @@ -2724,14 +2708,9 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
// case, anything that happens to be in the pool which depends on the
// transaction is still valid.
if !headerApprovesParent(&block.MsgBlock().Header) {
for _, tx := range parentBlock.Transactions()[1:] {
_, err := txMemPool.MaybeAcceptTransaction(tx, false, true)
if err != nil && !isDoubleSpendOrDuplicateError(err) {
txMemPool.RemoveTransaction(tx, true)
}
}
txns := parentBlock.Transactions()[1:]
txMemPool.MaybeAcceptTransactions(txns, true)
}

if r := s.rpcServer; r != nil {
// Filter and update the rebroadcast inventory.
s.PruneRebroadcastInventory()
Expand Down Expand Up @@ -2777,6 +2756,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat

// A block has been disconnected from the main block chain.
case blockchain.NTBlockDisconnected:
// NOTE: The chain lock is released for this notification.
ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData)
if !ok {
syncLog.Warnf("Block disconnected notification is not " +
Expand Down Expand Up @@ -2829,12 +2809,7 @@ func (s *server) handleBlockchainNotification(notification *blockchain.Notificat
// those outputs, and, in that case, anything that happens to be in the
// pool which depends on the transaction is still valid.
handleDisconnectedBlockTxns := func(txns []*dcrutil.Tx) {
for _, tx := range txns {
_, err := txMemPool.MaybeAcceptTransaction(tx, false, true)
if err != nil && !isDoubleSpendOrDuplicateError(err) {
txMemPool.RemoveTransaction(tx, true)
}
}
txMemPool.MaybeAcceptTransactions(txns, true)
}
handleDisconnectedBlockTxns(block.Transactions()[1:])

Expand Down