Skip to content

Commit

Permalink
feat(electrum)!: introduce TxCache
Browse files Browse the repository at this point in the history
We maintain a cache of full transactions so we avoid re-fetching from
Electrum if not needed. In addition, the `ElectrumUpdate` struct has the
anchor type as a generic which can be mapped to update any receiving
`TxGraph`.
  • Loading branch information
evanlinjin committed May 1, 2024
1 parent d7272cd commit c3295e3
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 140 deletions.
2 changes: 1 addition & 1 deletion crates/electrum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ electrum-client = { version = "0.19" }
[dev-dependencies]
bdk_testenv = { path = "../testenv", default-features = false }
electrsd = { version= "0.27.1", features = ["bitcoind_25_0", "esplora_a33e97e1", "legacy"] }
anyhow = "1"
anyhow = "1"
197 changes: 116 additions & 81 deletions crates/electrum/src/electrum_ext.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,86 @@
use bdk_chain::{
bitcoin::{OutPoint, ScriptBuf, Txid},
collections::{HashMap, HashSet},
bitcoin::{OutPoint, ScriptBuf, Transaction, Txid},
collections::{BTreeMap, HashMap, HashSet},
local_chain::CheckPoint,
tx_graph::TxGraph,
Anchor, BlockId, ConfirmationHeightAnchor, ConfirmationTimeHeightAnchor,
BlockId, ConfirmationHeightAnchor, ConfirmationTimeHeightAnchor,
};
use core::{fmt::Debug, str::FromStr};
use electrum_client::{ElectrumApi, Error, HeaderNotification};
use std::{collections::BTreeMap, fmt::Debug, str::FromStr};
use std::sync::Arc;

/// We include a chain suffix of a certain length for the purpose of robustness.
const CHAIN_SUFFIX_LENGTH: u32 = 8;

/// Type that maintains a cache of [`Arc`]-wrapped transactions.
pub type TxCache = HashMap<Txid, Arc<Transaction>>;

