From 2acb714cbdf2b441bf1da339f2837a6b863c17c3 Mon Sep 17 00:00:00 2001 From: martonp Date: Fri, 6 Oct 2023 03:55:12 -0400 Subject: [PATCH] Joe review updates. --- client/asset/btc/btc.go | 71 ++++++++++++++++++++++++++-------------- client/asset/btc/txdb.go | 8 +++-- 2 files changed, 52 insertions(+), 27 deletions(-) diff --git a/client/asset/btc/btc.go b/client/asset/btc/btc.go index b6e6dc9228..9d532276eb 100644 --- a/client/asset/btc/btc.go +++ b/client/asset/btc/btc.go @@ -925,10 +925,11 @@ type baseWallet struct { pendingTxsMtx sync.RWMutex pendingTxs map[chainhash.Hash]*extendedWalletTx + // receiveTxLastQuery stores the last block height at which the wallet // was queried for recieve transactions. This is also stored in the // txHistoryDB. - receiveTxLastQuery uint64 + receiveTxLastQuery atomic.Uint64 txHistoryDB atomic.Value // txDB txHistoryDBPath string @@ -1497,21 +1498,24 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) { if btc.cloneParams.SupportTxHistory { txHistoryDB, err := newBadgerTxDB(btc.txHistoryDBPath, btc.log.SubLogger("TXHISTORYDB")) - btc.txHistoryDB.Store(txHistoryDB) if err != nil { return nil, fmt.Errorf("failed to create tx history db: %v", err) } + btc.txHistoryDB.Store(txHistoryDB) pendingTxs, err := txHistoryDB.getPendingTxs() if err != nil { return nil, fmt.Errorf("failed to load unconfirmed txs: %v", err) } + + btc.pendingTxsMtx.Lock() btc.pendingTxs = make(map[chainhash.Hash]*extendedWalletTx) for _, tx := range pendingTxs { var txHash chainhash.Hash copy(txHash[:], tx.ID) btc.pendingTxs[txHash] = tx } + btc.pendingTxsMtx.Unlock() lastQuery, err := txHistoryDB.getLastReceiveTxQuery() if errors.Is(err, errNeverQueried) { @@ -1519,7 +1523,8 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) { } else if err != nil { return nil, fmt.Errorf("failed to load last query time: %v", err) } - btc.receiveTxLastQuery = lastQuery + + btc.receiveTxLastQuery.Store(lastQuery) wg.Add(1) go func() { @@ -3993,9 +3998,9 @@ func (btc *baseWallet) markTxAsSubmitted(id dex.Bytes) { btc.pendingTxsMtx.Lock() wt, found := btc.pendingTxs[hash] if found { - copyWt := *wt - copyWt.Submitted = true - btc.pendingTxs[hash] = ©Wt + wt.mtx.Lock() + wt.Submitted = true + wt.mtx.Unlock() } btc.pendingTxsMtx.Unlock() @@ -6173,17 +6178,19 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) { { const blockQueryBuffer = 3 var blockToQuery uint64 - if btc.receiveTxLastQuery != 0 && btc.receiveTxLastQuery < tip-blockQueryBuffer { - blockToQuery = btc.receiveTxLastQuery - blockQueryBuffer + lastQuery := btc.receiveTxLastQuery.Load() + + if lastQuery != 0 && lastQuery < tip-blockQueryBuffer { + blockToQuery = lastQuery - blockQueryBuffer } else { blockToQuery = tip - blockQueryBuffer } recentTxs, err := btc.node.listTransactionsSinceBlock(int32(blockToQuery)) if err != nil { - btc.log.Errorf("Error listing transactions since block %d: %v", tip-6, err) + btc.log.Errorf("Error listing transactions since block %d: %v", blockToQuery, err) recentTxs = nil } else { - btc.receiveTxLastQuery = tip + btc.receiveTxLastQuery.Store(tip) err = txHistoryDB.setLastReceiveTxQuery(tip) if err != nil { btc.log.Errorf("Error setting last query to %d: %v", tip, err) @@ -6209,7 +6216,13 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) { var fee uint64 if tx.Fee != nil { - fee = toSatoshi(-*tx.Fee) + // Fee always seems to be negative in btcwallet, but just + // in case. + if *tx.Fee < 0 { + fee = toSatoshi(-*tx.Fee) + } else { + fee = toSatoshi(*tx.Fee) + } } wt := &extendedWalletTx{ WalletTransaction: &asset.WalletTransaction{ @@ -6241,9 +6254,12 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) { } btc.pendingTxsMtx.RUnlock() - for hash, tx := range pendingTxsCopy { + handlePendingTx := func(hash chainhash.Hash, tx *extendedWalletTx) { + tx.mtx.Lock() + defer tx.mtx.Unlock() + if !tx.Submitted { - continue + return } gtr, err := btc.node.getWalletTransaction(&hash) if errors.Is(err, asset.CoinNotFoundError) { @@ -6257,11 +6273,11 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) { // again next time. btc.log.Errorf("Error removing tx %s from the history store: %v", hash, err) } - continue + return } if err != nil { btc.log.Errorf("Error getting transaction %s: %v", hash, err) - continue + return } var updated bool @@ -6269,12 +6285,12 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) { blockHash, err := chainhash.NewHashFromStr(gtr.BlockHash) if err != nil { btc.log.Errorf("Error decoding block hash %s: %v", gtr.BlockHash, err) - continue + return } blockHeight, err := btc.tipRedeemer.getBlockHeight(blockHash) if err != nil { btc.log.Errorf("Error getting block height for %s: %v", blockHash, err) - continue + return } if tx.BlockNumber != uint64(blockHeight) { tx.BlockNumber = uint64(blockHeight) @@ -6286,7 +6302,7 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) { } var confs uint64 - if tx.BlockNumber > 0 && tip > tx.BlockNumber { + if tx.BlockNumber > 0 && tip >= tx.BlockNumber { confs = tip - tx.BlockNumber + 1 } if confs >= defaultRedeemConfTarget { @@ -6298,18 +6314,23 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) { err = txHistoryDB.storeTx(tx) if err != nil { btc.log.Errorf("Error updating tx %s: %v", hash, err) - continue + return + } + if tx.Confirmed { + btc.pendingTxsMtx.Lock() + delete(btc.pendingTxs, hash) + btc.pendingTxsMtx.Unlock() } - btc.pendingTxsMtx.Lock() - delete(btc.pendingTxs, hash) - btc.pendingTxsMtx.Unlock() } } + + for hash, tx := range pendingTxsCopy { + handlePendingTx(hash, tx) + } } -// TxHistory returns all the transactions the wallet has made. This -// includes the ETH wallet and all token wallets. If refID is nil, then -// transactions starting from the most recent are returned (past is ignored). +// TxHistory returns all the transactions the wallet has made. If refID is nil, +// then transactions starting from the most recent are returned (past is ignored). // If past is true, the transactions prior to the refID are returned, otherwise // the transactions after the refID are returned. n is the number of // transactions to return. If n is <= 0, all the transactions will be returned. diff --git a/client/asset/btc/txdb.go b/client/asset/btc/txdb.go index 3d0f01f479..92178e2be0 100644 --- a/client/asset/btc/txdb.go +++ b/client/asset/btc/txdb.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "math" + "sync" "time" "decred.org/dcrdex/client/asset" @@ -18,6 +19,7 @@ import ( ) type extendedWalletTx struct { + mtx sync.Mutex *asset.WalletTransaction Confirmed bool `json:"confirmed"` // Create bond transactions are added to the store before @@ -25,6 +27,8 @@ type extendedWalletTx struct { Submitted bool `json:"submitted"` } +// "b" and "c" must be the first two prefixes. +// getPendingTxs relies on this. var blockPrefix = []byte("b") var pendingPrefix = []byte("c") var lastQueryKey = []byte("lq") @@ -225,8 +229,8 @@ func (db *badgerTxDB) markTxAsSubmitted(txID dex.Bytes) error { return err } - var wt extendedWalletTx - if err := json.Unmarshal(wtB, &wt); err != nil { + var wt *extendedWalletTx + if err := json.Unmarshal(wtB, wt); err != nil { return err }