Skip to content

Commit

Permalink
client/asset/{btc,dcr}: Decred Tx History
Browse files Browse the repository at this point in the history
This diff implements TxHistory for the DCR native wallet. The BadgerTxDB
used for BTC is exported and reused in DCR.
  • Loading branch information
martonp committed Jan 4, 2024
1 parent 5702c9a commit 634a71e
Show file tree
Hide file tree
Showing 10 changed files with 544 additions and 152 deletions.
65 changes: 35 additions & 30 deletions client/asset/btc/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ type baseWallet struct {
bondReserves atomic.Uint64

pendingTxsMtx sync.RWMutex
pendingTxs map[chainhash.Hash]*extendedWalletTx
pendingTxs map[chainhash.Hash]*ExtendedWalletTx

// receiveTxLastQuery stores the last block height at which the wallet
// was queried for recieve transactions. This is also stored in the
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func newUnconnectedWallet(cfg *BTCCloneCFG, walletCfg *WalletConfig) (*baseWalle
calcTxSize: txSizeCalculator,
txVersion: txVersion,
Network: cfg.Network,
pendingTxs: make(map[chainhash.Hash]*extendedWalletTx),
pendingTxs: make(map[chainhash.Hash]*ExtendedWalletTx),
txHistoryDBPath: filepath.Join(walletDir, "txhistory.db"),
ar: addressRecyler,
}
Expand Down Expand Up @@ -1249,7 +1249,7 @@ func OpenSPVWallet(cfg *BTCCloneCFG, walletConstructor BTCWalletConstructor) (*E
return nil, err
}

txHistoryDB, err := newBadgerTxDB(btc.txHistoryDBPath, btc.log.SubLogger("TXHISTORYDB"))
txHistoryDB, err := NewBadgerTxDB(btc.txHistoryDBPath, btc.log.SubLogger("TXHISTORYDB"))
if err != nil {
return nil, fmt.Errorf("failed to create tx history db: %v", err)
}
Expand Down Expand Up @@ -1379,7 +1379,7 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) {
atomic.StoreInt64(&btc.tipAtConnect, btc.currentTip.Height)

if txHistoryDB := btc.txDB(); txHistoryDB != nil {
pendingTxs, err := txHistoryDB.getPendingTxs()
pendingTxs, err := txHistoryDB.GetPendingTxs()
if err != nil {
return nil, fmt.Errorf("failed to load unconfirmed txs: %v", err)
}
Expand All @@ -1392,8 +1392,8 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) {
}
btc.pendingTxsMtx.Unlock()

lastQuery, err := txHistoryDB.getLastReceiveTxQuery()
if errors.Is(err, errNeverQueried) {
lastQuery, err := txHistoryDB.GetLastReceiveTxQuery()
if errors.Is(err, ErrNeverQueried) {
lastQuery = 0
} else if err != nil {
return nil, fmt.Errorf("failed to load last query time: %v", err)
Expand All @@ -1404,8 +1404,7 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) {
wg.Add(1)
go func() {
defer wg.Done()
txHistoryDB.run(ctx)
txHistoryDB.close()
txHistoryDB.Run(ctx)
}()
}

Expand Down Expand Up @@ -3426,13 +3425,11 @@ func (btc *baseWallet) markTxAsSubmitted(id dex.Bytes) {
btc.pendingTxsMtx.Lock()
wt, found := btc.pendingTxs[txHash]
if found {
wt.mtx.Lock()
wt.Submitted = true
wt.mtx.Unlock()
}
btc.pendingTxsMtx.Unlock()

err := txHistoryDB.markTxAsSubmitted(id)
err := txHistoryDB.MarkTxAsSubmitted(id)
if err != nil {
btc.log.Errorf("failed to mark tx as submitted in tx history db: %v", err)
}
Expand All @@ -3444,7 +3441,13 @@ func (btc *baseWallet) removeTxFromHistory(id dex.Bytes) {
return
}

err := txHistoryDB.removeTx(id)
var txHash chainhash.Hash
copy(txHash[:], id)
btc.pendingTxsMtx.Lock()
delete(btc.pendingTxs, txHash)
btc.pendingTxsMtx.Unlock()

err := txHistoryDB.RemoveTx(id)
if err != nil {
btc.log.Errorf("failed to remove tx from tx history db: %v", err)
}
Expand All @@ -3456,7 +3459,7 @@ func (btc *baseWallet) addTxToHistory(txType asset.TransactionType, id dex.Bytes
return
}

wt := &extendedWalletTx{
wt := &ExtendedWalletTx{
WalletTransaction: &asset.WalletTransaction{
Type: txType,
ID: id,
Expand All @@ -3473,7 +3476,7 @@ func (btc *baseWallet) addTxToHistory(txType asset.TransactionType, id dex.Bytes
btc.pendingTxs[txHash] = wt
btc.pendingTxsMtx.Unlock()

err := txHistoryDB.storeTx(wt)
err := txHistoryDB.StoreTx(wt)
if err != nil {
btc.log.Errorf("failed to store tx in tx history db: %v", err)
}
Expand Down Expand Up @@ -5166,7 +5169,7 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
recentTxs = nil
} else {
btc.receiveTxLastQuery.Store(tip)
err = txHistoryDB.setLastReceiveTxQuery(tip)
err = txHistoryDB.SetLastReceiveTxQuery(tip)
if err != nil {
btc.log.Errorf("Error setting last query to %d: %v", tip, err)
}
Expand All @@ -5180,7 +5183,7 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
continue
}
txID := dex.Bytes(txHash[:])
_, err = txHistoryDB.getTx(txID)
_, err = txHistoryDB.GetTx(txID)
if err == nil {
continue
}
Expand All @@ -5199,7 +5202,7 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
fee = toSatoshi(*tx.Fee)
}
}
wt := &extendedWalletTx{
wt := &ExtendedWalletTx{
WalletTransaction: &asset.WalletTransaction{
Type: asset.Receive,
ID: txID,
Expand All @@ -5209,7 +5212,7 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
Submitted: true,
}

err = txHistoryDB.storeTx(wt)
err = txHistoryDB.StoreTx(wt)
if err != nil {
btc.log.Errorf("Error storing tx %s: %v", tx.TxID, err)
}
Expand All @@ -5222,23 +5225,22 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
}
}

pendingTxsCopy := make(map[chainhash.Hash]*extendedWalletTx, len(btc.pendingTxs))
// Not just the map must be copied here, but the ExtendedWalletTx
// as well.
pendingTxsCopy := make(map[chainhash.Hash]ExtendedWalletTx, len(btc.pendingTxs))
btc.pendingTxsMtx.RLock()
for hash, tx := range btc.pendingTxs {
pendingTxsCopy[hash] = tx
pendingTxsCopy[hash] = *tx
}
btc.pendingTxsMtx.RUnlock()

handlePendingTx := func(hash chainhash.Hash, tx *extendedWalletTx) {
tx.mtx.Lock()
defer tx.mtx.Unlock()

handlePendingTx := func(hash chainhash.Hash, tx *ExtendedWalletTx) {
if !tx.Submitted {
return
}
gtr, err := btc.node.getWalletTransaction(&hash)
if errors.Is(err, asset.CoinNotFoundError) {
err = txHistoryDB.removeTx(hash[:])
err = txHistoryDB.RemoveTx(hash[:])
if err == nil {
btc.pendingTxsMtx.Lock()
delete(btc.pendingTxs, hash)
Expand Down Expand Up @@ -5286,21 +5288,24 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
}

if updated {
err = txHistoryDB.storeTx(tx)
err = txHistoryDB.StoreTx(tx)
if err != nil {
btc.log.Errorf("Error updating tx %s: %v", hash, err)
return
}

btc.pendingTxsMtx.Lock()
if tx.Confirmed {
btc.pendingTxsMtx.Lock()
delete(btc.pendingTxs, hash)
btc.pendingTxsMtx.Unlock()
} else {
btc.pendingTxs[hash] = tx
}
btc.pendingTxsMtx.Unlock()
}
}

for hash, tx := range pendingTxsCopy {
handlePendingTx(hash, tx)
handlePendingTx(hash, &tx)
}
}

Expand All @@ -5315,7 +5320,7 @@ func (btc *ExchangeWalletSPV) TxHistory(n int, refID *dex.Bytes, past bool) ([]*
return nil, fmt.Errorf("tx database not initialized")
}

return txHistoryDB.getTxs(n, refID, past)
return txHistoryDB.GetTxs(n, refID, past)
}

// lockedSats is the total value of locked outputs, as locked with LockUnspent.
Expand Down
Loading

0 comments on commit 634a71e

Please sign in to comment.