Skip to content

Commit

Permalink
Joe review updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
martonp committed Oct 6, 2023
1 parent e85cae4 commit 2acb714
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 27 deletions.
71 changes: 46 additions & 25 deletions client/asset/btc/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,10 +925,11 @@ type baseWallet struct {

pendingTxsMtx sync.RWMutex
pendingTxs map[chainhash.Hash]*extendedWalletTx

// receiveTxLastQuery stores the last block height at which the wallet
// was queried for recieve transactions. This is also stored in the
// txHistoryDB.
receiveTxLastQuery uint64
receiveTxLastQuery atomic.Uint64

txHistoryDB atomic.Value // txDB
txHistoryDBPath string
Expand Down Expand Up @@ -1497,29 +1498,33 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) {

if btc.cloneParams.SupportTxHistory {
txHistoryDB, err := newBadgerTxDB(btc.txHistoryDBPath, btc.log.SubLogger("TXHISTORYDB"))
btc.txHistoryDB.Store(txHistoryDB)
if err != nil {
return nil, fmt.Errorf("failed to create tx history db: %v", err)
}
btc.txHistoryDB.Store(txHistoryDB)

pendingTxs, err := txHistoryDB.getPendingTxs()
if err != nil {
return nil, fmt.Errorf("failed to load unconfirmed txs: %v", err)
}

btc.pendingTxsMtx.Lock()
btc.pendingTxs = make(map[chainhash.Hash]*extendedWalletTx)
for _, tx := range pendingTxs {
var txHash chainhash.Hash
copy(txHash[:], tx.ID)
btc.pendingTxs[txHash] = tx
}
btc.pendingTxsMtx.Unlock()

lastQuery, err := txHistoryDB.getLastReceiveTxQuery()
if errors.Is(err, errNeverQueried) {
lastQuery = 0
} else if err != nil {
return nil, fmt.Errorf("failed to load last query time: %v", err)
}
btc.receiveTxLastQuery = lastQuery

btc.receiveTxLastQuery.Store(lastQuery)

wg.Add(1)
go func() {
Expand Down Expand Up @@ -3993,9 +3998,9 @@ func (btc *baseWallet) markTxAsSubmitted(id dex.Bytes) {
btc.pendingTxsMtx.Lock()
wt, found := btc.pendingTxs[hash]
if found {
copyWt := *wt
copyWt.Submitted = true
btc.pendingTxs[hash] = &copyWt
wt.mtx.Lock()
wt.Submitted = true
wt.mtx.Unlock()
}
btc.pendingTxsMtx.Unlock()

Expand Down Expand Up @@ -6173,17 +6178,19 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
{
const blockQueryBuffer = 3
var blockToQuery uint64
if btc.receiveTxLastQuery != 0 && btc.receiveTxLastQuery < tip-blockQueryBuffer {
blockToQuery = btc.receiveTxLastQuery - blockQueryBuffer
lastQuery := btc.receiveTxLastQuery.Load()

if lastQuery != 0 && lastQuery < tip-blockQueryBuffer {
blockToQuery = lastQuery - blockQueryBuffer
} else {
blockToQuery = tip - blockQueryBuffer
}
recentTxs, err := btc.node.listTransactionsSinceBlock(int32(blockToQuery))
if err != nil {
btc.log.Errorf("Error listing transactions since block %d: %v", tip-6, err)
btc.log.Errorf("Error listing transactions since block %d: %v", blockToQuery, err)
recentTxs = nil
} else {
btc.receiveTxLastQuery = tip
btc.receiveTxLastQuery.Store(tip)
err = txHistoryDB.setLastReceiveTxQuery(tip)
if err != nil {
btc.log.Errorf("Error setting last query to %d: %v", tip, err)
Expand All @@ -6209,7 +6216,13 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {

var fee uint64
if tx.Fee != nil {
fee = toSatoshi(-*tx.Fee)
// Fee always seems to be negative in btcwallet, but just
// in case.
if *tx.Fee < 0 {
fee = toSatoshi(-*tx.Fee)
} else {
fee = toSatoshi(*tx.Fee)
}
}
wt := &extendedWalletTx{
WalletTransaction: &asset.WalletTransaction{
Expand Down Expand Up @@ -6241,9 +6254,12 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
}
btc.pendingTxsMtx.RUnlock()

for hash, tx := range pendingTxsCopy {
handlePendingTx := func(hash chainhash.Hash, tx *extendedWalletTx) {
tx.mtx.Lock()
defer tx.mtx.Unlock()

if !tx.Submitted {
continue
return
}
gtr, err := btc.node.getWalletTransaction(&hash)
if errors.Is(err, asset.CoinNotFoundError) {
Expand All @@ -6257,24 +6273,24 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
// again next time.
btc.log.Errorf("Error removing tx %s from the history store: %v", hash, err)
}
continue
return
}
if err != nil {
btc.log.Errorf("Error getting transaction %s: %v", hash, err)
continue
return
}

var updated bool
if gtr.BlockHash != "" {
blockHash, err := chainhash.NewHashFromStr(gtr.BlockHash)
if err != nil {
btc.log.Errorf("Error decoding block hash %s: %v", gtr.BlockHash, err)
continue
return
}
blockHeight, err := btc.tipRedeemer.getBlockHeight(blockHash)
if err != nil {
btc.log.Errorf("Error getting block height for %s: %v", blockHash, err)
continue
return
}
if tx.BlockNumber != uint64(blockHeight) {
tx.BlockNumber = uint64(blockHeight)
Expand All @@ -6286,7 +6302,7 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
}

var confs uint64
if tx.BlockNumber > 0 && tip > tx.BlockNumber {
if tx.BlockNumber > 0 && tip >= tx.BlockNumber {
confs = tip - tx.BlockNumber + 1
}
if confs >= defaultRedeemConfTarget {
Expand All @@ -6298,18 +6314,23 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
err = txHistoryDB.storeTx(tx)
if err != nil {
btc.log.Errorf("Error updating tx %s: %v", hash, err)
continue
return
}
if tx.Confirmed {
btc.pendingTxsMtx.Lock()
delete(btc.pendingTxs, hash)
btc.pendingTxsMtx.Unlock()
}
btc.pendingTxsMtx.Lock()
delete(btc.pendingTxs, hash)
btc.pendingTxsMtx.Unlock()
}
}

for hash, tx := range pendingTxsCopy {
handlePendingTx(hash, tx)
}
}

// TxHistory returns all the transactions the wallet has made. This
// includes the ETH wallet and all token wallets. If refID is nil, then
// transactions starting from the most recent are returned (past is ignored).
// TxHistory returns all the transactions the wallet has made. If refID is nil,
// then transactions starting from the most recent are returned (past is ignored).
// If past is true, the transactions prior to the refID are returned, otherwise
// the transactions after the refID are returned. n is the number of
// transactions to return. If n is <= 0, all the transactions will be returned.
Expand Down
8 changes: 6 additions & 2 deletions client/asset/btc/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
"sync"
"time"

"decred.org/dcrdex/client/asset"
Expand All @@ -18,13 +19,16 @@ import (
)

type extendedWalletTx struct {
mtx sync.Mutex
*asset.WalletTransaction
Confirmed bool `json:"confirmed"`
// Create bond transactions are added to the store before
// they are submitted.
Submitted bool `json:"submitted"`
}

// "b" and "c" must be the first two prefixes.
// getPendingTxs relies on this.
var blockPrefix = []byte("b")
var pendingPrefix = []byte("c")
var lastQueryKey = []byte("lq")
Expand Down Expand Up @@ -225,8 +229,8 @@ func (db *badgerTxDB) markTxAsSubmitted(txID dex.Bytes) error {
return err
}

var wt extendedWalletTx
if err := json.Unmarshal(wtB, &wt); err != nil {
var wt *extendedWalletTx
if err := json.Unmarshal(wtB, wt); err != nil {
return err
}

Expand Down

0 comments on commit 2acb714

Please sign in to comment.