/// Combination of chain and transactions updates from electrum
///
/// We have to update the chain and the txids at the same time since we anchor the txids to
/// the same chain tip that we check before and after we gather the txids.
#[derive(Debug)]
pub struct ElectrumUpdate {
pub struct ElectrumUpdate<A = ConfirmationHeightAnchor> {
/// Chain update
pub chain_update: CheckPoint,
/// Tracks electrum updates in TxGraph
pub graph_update: TxGraph<ConfirmationTimeHeightAnchor>,
pub graph_update: TxGraph<A>,
}

impl<A: Clone + Ord> ElectrumUpdate<A> {
/// Transform the [`ElectrumUpdate`] to have [`bdk_chain::Anchor`]s of another type.
///
/// Refer to [`TxGraph::map_anchors`].
pub fn map_anchors<A2: Clone + Ord, F>(self, f: F) -> ElectrumUpdate<A2>
where
F: FnMut(A) -> A2,
{
ElectrumUpdate {
chain_update: self.chain_update,
graph_update: self.graph_update.map_anchors(f),
}
}
}

impl ElectrumUpdate {
/// Transforms the [`TxGraph`]'s [`bdk_chain::Anchor`] type to [`ConfirmationTimeHeightAnchor`].
pub fn into_confirmation_time_update(
self,
client: &impl ElectrumApi,
) -> Result<ElectrumUpdate<ConfirmationTimeHeightAnchor>, Error> {
let relevant_heights = self
.graph_update
.all_anchors()
.iter()
.map(|(a, _)| a.confirmation_height)
.collect::<HashSet<_>>();

let height_to_time = relevant_heights
.clone()
.into_iter()
.zip(
client
.batch_block_header(relevant_heights)?
.into_iter()
.map(|bh| bh.time as u64),
)
.collect::<HashMap<u32, u64>>();

let chain_update = self.chain_update;
let graph_update =
self.graph_update
.clone()
.map_anchors(|a| ConfirmationTimeHeightAnchor {
anchor_block: a.anchor_block,
confirmation_height: a.confirmation_height,
confirmation_time: height_to_time[&a.confirmation_height],
});

Ok(ElectrumUpdate {
chain_update,
graph_update,
})
}
}

/// Trait to extend [`electrum_client::Client`] functionality.
Expand All @@ -35,11 +95,11 @@ pub trait ElectrumExt {
/// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
/// transactions. `batch_size` specifies the max number of script pubkeys to request for in a
/// single batch request.
fn full_scan<K: Ord + Clone, A: Anchor>(
fn full_scan<K: Ord + Clone>(
&self,
tx_cache: &mut TxCache,
prev_tip: CheckPoint,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
full_txs: Option<&TxGraph<A>>,
stop_gap: usize,
batch_size: usize,
) -> Result<(ElectrumUpdate, BTreeMap<K, u32>), Error>;
Expand All @@ -61,42 +121,42 @@ pub trait ElectrumExt {
/// may include scripts that have been used, use [`full_scan`] with the keychain.
///
/// [`full_scan`]: ElectrumExt::full_scan
fn sync<A: Anchor>(
fn sync(
&self,
tx_cache: &mut TxCache,
prev_tip: CheckPoint,
misc_spks: impl IntoIterator<Item = ScriptBuf>,
full_txs: Option<&TxGraph<A>>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
batch_size: usize,
) -> Result<ElectrumUpdate, Error>;
}

impl<E: ElectrumApi> ElectrumExt for E {
fn full_scan<K: Ord + Clone, A: Anchor>(
fn full_scan<K: Ord + Clone>(
&self,
tx_cache: &mut TxCache,
prev_tip: CheckPoint,
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, ScriptBuf)>>,
full_txs: Option<&TxGraph<A>>,
stop_gap: usize,
batch_size: usize,
) -> Result<(ElectrumUpdate, BTreeMap<K, u32>), Error> {
let mut request_spks = keychain_spks
.into_iter()
.map(|(k, s)| (k, s.into_iter()))
.collect::<BTreeMap<K, _>>();

// We keep track of already-scanned spks just in case a reorg happens and we need to do a
// rescan. We need to keep track of this as iterators in `keychain_spks` are "unbounded" so
// cannot be collected. In addition, we keep track of whether an spk has an active tx
// history for determining the `last_active_index`.
// * key: (keychain, spk_index) that identifies the spk.
// * val: (script_pubkey, has_tx_history).
let mut scanned_spks = BTreeMap::<(K, u32), (ScriptBuf, bool)>::new();

let (electrum_update, keychain_update) = loop {
let (tip, _) = construct_update_tip(self, prev_tip.clone())?;
let mut tx_graph = TxGraph::<ConfirmationHeightAnchor>::default();
if let Some(txs) = full_txs {
let _ =
tx_graph.apply_update(txs.clone().map_anchors(|a| ConfirmationHeightAnchor {
anchor_block: a.anchor_block(),
confirmation_height: a.confirmation_height_upper_bound(),
}));
}
let mut graph_update = TxGraph::<ConfirmationHeightAnchor>::default();
let cps = tip
.iter()
.take(10)
Expand All @@ -108,7 +168,8 @@ impl<E: ElectrumApi> ElectrumExt for E {
scanned_spks.append(&mut populate_with_spks(
self,
&cps,
&mut tx_graph,
tx_cache,
&mut graph_update,
&mut scanned_spks
.iter()
.map(|(i, (spk, _))| (i.clone(), spk.clone())),
Expand All @@ -121,7 +182,8 @@ impl<E: ElectrumApi> ElectrumExt for E {
populate_with_spks(
self,
&cps,
&mut tx_graph,
tx_cache,
&mut graph_update,
keychain_spks,
stop_gap,
batch_size,
Expand All @@ -140,8 +202,6 @@ impl<E: ElectrumApi> ElectrumExt for E {

let chain_update = tip;

let graph_update = into_confirmation_time_tx_graph(self, &tx_graph)?;

let keychain_update = request_spks
.into_keys()
.filter_map(|k| {
Expand All @@ -165,11 +225,11 @@ impl<E: ElectrumApi> ElectrumExt for E {
Ok((electrum_update, keychain_update))
}

fn sync<A: Anchor>(
fn sync(
&self,
tx_cache: &mut TxCache,
prev_tip: CheckPoint,
misc_spks: impl IntoIterator<Item = ScriptBuf>,
full_txs: Option<&TxGraph<A>>,
txids: impl IntoIterator<Item = Txid>,
outpoints: impl IntoIterator<Item = OutPoint>,
batch_size: usize,
Expand All @@ -179,10 +239,10 @@ impl<E: ElectrumApi> ElectrumExt for E {
.enumerate()
.map(|(i, spk)| (i as u32, spk));

let (mut electrum_update, _) = self.full_scan(
let (electrum_update, _) = self.full_scan(
tx_cache,
prev_tip.clone(),
[((), spk_iter)].into(),
full_txs,
usize::MAX,
batch_size,
)?;
Expand All @@ -195,11 +255,8 @@ impl<E: ElectrumApi> ElectrumExt for E {
.collect::<BTreeMap<u32, CheckPoint>>();

let mut tx_graph = TxGraph::<ConfirmationHeightAnchor>::default();
populate_with_txids(self, &cps, &mut tx_graph, txids)?;
populate_with_txids(self, &cps, tx_cache, &mut tx_graph, txids)?;
populate_with_outpoints(self, &cps, &mut tx_graph, outpoints)?;
let _ = electrum_update
.graph_update
.apply_update(into_confirmation_time_tx_graph(self, &tx_graph)?);

Ok(electrum_update)
}
Expand Down Expand Up @@ -383,11 +440,12 @@ fn populate_with_outpoints(
fn populate_with_txids(
client: &impl ElectrumApi,
cps: &BTreeMap<u32, CheckPoint>,
tx_graph: &mut TxGraph<ConfirmationHeightAnchor>,
tx_cache: &mut TxCache,
graph_update: &mut TxGraph<ConfirmationHeightAnchor>,
txids: impl IntoIterator<Item = Txid>,
) -> Result<(), Error> {
for txid in txids {
let tx = match client.transaction_get(&txid) {
let tx = match fetch_tx(client, tx_cache, txid) {
Ok(tx) => tx,
Err(electrum_client::Error::Protocol(_)) => continue,
Err(other_err) => return Err(other_err),
Expand All @@ -408,20 +466,36 @@ fn populate_with_txids(
None => continue,
};

if tx_graph.get_tx(txid).is_none() {
let _ = tx_graph.insert_tx(tx);
if graph_update.get_tx(txid).is_none() {
// TODO: We need to be able to insert an `Arc` of a transaction.
let _ = graph_update.insert_tx(tx);
}
if let Some(anchor) = anchor {
let _ = tx_graph.insert_anchor(txid, anchor);
let _ = graph_update.insert_anchor(txid, anchor);
}
}
Ok(())
}

fn fetch_tx<C: ElectrumApi>(
client: &C,
tx_cache: &mut TxCache,
txid: Txid,
) -> Result<Arc<Transaction>, Error> {
use bdk_chain::collections::hash_map::Entry;
Ok(match tx_cache.entry(txid) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => entry
.insert(Arc::new(client.transaction_get(&txid)?))
.clone(),
})
}

fn populate_with_spks<I: Ord + Clone>(
client: &impl ElectrumApi,
cps: &BTreeMap<u32, CheckPoint>,
tx_graph: &mut TxGraph<ConfirmationHeightAnchor>,
tx_cache: &mut TxCache,
graph_update: &mut TxGraph<ConfirmationHeightAnchor>,
spks: &mut impl Iterator<Item = (I, ScriptBuf)>,
stop_gap: usize,
batch_size: usize,
Expand Down Expand Up @@ -453,51 +527,12 @@ fn populate_with_spks<I: Ord + Clone>(
unused_spk_count = 0;
}

for tx in spk_history {
let mut update = TxGraph::<ConfirmationHeightAnchor>::default();

if tx_graph.get_tx(tx.tx_hash).is_none() {
let full_tx = client.transaction_get(&tx.tx_hash)?;
update = TxGraph::<ConfirmationHeightAnchor>::new([full_tx]);
for tx_res in spk_history {
let _ = graph_update.insert_tx(fetch_tx(client, tx_cache, tx_res.tx_hash)?);
if let Some(anchor) = determine_tx_anchor(cps, tx_res.height, tx_res.tx_hash) {
let _ = graph_update.insert_anchor(tx_res.tx_hash, anchor);
}

if let Some(anchor) = determine_tx_anchor(cps, tx.height, tx.tx_hash) {
let _ = update.insert_anchor(tx.tx_hash, anchor);
}

let _ = tx_graph.apply_update(update);
}
}
}
}

fn into_confirmation_time_tx_graph(
client: &impl ElectrumApi,
tx_graph: &TxGraph<ConfirmationHeightAnchor>,
) -> Result<TxGraph<ConfirmationTimeHeightAnchor>, Error> {
let relevant_heights = tx_graph
.all_anchors()
.iter()
.map(|(a, _)| a.confirmation_height)
.collect::<HashSet<_>>();

let height_to_time = relevant_heights
.clone()
.into_iter()
.zip(
client
.batch_block_header(relevant_heights)?
.into_iter()
.map(|bh| bh.time as u64),
)
.collect::<HashMap<u32, u64>>();

let new_graph = tx_graph
.clone()
.map_anchors(|a| ConfirmationTimeHeightAnchor {
anchor_block: a.anchor_block,
confirmation_height: a.confirmation_height,
confirmation_time: height_to_time[&a.confirmation_height],
});
Ok(new_graph)
}
Loading

0 comments on commit c3295e3

Please sign in to comment.