diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index bd7a2c850..d0ae80d5b 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,15 +1,18 @@ +use std::collections::BTreeSet; + use async_trait::async_trait; use bdk_chain::collections::btree_map; +use bdk_chain::Anchor; use bdk_chain::{ - bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, + bitcoin::{BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, collections::BTreeMap, local_chain::{self, CheckPoint}, BlockId, ConfirmationTimeHeightAnchor, TxGraph, }; -use esplora_client::TxStatus; +use esplora_client::{Amount, TxStatus}; use futures::{stream::FuturesOrdered, TryStreamExt}; -use crate::anchor_from_status; +use crate::{anchor_from_status, FullScanUpdate, SyncUpdate}; /// [`esplora_client::Error`] type Error = Box; @@ -22,36 +25,15 @@ type Error = Box; #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] pub trait EsploraAsyncExt { - /// Prepare a [`LocalChain`] update with blocks fetched from Esplora. - /// - /// * `local_tip` is the previous tip of [`LocalChain::tip`]. - /// * `request_heights` is the block heights that we are interested in fetching from Esplora. - /// - /// The result of this method can be applied to [`LocalChain::apply_update`]. - /// - /// ## Consistency - /// - /// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org - /// during the call. The size of re-org we can tollerate is server dependent but will be at - /// least 10. - /// - /// [`LocalChain`]: bdk_chain::local_chain::LocalChain - /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip - /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update - async fn update_local_chain( - &self, - local_tip: CheckPoint, - request_heights: impl IntoIterator + Send> + Send, - ) -> Result; - - /// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and - /// returns a [`TxGraph`] and a map of last active indices. + /// Scan keychain scripts for transactions against Esplora, returning an update that can be + /// applied to the receiving structures. /// + /// * `local_tip`: the previously seen tip from [`LocalChain::tip`]. /// * `keychain_spks`: keychains that we want to scan transactions for /// - /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated - /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in - /// parallel. + /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no + /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to + /// make in parallel. /// /// ## Note /// @@ -65,19 +47,23 @@ pub trait EsploraAsyncExt { /// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing). /// /// A `stop_gap` of 0 will be treated as a `stop_gap` of 1. + /// + /// [`LocalChain::tip`]: local_chain::LocalChain::tip async fn full_scan( &self, + local_tip: CheckPoint, keychain_spks: BTreeMap< K, impl IntoIterator + Send> + Send, >, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error>; + ) -> Result, Error>; /// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// specified and return a [`TxGraph`]. /// + /// * `local_tip`: the previously seen tip from [`LocalChain::tip`]. /// * `misc_spks`: scripts that we want to sync transactions for /// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we @@ -86,210 +72,216 @@ pub trait EsploraAsyncExt { /// If the scripts to sync are unknown, such as when restoring or importing a keychain that /// may include scripts that have been used, use [`full_scan`] with the keychain. /// + /// [`LocalChain::tip`]: local_chain::LocalChain::tip /// [`full_scan`]: EsploraAsyncExt::full_scan async fn sync( &self, + local_tip: CheckPoint, misc_spks: impl IntoIterator + Send> + Send, txids: impl IntoIterator + Send> + Send, outpoints: impl IntoIterator + Send> + Send, parallel_requests: usize, - ) -> Result, Error>; + ) -> Result; } #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] impl EsploraAsyncExt for esplora_client::AsyncClient { - async fn update_local_chain( - &self, - local_tip: CheckPoint, - request_heights: impl IntoIterator + Send> + Send, - ) -> Result { - // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are - // consistent. - let mut fetched_blocks = self - .get_blocks(None) - .await? - .into_iter() - .map(|b| (b.time.height, b.id)) - .collect::>(); - let new_tip_height = fetched_blocks - .keys() - .last() - .copied() - .expect("must have atleast one block"); - - // Fetch blocks of heights that the caller is interested in, skipping blocks that are - // already fetched when constructing `fetched_blocks`. - for height in request_heights { - // do not fetch blocks higher than remote tip - if height > new_tip_height { - continue; - } - // only fetch what is missing - if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) { - // ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent - // with the chain at the time of `get_blocks` above (there could have been a deep - // re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's - // not possible to have a re-org deeper than that. - entry.insert(self.get_block_hash(height).await?); - } - } - - // Ensure `fetched_blocks` can create an update that connects with the original chain by - // finding a "Point of Agreement". - for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { - if height > new_tip_height { - continue; - } - - let fetched_hash = match fetched_blocks.entry(height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => { - *entry.insert(self.get_block_hash(height).await?) - } - }; - - // We have found point of agreement so the update will connect! - if fetched_hash == local_hash { - break; - } - } - - Ok(local_chain::Update { - tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from)) - .expect("must be in height order"), - introduce_older_blocks: true, - }) - } - async fn full_scan( &self, + local_tip: CheckPoint, keychain_spks: BTreeMap< K, impl IntoIterator + Send> + Send, >, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error> { - type TxsOfSpkIndex = (u32, Vec); - let parallel_requests = Ord::max(parallel_requests, 1); - let mut graph = TxGraph::::default(); - let mut last_active_indexes = BTreeMap::::new(); - let stop_gap = Ord::max(stop_gap, 1); - - for (keychain, spks) in keychain_spks { - let mut spks = spks.into_iter(); - let mut last_index = Option::::None; - let mut last_active_index = Option::::None; - - loop { - let handles = spks - .by_ref() - .take(parallel_requests) - .map(|(spk_index, spk)| { - let client = self.clone(); - async move { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen).await?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - if tx_count < 25 { - break Result::<_, Error>::Ok((spk_index, spk_txs)); - } - } - } - }) - .collect::>(); + ) -> Result, Error> { + let update_blocks = init_chain_update(self, &local_tip).await?; + let (tx_graph, last_active_indices) = + full_scan_for_index_and_graph(self, keychain_spks, stop_gap, parallel_requests).await?; + let local_chain = + finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?; + Ok(FullScanUpdate { + local_chain, + tx_graph, + last_active_indices, + }) + } - if handles.is_empty() { - break; - } + async fn sync( + &self, + local_tip: CheckPoint, + misc_spks: impl IntoIterator + Send> + Send, + txids: impl IntoIterator + Send> + Send, + outpoints: impl IntoIterator + Send> + Send, + parallel_requests: usize, + ) -> Result { + let update_blocks = init_chain_update(self, &local_tip).await?; + let tx_graph = + sync_for_index_and_graph(self, misc_spks, txids, outpoints, parallel_requests).await?; + let local_chain = + finalize_chain_update(self, &local_tip, tx_graph.all_anchors(), update_blocks).await?; + Ok(SyncUpdate { + tx_graph, + local_chain, + }) + } +} - for (index, txs) in handles.try_collect::>().await? { - last_index = Some(index); - if !txs.is_empty() { - last_active_index = Some(index); - } - for tx in txs { - let _ = graph.insert_tx(tx.to_tx()); - if let Some(anchor) = anchor_from_status(&tx.status) { - let _ = graph.insert_anchor(tx.txid, anchor); - } +/// Create the initial chain update. +/// +/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the +/// update can connect to the `start_tip`. +/// +/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and +/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for +/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use +/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when +/// alternating between chain-sources. +#[doc(hidden)] +pub async fn init_chain_update( + client: &esplora_client::AsyncClient, + local_tip: &CheckPoint, +) -> Result, Error> { + // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are + // consistent. + let mut fetched_blocks = client + .get_blocks(None) + .await? + .into_iter() + .map(|b| (b.time.height, b.id)) + .collect::>(); + let new_tip_height = fetched_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); - let previous_outputs = tx.vin.iter().filter_map(|vin| { - let prevout = vin.prevout.as_ref()?; - Some(( - OutPoint { - txid: vin.txid, - vout: vin.vout, - }, - TxOut { - script_pubkey: prevout.scriptpubkey.clone(), - value: Amount::from_sat(prevout.value), - }, - )) - }); - - for (outpoint, txout) in previous_outputs { - let _ = graph.insert_txout(outpoint, txout); - } - } - } + // Ensure `fetched_blocks` can create an update that connects with the original chain by + // finding a "Point of Agreement". + for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { + if height > new_tip_height { + continue; + } - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { - break; - } + let fetched_hash = match fetched_blocks.entry(height) { + btree_map::Entry::Occupied(entry) => *entry.get(), + btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height).await?), + }; + + // We have found point of agreement so the update will connect! + if fetched_hash == local_hash { + break; + } + } + + Ok(fetched_blocks) +} + +/// Fetches missing checkpoints and finalizes the [`local_chain::Update`]. +/// +/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an +/// existing checkpoint/block under `local_tip` or `update_blocks`. +#[doc(hidden)] +pub async fn finalize_chain_update( + client: &esplora_client::AsyncClient, + local_tip: &CheckPoint, + anchors: &BTreeSet<(A, Txid)>, + mut update_blocks: BTreeMap, +) -> Result { + let update_tip_height = update_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); + + // We want to have a corresponding checkpoint per height. We iterate the heights of anchors + // backwards, comparing it against our `local_tip`'s chain and our current set of + // `update_blocks` to see if a corresponding checkpoint already exists. + let anchor_heights = anchors + .iter() + .rev() + .map(|(a, _)| a.anchor_block().height) + // filter out heights that surpass the update tip + .filter(|h| *h <= update_tip_height) + // filter out duplicate heights + .filter({ + let mut prev_height = Option::::None; + move |h| match prev_height.replace(*h) { + None => true, + Some(prev_h) => prev_h != *h, } + }); - if let Some(last_active_index) = last_active_index { - last_active_indexes.insert(keychain, last_active_index); + // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of + // checkpoints more efficient. + let mut curr_cp = local_tip.clone(); + + for h in anchor_heights { + if let Some(cp) = curr_cp.range(h..).last() { + curr_cp = cp.clone(); + if cp.height() == h { + continue; } } - - Ok((graph, last_active_indexes)) + if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) { + entry.insert(client.get_block_hash(h).await?); + } } - async fn sync( - &self, - misc_spks: impl IntoIterator + Send> + Send, - txids: impl IntoIterator + Send> + Send, - outpoints: impl IntoIterator + Send> + Send, - parallel_requests: usize, - ) -> Result, Error> { - let mut graph = self - .full_scan( - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), - usize::MAX, - parallel_requests, - ) - .await - .map(|(g, _)| g)?; - - let mut txids = txids.into_iter(); + Ok(local_chain::Update { + tip: CheckPoint::from_block_ids( + update_blocks + .into_iter() + .map(|(height, hash)| BlockId { height, hash }), + ) + .expect("must be in order"), + introduce_older_blocks: true, + }) +} + +/// This performs a full scan to get an update for the [`TxGraph`] and +/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex). +#[doc(hidden)] +pub async fn full_scan_for_index_and_graph( + client: &esplora_client::AsyncClient, + keychain_spks: BTreeMap< + K, + impl IntoIterator + Send> + Send, + >, + stop_gap: usize, + parallel_requests: usize, +) -> Result<(TxGraph, BTreeMap), Error> { + type TxsOfSpkIndex = (u32, Vec); + let parallel_requests = Ord::max(parallel_requests, 1); + let mut graph = TxGraph::::default(); + let mut last_active_indexes = BTreeMap::::new(); + + for (keychain, spks) in keychain_spks { + let mut spks = spks.into_iter(); + let mut last_index = Option::::None; + let mut last_active_index = Option::::None; + loop { - let handles = txids + let handles = spks .by_ref() .take(parallel_requests) - .filter(|&txid| graph.get_tx(txid).is_none()) - .map(|txid| { - let client = self.clone(); - async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) } + .map(|(spk_index, spk)| { + let client = client.clone(); + async move { + let mut last_seen = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_seen).await?; + let tx_count = txs.len(); + last_seen = txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break Result::<_, Error>::Ok((spk_index, spk_txs)); + } + } + } }) .collect::>(); @@ -297,38 +289,128 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { break; } - for (txid, status) in handles.try_collect::>().await? { - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + for (index, txs) in handles.try_collect::>().await? { + last_index = Some(index); + if !txs.is_empty() { + last_active_index = Some(index); + } + for tx in txs { + let _ = graph.insert_tx(tx.to_tx()); + if let Some(anchor) = anchor_from_status(&tx.status) { + let _ = graph.insert_anchor(tx.txid, anchor); + } + + let previous_outputs = tx.vin.iter().filter_map(|vin| { + let prevout = vin.prevout.as_ref()?; + Some(( + OutPoint { + txid: vin.txid, + vout: vin.vout, + }, + TxOut { + script_pubkey: prevout.scriptpubkey.clone(), + value: Amount::from_sat(prevout.value), + }, + )) + }); + + for (outpoint, txout) in previous_outputs { + let _ = graph.insert_txout(outpoint, txout); + } } } + + let last_index = last_index.expect("Must be set since handles wasn't empty."); + let gap_limit_reached = if let Some(i) = last_active_index { + last_index >= i.saturating_add(stop_gap as u32) + } else { + last_index + 1 >= stop_gap as u32 + }; + if gap_limit_reached { + break; + } } - for op in outpoints.into_iter() { - if graph.get_tx(op.txid).is_none() { - if let Some(tx) = self.get_tx(&op.txid).await? { - let _ = graph.insert_tx(tx); - } - let status = self.get_tx_status(&op.txid).await?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(op.txid, anchor); - } + if let Some(last_active_index) = last_active_index { + last_active_indexes.insert(keychain, last_active_index); + } + } + + Ok((graph, last_active_indexes)) +} + +#[doc(hidden)] +pub async fn sync_for_index_and_graph( + client: &esplora_client::AsyncClient, + misc_spks: impl IntoIterator + Send> + Send, + txids: impl IntoIterator + Send> + Send, + outpoints: impl IntoIterator + Send> + Send, + parallel_requests: usize, +) -> Result, Error> { + let mut graph = full_scan_for_index_and_graph( + client, + [( + (), + misc_spks + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + )] + .into(), + usize::MAX, + parallel_requests, + ) + .await + .map(|(g, _)| g)?; + + let mut txids = txids.into_iter(); + loop { + let handles = txids + .by_ref() + .take(parallel_requests) + .filter(|&txid| graph.get_tx(txid).is_none()) + .map(|txid| { + let client = client.clone(); + async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) } + }) + .collect::>(); + + if handles.is_empty() { + break; + } + + for (txid, status) in handles.try_collect::>().await? { + if let Some(anchor) = anchor_from_status(&status) { + let _ = graph.insert_anchor(txid, anchor); } + } + } - if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? { - if let Some(txid) = op_status.txid { - if graph.get_tx(txid).is_none() { - if let Some(tx) = self.get_tx(&txid).await? { - let _ = graph.insert_tx(tx); - } - let status = self.get_tx_status(&txid).await?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); - } + for op in outpoints.into_iter() { + if graph.get_tx(op.txid).is_none() { + if let Some(tx) = client.get_tx(&op.txid).await? { + let _ = graph.insert_tx(tx); + } + let status = client.get_tx_status(&op.txid).await?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = graph.insert_anchor(op.txid, anchor); + } + } + + if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _).await? { + if let Some(txid) = op_status.txid { + if graph.get_tx(txid).is_none() { + if let Some(tx) = client.get_tx(&txid).await? { + let _ = graph.insert_tx(tx); + } + let status = client.get_tx_status(&txid).await?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = graph.insert_anchor(txid, anchor); } } } } - Ok(graph) } + + Ok(graph) } diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 52aefedcb..adfd33c09 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,7 +1,10 @@ +use std::collections::BTreeSet; use std::thread::JoinHandle; +use std::usize; use bdk_chain::collections::btree_map; use bdk_chain::collections::BTreeMap; +use bdk_chain::Anchor; use bdk_chain::{ bitcoin::{Amount, BlockHash, OutPoint, ScriptBuf, TxOut, Txid}, local_chain::{self, CheckPoint}, @@ -10,9 +13,11 @@ use bdk_chain::{ use esplora_client::TxStatus; use crate::anchor_from_status; +use crate::FullScanUpdate; +use crate::SyncUpdate; /// [`esplora_client::Error`] -type Error = Box; +pub type Error = Box; /// Trait to extend the functionality of [`esplora_client::BlockingClient`]. /// @@ -20,36 +25,15 @@ type Error = Box; /// /// [crate-level documentation]: crate pub trait EsploraExt { - /// Prepare a [`LocalChain`] update with blocks fetched from Esplora. - /// - /// * `local_tip` is the previous tip of [`LocalChain::tip`]. - /// * `request_heights` is the block heights that we are interested in fetching from Esplora. - /// - /// The result of this method can be applied to [`LocalChain::apply_update`]. - /// - /// ## Consistency - /// - /// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org - /// during the call. The size of re-org we can tollerate is server dependent but will be at - /// least 10. - /// - /// [`LocalChain`]: bdk_chain::local_chain::LocalChain - /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip - /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update - fn update_local_chain( - &self, - local_tip: CheckPoint, - request_heights: impl IntoIterator, - ) -> Result; - - /// Full scan the keychain scripts specified with the blockchain (via an Esplora client) and - /// returns a [`TxGraph`] and a map of last active indices. + /// Scan keychain scripts for transactions against Esplora, returning an update that can be + /// applied to the receiving structures. /// + /// * `local_tip`: the previously seen tip from [`LocalChain::tip`]. /// * `keychain_spks`: keychains that we want to scan transactions for /// - /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated - /// transactions. `parallel_requests` specifies the max number of HTTP requests to make in - /// parallel. + /// The full scan for each keychain stops after a gap of `stop_gap` script pubkeys with no + /// associated transactions. `parallel_requests` specifies the max number of HTTP requests to + /// make in parallel. /// /// ## Note /// @@ -63,16 +47,20 @@ pub trait EsploraExt { /// and [Sparrow](https://www.sparrowwallet.com/docs/faq.html#ive-restored-my-wallet-but-some-of-my-funds-are-missing). /// /// A `stop_gap` of 0 will be treated as a `stop_gap` of 1. + /// + /// [`LocalChain::tip`]: local_chain::LocalChain::tip fn full_scan( &self, + local_tip: CheckPoint, keychain_spks: BTreeMap>, stop_gap: usize, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error>; + ) -> Result, Error>; /// Sync a set of scripts with the blockchain (via an Esplora client) for the data /// specified and return a [`TxGraph`]. /// + /// * `local_tip`: the previously seen tip from [`LocalChain::tip`]. /// * `misc_spks`: scripts that we want to sync transactions for /// * `txids`: transactions for which we want updated [`ConfirmationTimeHeightAnchor`]s /// * `outpoints`: transactions associated with these outpoints (residing, spending) that we @@ -81,251 +69,365 @@ pub trait EsploraExt { /// If the scripts to sync are unknown, such as when restoring or importing a keychain that /// may include scripts that have been used, use [`full_scan`] with the keychain. /// + /// [`LocalChain::tip`]: local_chain::LocalChain::tip /// [`full_scan`]: EsploraExt::full_scan fn sync( &self, + local_tip: CheckPoint, misc_spks: impl IntoIterator, txids: impl IntoIterator, outpoints: impl IntoIterator, parallel_requests: usize, - ) -> Result, Error>; + ) -> Result; } impl EsploraExt for esplora_client::BlockingClient { - fn update_local_chain( + fn full_scan( &self, local_tip: CheckPoint, - request_heights: impl IntoIterator, - ) -> Result { - // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are - // consistent. - let mut fetched_blocks = self - .get_blocks(None)? - .into_iter() - .map(|b| (b.time.height, b.id)) - .collect::>(); - let new_tip_height = fetched_blocks - .keys() - .last() - .copied() - .expect("must atleast have one block"); - - // Fetch blocks of heights that the caller is interested in, skipping blocks that are - // already fetched when constructing `fetched_blocks`. - for height in request_heights { - // do not fetch blocks higher than remote tip - if height > new_tip_height { - continue; - } - // only fetch what is missing - if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) { - // ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent - // with the chain at the time of `get_blocks` above (there could have been a deep - // re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's - // not possible to have a re-org deeper than that. - entry.insert(self.get_block_hash(height)?); - } - } - - // Ensure `fetched_blocks` can create an update that connects with the original chain by - // finding a "Point of Agreement". - for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { - if height > new_tip_height { - continue; - } - - let fetched_hash = match fetched_blocks.entry(height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => *entry.insert(self.get_block_hash(height)?), - }; - - // We have found point of agreement so the update will connect! - if fetched_hash == local_hash { - break; - } - } - - Ok(local_chain::Update { - tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from)) - .expect("must be in height order"), - introduce_older_blocks: true, + keychain_spks: BTreeMap>, + stop_gap: usize, + parallel_requests: usize, + ) -> Result, Error> { + let update_blocks = init_chain_update_blocking(self, &local_tip)?; + let (tx_graph, last_active_indices) = full_scan_for_index_and_graph_blocking( + self, + keychain_spks, + stop_gap, + parallel_requests, + )?; + let local_chain = finalize_chain_update_blocking( + self, + &local_tip, + tx_graph.all_anchors(), + update_blocks, + )?; + Ok(FullScanUpdate { + local_chain, + tx_graph, + last_active_indices, }) } - fn full_scan( + fn sync( &self, - keychain_spks: BTreeMap>, - stop_gap: usize, + local_tip: CheckPoint, + misc_spks: impl IntoIterator, + txids: impl IntoIterator, + outpoints: impl IntoIterator, parallel_requests: usize, - ) -> Result<(TxGraph, BTreeMap), Error> { - type TxsOfSpkIndex = (u32, Vec); - let parallel_requests = Ord::max(parallel_requests, 1); - let mut graph = TxGraph::::default(); - let mut last_active_indexes = BTreeMap::::new(); - let stop_gap = Ord::max(stop_gap, 1); - - for (keychain, spks) in keychain_spks { - let mut spks = spks.into_iter(); - let mut last_index = Option::::None; - let mut last_active_index = Option::::None; - - loop { - let handles = spks - .by_ref() - .take(parallel_requests) - .map(|(spk_index, spk)| { - std::thread::spawn({ - let client = self.clone(); - move || -> Result { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen)?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - if tx_count < 25 { - break Ok((spk_index, spk_txs)); - } - } - } - }) - }) - .collect::>>>(); + ) -> Result { + let update_blocks = init_chain_update_blocking(self, &local_tip)?; + let tx_graph = sync_for_index_and_graph_blocking( + self, + misc_spks, + txids, + outpoints, + parallel_requests, + )?; + let local_chain = finalize_chain_update_blocking( + self, + &local_tip, + tx_graph.all_anchors(), + update_blocks, + )?; + Ok(SyncUpdate { + local_chain, + tx_graph, + }) + } +} - if handles.is_empty() { - break; - } +/// Create the initial chain update. +/// +/// This atomically fetches the latest blocks from Esplora and additional blocks to ensure the +/// update can connect to the `start_tip`. +/// +/// We want to do this before fetching transactions and anchors as we cannot fetch latest blocks and +/// transactions atomically, and the checkpoint tip is used to determine last-scanned block (for +/// block-based chain-sources). Therefore it's better to be conservative when setting the tip (use +/// an earlier tip rather than a later tip) otherwise the caller may accidentally skip blocks when +/// alternating between chain-sources. +#[doc(hidden)] +pub fn init_chain_update_blocking( + client: &esplora_client::BlockingClient, + local_tip: &CheckPoint, +) -> Result, Error> { + // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are + // consistent. + let mut fetched_blocks = client + .get_blocks(None)? + .into_iter() + .map(|b| (b.time.height, b.id)) + .collect::>(); + let new_tip_height = fetched_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); - for handle in handles { - let (index, txs) = handle.join().expect("thread must not panic")?; - last_index = Some(index); - if !txs.is_empty() { - last_active_index = Some(index); - } - for tx in txs { - let _ = graph.insert_tx(tx.to_tx()); - if let Some(anchor) = anchor_from_status(&tx.status) { - let _ = graph.insert_anchor(tx.txid, anchor); - } + // Ensure `fetched_blocks` can create an update that connects with the original chain by + // finding a "Point of Agreement". + for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { + if height > new_tip_height { + continue; + } - let previous_outputs = tx.vin.iter().filter_map(|vin| { - let prevout = vin.prevout.as_ref()?; - Some(( - OutPoint { - txid: vin.txid, - vout: vin.vout, - }, - TxOut { - script_pubkey: prevout.scriptpubkey.clone(), - value: Amount::from_sat(prevout.value), - }, - )) - }); - - for (outpoint, txout) in previous_outputs { - let _ = graph.insert_txout(outpoint, txout); - } - } - } + let fetched_hash = match fetched_blocks.entry(height) { + btree_map::Entry::Occupied(entry) => *entry.get(), + btree_map::Entry::Vacant(entry) => *entry.insert(client.get_block_hash(height)?), + }; - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { - break; - } + // We have found point of agreement so the update will connect! + if fetched_hash == local_hash { + break; + } + } + + Ok(fetched_blocks) +} + +/// Fetches missing checkpoints and finalizes the [`local_chain::Update`]. +/// +/// A checkpoint is considered "missing" if an anchor (of `anchors`) points to a height without an +/// existing checkpoint/block under `local_tip` or `update_blocks`. +#[doc(hidden)] +pub fn finalize_chain_update_blocking( + client: &esplora_client::BlockingClient, + local_tip: &CheckPoint, + anchors: &BTreeSet<(A, Txid)>, + mut update_blocks: BTreeMap, +) -> Result { + let update_tip_height = update_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); + + // We want to have a corresponding checkpoint per height. We iterate the heights of anchors + // backwards, comparing it against our `local_tip`'s chain and our current set of + // `update_blocks` to see if a corresponding checkpoint already exists. + let anchor_heights = anchors + .iter() + .rev() + .map(|(a, _)| a.anchor_block().height) + // filter out heights that surpass the update tip + .filter(|h| *h <= update_tip_height) + // filter out duplicate heights + .filter({ + let mut prev_height = Option::::None; + move |h| match prev_height.replace(*h) { + None => true, + Some(prev_h) => prev_h != *h, } + }); + + // We keep track of a checkpoint node of `local_tip` to make traversing the linked-list of + // checkpoints more efficient. + let mut curr_cp = local_tip.clone(); - if let Some(last_active_index) = last_active_index { - last_active_indexes.insert(keychain, last_active_index); + for h in anchor_heights { + if let Some(cp) = curr_cp.range(h..).last() { + curr_cp = cp.clone(); + if cp.height() == h { + continue; } } - - Ok((graph, last_active_indexes)) + if let btree_map::Entry::Vacant(entry) = update_blocks.entry(h) { + entry.insert(client.get_block_hash(h)?); + } } - fn sync( - &self, - misc_spks: impl IntoIterator, - txids: impl IntoIterator, - outpoints: impl IntoIterator, - parallel_requests: usize, - ) -> Result, Error> { - let mut graph = self - .full_scan( - [( - (), - misc_spks - .into_iter() - .enumerate() - .map(|(i, spk)| (i as u32, spk)), - )] - .into(), - usize::MAX, - parallel_requests, - ) - .map(|(g, _)| g)?; - - let mut txids = txids.into_iter(); + Ok(local_chain::Update { + tip: CheckPoint::from_block_ids( + update_blocks + .into_iter() + .map(|(height, hash)| BlockId { height, hash }), + ) + .expect("must be in order"), + introduce_older_blocks: true, + }) +} + +/// This performs a full scan to get an update for the [`TxGraph`] and +/// [`KeychainTxOutIndex`](bdk_chain::keychain::KeychainTxOutIndex). +#[doc(hidden)] +pub fn full_scan_for_index_and_graph_blocking( + client: &esplora_client::BlockingClient, + keychain_spks: BTreeMap>, + stop_gap: usize, + parallel_requests: usize, +) -> Result<(TxGraph, BTreeMap), Error> { + type TxsOfSpkIndex = (u32, Vec); + let parallel_requests = Ord::max(parallel_requests, 1); + let mut tx_graph = TxGraph::::default(); + let mut last_active_indices = BTreeMap::::new(); + + for (keychain, spks) in keychain_spks { + let mut spks = spks.into_iter(); + let mut last_index = Option::::None; + let mut last_active_index = Option::::None; + loop { - let handles = txids + let handles = spks .by_ref() .take(parallel_requests) - .filter(|&txid| graph.get_tx(txid).is_none()) - .map(|txid| { + .map(|(spk_index, spk)| { std::thread::spawn({ - let client = self.clone(); - move || { - client - .get_tx_status(&txid) - .map_err(Box::new) - .map(|s| (txid, s)) + let client = client.clone(); + move || -> Result { + let mut last_seen = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_seen)?; + let tx_count = txs.len(); + last_seen = txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break Ok((spk_index, spk_txs)); + } + } } }) }) - .collect::>>>(); + .collect::>>>(); if handles.is_empty() { break; } for handle in handles { - let (txid, status) = handle.join().expect("thread must not panic")?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); + let (index, txs) = handle.join().expect("thread must not panic")?; + last_index = Some(index); + if !txs.is_empty() { + last_active_index = Some(index); + } + for tx in txs { + let _ = tx_graph.insert_tx(tx.to_tx()); + if let Some(anchor) = anchor_from_status(&tx.status) { + let _ = tx_graph.insert_anchor(tx.txid, anchor); + } + + let previous_outputs = tx.vin.iter().filter_map(|vin| { + let prevout = vin.prevout.as_ref()?; + Some(( + OutPoint { + txid: vin.txid, + vout: vin.vout, + }, + TxOut { + script_pubkey: prevout.scriptpubkey.clone(), + value: Amount::from_sat(prevout.value), + }, + )) + }); + + for (outpoint, txout) in previous_outputs { + let _ = tx_graph.insert_txout(outpoint, txout); + } } } + + let last_index = last_index.expect("Must be set since handles wasn't empty."); + let gap_limit_reached = if let Some(i) = last_active_index { + last_index >= i.saturating_add(stop_gap as u32) + } else { + last_index + 1 >= stop_gap as u32 + }; + if gap_limit_reached { + break; + } } - for op in outpoints { - if graph.get_tx(op.txid).is_none() { - if let Some(tx) = self.get_tx(&op.txid)? { - let _ = graph.insert_tx(tx); - } - let status = self.get_tx_status(&op.txid)?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(op.txid, anchor); - } + if let Some(last_active_index) = last_active_index { + last_active_indices.insert(keychain, last_active_index); + } + } + + Ok((tx_graph, last_active_indices)) +} + +#[doc(hidden)] +pub fn sync_for_index_and_graph_blocking( + client: &esplora_client::BlockingClient, + misc_spks: impl IntoIterator, + txids: impl IntoIterator, + outpoints: impl IntoIterator, + parallel_requests: usize, +) -> Result, Error> { + let (mut tx_graph, _) = full_scan_for_index_and_graph_blocking( + client, + { + let mut keychains = BTreeMap::new(); + keychains.insert( + (), + misc_spks + .into_iter() + .enumerate() + .map(|(i, spk)| (i as u32, spk)), + ); + keychains + }, + usize::MAX, + parallel_requests, + )?; + + let mut txids = txids.into_iter(); + loop { + let handles = txids + .by_ref() + .take(parallel_requests) + .filter(|&txid| tx_graph.get_tx(txid).is_none()) + .map(|txid| { + std::thread::spawn({ + let client = client.clone(); + move || { + client + .get_tx_status(&txid) + .map_err(Box::new) + .map(|s| (txid, s)) + } + }) + }) + .collect::>>>(); + + if handles.is_empty() { + break; + } + + for handle in handles { + let (txid, status) = handle.join().expect("thread must not panic")?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = tx_graph.insert_anchor(txid, anchor); } + } + } - if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? { - if let Some(txid) = op_status.txid { - if graph.get_tx(txid).is_none() { - if let Some(tx) = self.get_tx(&txid)? { - let _ = graph.insert_tx(tx); - } - let status = self.get_tx_status(&txid)?; - if let Some(anchor) = anchor_from_status(&status) { - let _ = graph.insert_anchor(txid, anchor); - } + for op in outpoints { + if tx_graph.get_tx(op.txid).is_none() { + if let Some(tx) = client.get_tx(&op.txid)? { + let _ = tx_graph.insert_tx(tx); + } + let status = client.get_tx_status(&op.txid)?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = tx_graph.insert_anchor(op.txid, anchor); + } + } + + if let Some(op_status) = client.get_output_status(&op.txid, op.vout as _)? { + if let Some(txid) = op_status.txid { + if tx_graph.get_tx(txid).is_none() { + if let Some(tx) = client.get_tx(&txid)? { + let _ = tx_graph.insert_tx(tx); + } + let status = client.get_tx_status(&txid)?; + if let Some(anchor) = anchor_from_status(&status) { + let _ = tx_graph.insert_anchor(txid, anchor); } } } } - Ok(graph) } + + Ok(tx_graph) } diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index 535167ff2..c422a0833 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -16,7 +16,9 @@ //! [`TxGraph`]: bdk_chain::tx_graph::TxGraph //! [`example_esplora`]: https://github.com/bitcoindevkit/bdk/tree/master/example-crates/example_esplora -use bdk_chain::{BlockId, ConfirmationTimeHeightAnchor}; +use std::collections::BTreeMap; + +use bdk_chain::{local_chain, BlockId, ConfirmationTimeHeightAnchor, TxGraph}; use esplora_client::TxStatus; pub use esplora_client; @@ -48,3 +50,21 @@ fn anchor_from_status(status: &TxStatus) -> Option None } } + +/// Update returns from a full scan. +pub struct FullScanUpdate { + /// The update to apply to the receiving [`LocalChain`](local_chain::LocalChain). + pub local_chain: local_chain::Update, + /// The update to apply to the receiving [`TxGraph`]. + pub tx_graph: TxGraph, + /// Last active indices for the corresponding keychains (`K`). + pub last_active_indices: BTreeMap, +} + +/// Update returned from a sync. +pub struct SyncUpdate { + /// The update to apply to the receiving [`LocalChain`](local_chain::LocalChain). + pub local_chain: local_chain::Update, + /// The update to apply to the receiving [`TxGraph`]. + pub tx_graph: TxGraph, +} diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index c71c214e9..5946bb4d8 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -2,7 +2,7 @@ use bdk_esplora::EsploraAsyncExt; use electrsd::bitcoind::anyhow; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use esplora_client::{self, Builder}; -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; @@ -52,8 +52,12 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } - let graph_update = client + // use a full checkpoint linked list (since this is not what we are testing) + let cp_tip = env.make_checkpoint_tip(); + + let sync_update = client .sync( + cp_tip.clone(), misc_spks.into_iter(), vec![].into_iter(), vec![].into_iter(), @@ -61,6 +65,24 @@ pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { ) .await?; + assert!( + { + let update_cps = sync_update + .local_chain + .tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + let superset_cps = cp_tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + superset_cps.is_superset(&update_cps) + }, + "update should not alter original checkpoint tip since we already started with all checkpoints", + ); + + let graph_update = sync_update.tx_graph; // Check to see if we have the floating txouts available from our two created transactions' // previous outputs in order to calculate transaction fees. for tx in graph_update.full_txs() { @@ -140,14 +162,24 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } + // use a full checkpoint linked list (since this is not what we are testing) + let cp_tip = env.make_checkpoint_tip(); + // A scan with a gap limit of 3 won't find the transaction, but a scan with a gap limit of 4 // will. - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 3, 1).await?; - assert!(graph_update.full_txs().next().is_none()); - assert!(active_indices.is_empty()); - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 4, 1).await?; - assert_eq!(graph_update.full_txs().next().unwrap().txid, txid_4th_addr); - assert_eq!(active_indices[&0], 3); + let full_scan_update = client + .full_scan(cp_tip.clone(), keychains.clone(), 3, 1) + .await?; + assert!(full_scan_update.tx_graph.full_txs().next().is_none()); + assert!(full_scan_update.last_active_indices.is_empty()); + let full_scan_update = client + .full_scan(cp_tip.clone(), keychains.clone(), 4, 1) + .await?; + assert_eq!( + full_scan_update.tx_graph.full_txs().next().unwrap().txid, + txid_4th_addr + ); + assert_eq!(full_scan_update.last_active_indices[&0], 3); // Now receive a coin on the last address. let txid_last_addr = env.bitcoind.client.send_to_address( @@ -167,16 +199,26 @@ pub async fn test_async_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 5, 1).await?; - let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); + let full_scan_update = client + .full_scan(cp_tip.clone(), keychains.clone(), 5, 1) + .await?; + let txs: HashSet<_> = full_scan_update + .tx_graph + .full_txs() + .map(|tx| tx.txid) + .collect(); assert_eq!(txs.len(), 1); assert!(txs.contains(&txid_4th_addr)); - assert_eq!(active_indices[&0], 3); - let (graph_update, active_indices) = client.full_scan(keychains, 6, 1).await?; - let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); + assert_eq!(full_scan_update.last_active_indices[&0], 3); + let full_scan_update = client.full_scan(cp_tip, keychains, 6, 1).await?; + let txs: HashSet<_> = full_scan_update + .tx_graph + .full_txs() + .map(|tx| tx.txid) + .collect(); assert_eq!(txs.len(), 2); assert!(txs.contains(&txid_4th_addr) && txs.contains(&txid_last_addr)); - assert_eq!(active_indices[&0], 9); + assert_eq!(full_scan_update.last_active_indices[&0], 9); Ok(()) } diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 35e38a778..d35fab658 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -3,7 +3,7 @@ use bdk_chain::BlockId; use bdk_esplora::EsploraExt; use electrsd::bitcoind::anyhow; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; -use esplora_client::{self, Builder}; +use esplora_client::{self, BlockHash, Builder}; use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::str::FromStr; use std::thread::sleep; @@ -68,13 +68,35 @@ pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } - let graph_update = client.sync( + // use a full checkpoint linked list (since this is not what we are testing) + let cp_tip = env.make_checkpoint_tip(); + + let sync_update = client.sync( + cp_tip.clone(), misc_spks.into_iter(), vec![].into_iter(), vec![].into_iter(), 1, )?; + assert!( + { + let update_cps = sync_update + .local_chain + .tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + let superset_cps = cp_tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + superset_cps.is_superset(&update_cps) + }, + "update should not alter original checkpoint tip since we already started with all checkpoints", + ); + + let graph_update = sync_update.tx_graph; // Check to see if we have the floating txouts available from our two created transactions' // previous outputs in order to calculate transaction fees. for tx in graph_update.full_txs() { @@ -155,14 +177,20 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { sleep(Duration::from_millis(10)) } + // use a full checkpoint linked list (since this is not what we are testing) + let cp_tip = env.make_checkpoint_tip(); + // A scan with a stop_gap of 3 won't find the transaction, but a scan with a gap limit of 4 // will. - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 3, 1)?; - assert!(graph_update.full_txs().next().is_none()); - assert!(active_indices.is_empty()); - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 4, 1)?; - assert_eq!(graph_update.full_txs().next().unwrap().txid, txid_4th_addr); - assert_eq!(active_indices[&0], 3); + let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 3, 1)?; + assert!(full_scan_update.tx_graph.full_txs().next().is_none()); + assert!(full_scan_update.last_active_indices.is_empty()); + let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 4, 1)?; + assert_eq!( + full_scan_update.tx_graph.full_txs().next().unwrap().txid, + txid_4th_addr + ); + assert_eq!(full_scan_update.last_active_indices[&0], 3); // Now receive a coin on the last address. let txid_last_addr = env.bitcoind.client.send_to_address( @@ -182,16 +210,24 @@ pub fn test_update_tx_graph_stop_gap() -> anyhow::Result<()> { // A scan with gap limit 5 won't find the second transaction, but a scan with gap limit 6 will. // The last active indice won't be updated in the first case but will in the second one. - let (graph_update, active_indices) = client.full_scan(keychains.clone(), 5, 1)?; - let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); + let full_scan_update = client.full_scan(cp_tip.clone(), keychains.clone(), 5, 1)?; + let txs: HashSet<_> = full_scan_update + .tx_graph + .full_txs() + .map(|tx| tx.txid) + .collect(); assert_eq!(txs.len(), 1); assert!(txs.contains(&txid_4th_addr)); - assert_eq!(active_indices[&0], 3); - let (graph_update, active_indices) = client.full_scan(keychains, 6, 1)?; - let txs: HashSet<_> = graph_update.full_txs().map(|tx| tx.txid).collect(); + assert_eq!(full_scan_update.last_active_indices[&0], 3); + let full_scan_update = client.full_scan(cp_tip.clone(), keychains, 6, 1)?; + let txs: HashSet<_> = full_scan_update + .tx_graph + .full_txs() + .map(|tx| tx.txid) + .collect(); assert_eq!(txs.len(), 2); assert!(txs.contains(&txid_4th_addr) && txs.contains(&txid_last_addr)); - assert_eq!(active_indices[&0], 9); + assert_eq!(full_scan_update.last_active_indices[&0], 9); Ok(()) } @@ -317,14 +353,38 @@ fn update_local_chain() -> anyhow::Result<()> { for (i, t) in test_cases.into_iter().enumerate() { println!("Case {}: {}", i, t.name); let mut chain = t.chain; + let cp_tip = chain.tip(); - let update = client - .update_local_chain(chain.tip(), t.request_heights.iter().copied()) - .map_err(|err| { - anyhow::format_err!("[{}:{}] `update_local_chain` failed: {}", i, t.name, err) + let new_blocks = + bdk_esplora::init_chain_update_blocking(&client, &cp_tip).map_err(|err| { + anyhow::format_err!("[{}:{}] `init_chain_update` failed: {}", i, t.name, err) })?; - let update_blocks = update + let mock_anchors = t + .request_heights + .iter() + .map(|&h| { + let anchor_blockhash: BlockHash = bdk_chain::bitcoin::hashes::Hash::hash( + &format!("hash_at_height_{}", h).into_bytes(), + ); + let txid: Txid = bdk_chain::bitcoin::hashes::Hash::hash( + &format!("txid_at_height_{}", h).into_bytes(), + ); + let anchor = BlockId { + height: h, + hash: anchor_blockhash, + }; + (anchor, txid) + }) + .collect::>(); + + let chain_update = bdk_esplora::finalize_chain_update_blocking( + &client, + &cp_tip, + &mock_anchors, + new_blocks, + )?; + let update_blocks = chain_update .tip .iter() .map(|cp| cp.block_id()) @@ -346,14 +406,15 @@ fn update_local_chain() -> anyhow::Result<()> { ) .collect::>(); - assert_eq!( - update_blocks, exp_update_blocks, + assert!( + update_blocks.is_superset(&exp_update_blocks), "[{}:{}] unexpected update", - i, t.name + i, + t.name ); let _ = chain - .apply_update(update) + .apply_update(chain_update) .unwrap_or_else(|err| panic!("[{}:{}] update failed to apply: {}", i, t.name, err)); // all requested heights must exist in the final chain diff --git a/crates/testenv/src/lib.rs b/crates/testenv/src/lib.rs index 4ae6ea6e3..b0147d0fc 100644 --- a/crates/testenv/src/lib.rs +++ b/crates/testenv/src/lib.rs @@ -1,6 +1,8 @@ use bdk_chain::{ bitcoin::{ - address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash, secp256k1::rand::random, transaction, Address, Amount, Block, BlockHash, CompactTarget, ScriptBuf, ScriptHash, Transaction, TxIn, TxOut, Txid + address::NetworkChecked, block::Header, hash_types::TxMerkleNode, hashes::Hash, + secp256k1::rand::random, transaction, Address, Amount, Block, BlockHash, CompactTarget, + ScriptBuf, ScriptHash, Transaction, TxIn, TxOut, Txid, }, local_chain::CheckPoint, BlockId, diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index 9232081b6..3aa5b6d80 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, BTreeSet}, + collections::BTreeMap, io::{self, Write}, sync::Mutex, }; @@ -60,6 +60,7 @@ enum EsploraCommands { esplora_args: EsploraArgs, }, } + impl EsploraCommands { fn esplora_args(&self) -> EsploraArgs { match self { @@ -149,20 +150,24 @@ fn main() -> anyhow::Result<()> { }; let client = esplora_cmd.esplora_args().client(args.network)?; - // Prepare the `IndexedTxGraph` update based on whether we are scanning or syncing. + // Prepare the `IndexedTxGraph` and `LocalChain` updates based on whether we are scanning or + // syncing. + // // Scanning: We are iterating through spks of all keychains and scanning for transactions for // each spk. We start with the lowest derivation index spk and stop scanning after `stop_gap` // number of consecutive spks have no transaction history. A Scan is done in situations of // wallet restoration. It is a special case. Applications should use "sync" style updates // after an initial scan. + // // Syncing: We only check for specified spks, utxos and txids to update their confirmation // status or fetch missing transactions. - let indexed_tx_graph_changeset = match &esplora_cmd { + let (local_chain_changeset, indexed_tx_graph_changeset) = match &esplora_cmd { EsploraCommands::Scan { stop_gap, scan_options, .. } => { + let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); let keychain_spks = graph .lock() .expect("mutex must not be poisoned") @@ -189,23 +194,33 @@ fn main() -> anyhow::Result<()> { // is reached. It returns a `TxGraph` update (`graph_update`) and a structure that // represents the last active spk derivation indices of keychains // (`keychain_indices_update`). - let (mut graph_update, last_active_indices) = client - .full_scan(keychain_spks, *stop_gap, scan_options.parallel_requests) + let mut update = client + .full_scan( + local_tip, + keychain_spks, + *stop_gap, + scan_options.parallel_requests, + ) .context("scanning for transactions")?; // We want to keep track of the latest time a transaction was seen unconfirmed. let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); - let _ = graph_update.update_last_seen_unconfirmed(now); + let _ = update.tx_graph.update_last_seen_unconfirmed(now); let mut graph = graph.lock().expect("mutex must not be poisoned"); + let mut chain = chain.lock().expect("mutex must not be poisoned"); // Because we did a stop gap based scan we are likely to have some updates to our // deriviation indices. Usually before a scan you are on a fresh wallet with no // addresses derived so we need to derive up to last active addresses the scan found // before adding the transactions. - let (_, index_changeset) = graph.index.reveal_to_target_multi(&last_active_indices); - let mut indexed_tx_graph_changeset = graph.apply_update(graph_update); - indexed_tx_graph_changeset.append(index_changeset.into()); - indexed_tx_graph_changeset + (chain.apply_update(update.local_chain)?, { + let (_, index_changeset) = graph + .index + .reveal_to_target_multi(&update.last_active_indices); + let mut indexed_tx_graph_changeset = graph.apply_update(update.tx_graph); + indexed_tx_graph_changeset.append(index_changeset.into()); + indexed_tx_graph_changeset + }) } EsploraCommands::Sync { mut unused_spks, @@ -231,12 +246,13 @@ fn main() -> anyhow::Result<()> { let mut outpoints: Box> = Box::new(core::iter::empty()); let mut txids: Box> = Box::new(core::iter::empty()); + let local_tip = chain.lock().expect("mutex must not be poisoned").tip(); + // Get a short lock on the structures to get spks, utxos, and txs that we are interested // in. { let graph = graph.lock().unwrap(); let chain = chain.lock().unwrap(); - let chain_tip = chain.tip().block_id(); if *all_spks { let all_spks = graph @@ -276,7 +292,7 @@ fn main() -> anyhow::Result<()> { let init_outpoints = graph.index.outpoints().iter().cloned(); let utxos = graph .graph() - .filter_chain_unspents(&*chain, chain_tip, init_outpoints) + .filter_chain_unspents(&*chain, local_tip.block_id(), init_outpoints) .map(|(_, utxo)| utxo) .collect::>(); outpoints = Box::new( @@ -299,7 +315,7 @@ fn main() -> anyhow::Result<()> { // `EsploraExt::update_tx_graph_without_keychain`. let unconfirmed_txids = graph .graph() - .list_chain_txs(&*chain, chain_tip) + .list_chain_txs(&*chain, local_tip.block_id()) .filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed()) .map(|canonical_tx| canonical_tx.tx_node.txid) .collect::>(); @@ -311,48 +327,30 @@ fn main() -> anyhow::Result<()> { } } - let mut graph_update = - client.sync(spks, txids, outpoints, scan_options.parallel_requests)?; + let mut update = client.sync( + local_tip, + spks, + txids, + outpoints, + scan_options.parallel_requests, + )?; // Update last seen unconfirmed let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); - let _ = graph_update.update_last_seen_unconfirmed(now); + let _ = update.tx_graph.update_last_seen_unconfirmed(now); - graph.lock().unwrap().apply_update(graph_update) + ( + chain.lock().unwrap().apply_update(update.local_chain)?, + graph.lock().unwrap().apply_update(update.tx_graph), + ) } }; println!(); - // Now that we're done updating the `IndexedTxGraph`, it's time to update the `LocalChain`! We - // want the `LocalChain` to have data about all the anchors in the `TxGraph` - for this reason, - // we want retrieve the blocks at the heights of the newly added anchors that are missing from - // our view of the chain. - let (missing_block_heights, tip) = { - let chain = &*chain.lock().unwrap(); - let missing_block_heights = indexed_tx_graph_changeset - .graph - .missing_heights_from(chain) - .collect::>(); - let tip = chain.tip(); - (missing_block_heights, tip) - }; - - println!("prev tip: {}", tip.height()); - println!("missing block heights: {:?}", missing_block_heights); - - // Here, we actually fetch the missing blocks and create a `local_chain::Update`. - let chain_changeset = { - let chain_update = client - .update_local_chain(tip, missing_block_heights) - .context("scanning for blocks")?; - println!("new tip: {}", chain_update.tip.height()); - chain.lock().unwrap().apply_update(chain_update)? - }; - // We persist the changes let mut db = db.lock().unwrap(); - db.stage((chain_changeset, indexed_tx_graph_changeset)); + db.stage((local_chain_changeset, indexed_tx_graph_changeset)); db.commit()?; Ok(()) } diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index 14e9e38dd..5aa10fbca 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -53,18 +53,17 @@ async fn main() -> Result<(), anyhow::Error> { (k, k_spks) }) .collect(); - let (mut update_graph, last_active_indices) = client - .full_scan(keychain_spks, STOP_GAP, PARALLEL_REQUESTS) - .await?; + let mut update = client + .full_scan(prev_tip, keychain_spks, STOP_GAP, PARALLEL_REQUESTS) + .await?; let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); - let _ = update_graph.update_last_seen_unconfirmed(now); - let missing_heights = update_graph.missing_heights(wallet.local_chain()); - let chain_update = client.update_local_chain(prev_tip, missing_heights).await?; + let _ = update.tx_graph.update_last_seen_unconfirmed(now); + let update = Update { - last_active_indices, - graph: update_graph, - chain: Some(chain_update), + last_active_indices: update.last_active_indices, + graph: update.tx_graph, + chain: Some(update.local_chain), }; wallet.apply_update(update)?; wallet.commit()?; diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 1815650d4..248c3c6bb 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -36,7 +36,6 @@ fn main() -> Result<(), anyhow::Error> { let client = esplora_client::Builder::new("https://blockstream.info/testnet/api").build_blocking(); - let prev_tip = wallet.latest_checkpoint(); let keychain_spks = wallet .all_unbounded_spk_iters() .into_iter() @@ -53,20 +52,20 @@ fn main() -> Result<(), anyhow::Error> { }) .collect(); - let (mut update_graph, last_active_indices) = - client.full_scan(keychain_spks, STOP_GAP, PARALLEL_REQUESTS)?; - + let mut update = client.full_scan( + wallet.latest_checkpoint(), + keychain_spks, + STOP_GAP, + PARALLEL_REQUESTS, + )?; let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs(); - let _ = update_graph.update_last_seen_unconfirmed(now); - let missing_heights = update_graph.missing_heights(wallet.local_chain()); - let chain_update = client.update_local_chain(prev_tip, missing_heights)?; - let update = Update { - last_active_indices, - graph: update_graph, - chain: Some(chain_update), - }; - - wallet.apply_update(update)?; + let _ = update.tx_graph.update_last_seen_unconfirmed(now); + + wallet.apply_update(Update { + last_active_indices: update.last_active_indices, + graph: update.tx_graph, + chain: Some(update.local_chain), + })?; wallet.commit()?; println!();