Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tx-Sync: Track spent WatchedOutputs and re-add if unconfirmed #2946

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion lightning-transaction-sync/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use lightning::chain::{Confirm, WatchedOutput};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use bitcoin::{Txid, BlockHash, Transaction, OutPoint};
use bitcoin::block::Header;

Expand All @@ -13,6 +14,9 @@ pub(crate) struct SyncState {
// Outputs that were previously processed, but must not be forgotten yet as
// as we still need to monitor any spends on-chain.
pub watched_outputs: HashMap<OutPoint, WatchedOutput>,
// Outputs for which we previously saw a spend on-chain but kept around until the spends reach
// sufficient depth.
pub outputs_spends_pending_threshold_conf: Vec<(Txid, u32, OutPoint, WatchedOutput)>,
// The tip hash observed during our last sync.
pub last_sync_hash: Option<BlockHash>,
// Indicates whether we need to resync, e.g., after encountering an error.
Expand All @@ -24,6 +28,7 @@ impl SyncState {
Self {
watched_transactions: HashSet::new(),
watched_outputs: HashMap::new(),
outputs_spends_pending_threshold_conf: Vec::new(),
last_sync_hash: None,
pending_sync: false,
}
Expand All @@ -38,6 +43,17 @@ impl SyncState {
}

self.watched_transactions.insert(txid);

// If a previously-confirmed output spend is unconfirmed, re-add the watched output to
// the tracking map.
self.outputs_spends_pending_threshold_conf.retain(|(conf_txid, _, prev_outpoint, output)| {
if txid == *conf_txid {
self.watched_outputs.insert(*prev_outpoint, output.clone());
false
} else {
true
}
})
}
}

Expand All @@ -57,10 +73,18 @@ impl SyncState {
self.watched_transactions.remove(&ctx.tx.txid());

for input in &ctx.tx.input {
self.watched_outputs.remove(&input.previous_output);
if let Some(output) = self.watched_outputs.remove(&input.previous_output) {
self.outputs_spends_pending_threshold_conf.push((ctx.tx.txid(), ctx.block_height, input.previous_output, output));
}
}
}
}

pub fn prune_output_spends(&mut self, cur_height: u32) {
self.outputs_spends_pending_threshold_conf.retain(|(_, conf_height, _, _)| {
cur_height < conf_height + ANTI_REORG_DELAY - 1
});
}
}


Expand Down Expand Up @@ -104,6 +128,7 @@ impl FilterQueue {
#[derive(Debug)]
pub(crate) struct ConfirmedTx {
pub tx: Transaction,
pub txid: Txid,
pub block_header: Header,
pub block_height: u32,
pub pos: usize,
Expand Down
13 changes: 12 additions & 1 deletion lightning-transaction-sync/src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ where
for c in &confirmables {
c.best_block_updated(&tip_header, tip_height);
}

// Prune any sufficiently confirmed output spends
sync_state.prune_output_spends(tip_height);
}

match self.get_confirmed_transactions(&sync_state) {
Expand Down Expand Up @@ -254,7 +257,7 @@ where

// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.
let mut confirmed_txs = Vec::new();
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
let mut watched_script_pubkeys = Vec::with_capacity(
sync_state.watched_transactions.len() + sync_state.watched_outputs.len());
let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
Expand Down Expand Up @@ -302,6 +305,9 @@ where

for (i, script_history) in tx_results.iter().enumerate() {
let (txid, tx) = &watched_txs[i];
if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
continue;
}
let mut filtered_history = script_history.iter().filter(|h| h.tx_hash == **txid);
if let Some(history) = filtered_history.next()
{
Expand All @@ -321,6 +327,10 @@ where
}

let txid = possible_output_spend.tx_hash;
if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
continue;
}

match self.client.transaction_get(&txid) {
Ok(tx) => {
let mut is_spend = false;
Expand Down Expand Up @@ -416,6 +426,7 @@ where
}
let confirmed_tx = ConfirmedTx {
tx: tx.clone(),
txid,
block_header, block_height: prob_conf_height,
pos,
};
Expand Down
39 changes: 30 additions & 9 deletions lightning-transaction-sync/src/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ where
}
}

match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
match maybe_await!(self.sync_best_block_updated(&confirmables, &mut sync_state, &tip_hash)) {
Ok(()) => {}
Err(InternalError::Inconsistency) => {
// Immediately restart syncing when we encounter any inconsistencies.
Expand Down Expand Up @@ -238,7 +238,7 @@ where

#[maybe_async]
fn sync_best_block_updated(
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, sync_state: &mut SyncState, tip_hash: &BlockHash,
) -> Result<(), InternalError> {

// Inform the interface of the new block.
Expand All @@ -249,6 +249,9 @@ where
for c in confirmables {
c.best_block_updated(&tip_header, tip_height);
}

// Prune any sufficiently confirmed output spends
sync_state.prune_output_spends(tip_height);
}
} else {
return Err(InternalError::Inconsistency);
Expand All @@ -264,10 +267,13 @@ where
// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.

let mut confirmed_txs = Vec::new();
let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();

for txid in &sync_state.watched_transactions {
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
continue;
}
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
confirmed_txs.push(confirmed_tx);
}
}
Expand All @@ -278,9 +284,19 @@ where
{
if let Some(spending_txid) = output_status.txid {
if let Some(spending_tx_status) = output_status.status {
if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
if spending_tx_status.confirmed {
// Skip inserting duplicate ConfirmedTx entry
continue;
} else {
log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
return Err(InternalError::Inconsistency);
}
}

if let Some(confirmed_tx) = maybe_await!(self
.get_confirmed_tx(
&spending_txid,
spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
))?
Expand All @@ -303,7 +319,7 @@ where

#[maybe_async]
fn get_confirmed_tx(
&self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
&self, txid: Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
) -> Result<Option<ConfirmedTx>, InternalError> {
if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
let block_header = merkle_block.header;
Expand All @@ -318,22 +334,27 @@ where
let mut matches = Vec::new();
let mut indexes = Vec::new();
let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid {
log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
return Err(InternalError::Failed);
}

// unwrap() safety: len() > 0 is checked above
let pos = *indexes.first().unwrap() as usize;
if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
if tx.txid() != txid {
log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
return Err(InternalError::Failed);
}

if let Some(block_height) = known_block_height {
// We can take a shortcut here if a previous call already gave us the height.
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
}

let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
if let Some(block_height) = block_status.height {
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
} else {
// If any previously-confirmed block suddenly is no longer confirmed, we found
// an inconsistency and should start over.
Expand Down
Loading