From 078f91bee46a9403820f26aa68f6399ecc43b426 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 3 Sep 2025 17:43:36 -0700 Subject: [PATCH 01/16] lnwallet: add new helper functions to scale confirmations based on amt --- lnwallet/confscale.go | 59 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 lnwallet/confscale.go diff --git a/lnwallet/confscale.go b/lnwallet/confscale.go new file mode 100644 index 0000000000..8a8239602f --- /dev/null +++ b/lnwallet/confscale.go @@ -0,0 +1,59 @@ +package lnwallet + +import ( + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/lnwire" +) + +const ( + // minRequiredConfs is the minimum number of confirmations we'll + // require for channel operations. + minRequiredConfs = 1 + + // maxRequiredConfs is the maximum number of confirmations we'll + // require for channel operations. + maxRequiredConfs = 6 + + // maxChannelSize is the maximum expected channel size in satoshis. + // This matches MaxBtcFundingAmount (0.16777215 BTC). + maxChannelSize = 16777215 +) + +// ScaleNumConfs returns a linearly scaled number of confirmations based on the +// provided channel amount and push amount (for funding transactions). The push +// amount represents additional risk when receiving funds. +func ScaleNumConfs(chanAmt btcutil.Amount, pushAmt lnwire.MilliSatoshi) uint16 { + // For wumbo channels, always require maximum confirmations. + if chanAmt > maxChannelSize { + return maxRequiredConfs + } + + // Calculate total stake: channel amount + push amount. The push amount + // represents value at risk for the receiver. + maxChannelSizeMsat := lnwire.NewMSatFromSatoshis(maxChannelSize) + stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt + + // Scale confirmations linearly based on stake. + conf := uint64(maxRequiredConfs) * uint64(stake) / + uint64(maxChannelSizeMsat) + + // Bound the result between minRequiredConfs and maxRequiredConfs. + if conf < minRequiredConfs { + conf = minRequiredConfs + } + if conf > maxRequiredConfs { + conf = maxRequiredConfs + } + + return uint16(conf) +} + +// FundingConfsForAmounts returns the number of confirmations to wait for a +// funding transaction, taking into account both the channel amount and any +// pushed amount (which represents additional risk). +func FundingConfsForAmounts(chanAmt btcutil.Amount, + pushAmt lnwire.MilliSatoshi) uint16 { + + return ScaleNumConfs(chanAmt, pushAmt) +} + From 7e65f1df6cce68f38e42c40aea9171dd9bd2c441 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 3 Sep 2025 17:44:03 -0700 Subject: [PATCH 02/16] server: use new FundingConfsForAmounts helper func --- server.go | 44 +++++++++++--------------------------------- 1 file changed, 11 insertions(+), 33 deletions(-) diff --git a/server.go b/server.go index 6d0b28f7ee..0c2add901c 100644 --- a/server.go +++ b/server.go @@ -1468,16 +1468,6 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, DefaultMinHtlcIn: cc.MinHtlcIn, NumRequiredConfs: func(chanAmt btcutil.Amount, pushAmt lnwire.MilliSatoshi) uint16 { - // For large channels we increase the number - // of confirmations we require for the - // channel to be considered open. As it is - // always the responder that gets to choose - // value, the pushAmt is value being pushed - // to us. This means we have more to lose - // in the case this gets re-orged out, and - // we will require more confirmations before - // we consider it open. - // In case the user has explicitly specified // a default value for the number of // confirmations, we use it. @@ -1486,29 +1476,17 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, return defaultConf } - minConf := uint64(3) - maxConf := uint64(6) - - // If this is a wumbo channel, then we'll require the - // max amount of confirmations. - if chanAmt > MaxFundingAmount { - return uint16(maxConf) - } - - // If not we return a value scaled linearly - // between 3 and 6, depending on channel size. - // TODO(halseth): Use 1 as minimum? - maxChannelSize := uint64( - lnwire.NewMSatFromSatoshis(MaxFundingAmount)) - stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt - conf := maxConf * uint64(stake) / maxChannelSize - if conf < minConf { - conf = minConf - } - if conf > maxConf { - conf = maxConf - } - return uint16(conf) + // Otherwise, scale the number of confirmations based on + // the channel amount and push amount. For large + // channels we increase the number of + // confirmations we require for the channel to be + // considered open. As it is always the + // responder that gets to choose value, the + // pushAmt is value being pushed to us. This + // means we have more to lose in the case this + // gets re-orged out, and we will require more + // confirmations before we consider it open. + return lnwallet.FundingConfsForAmounts(chanAmt, pushAmt) }, RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 { // We scale the remote CSV delay (the time the From 6971f2441c3b376249c1a8bb1c03bfc56b275917 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 3 Sep 2025 17:44:32 -0700 Subject: [PATCH 03/16] lnwallet: define helper func to coop close conf scaling We have two versions: for itests, we just use one conf, but in prod, we'll scale the number of confirmations. --- lnwallet/confscale_integration.go | 13 +++++++++++++ lnwallet/confscale_prod.go | 25 +++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 lnwallet/confscale_integration.go create mode 100644 lnwallet/confscale_prod.go diff --git a/lnwallet/confscale_integration.go b/lnwallet/confscale_integration.go new file mode 100644 index 0000000000..72692a7360 --- /dev/null +++ b/lnwallet/confscale_integration.go @@ -0,0 +1,13 @@ +//go:build integration +// +build integration + +package lnwallet + +import "github.com/btcsuite/btcd/btcutil" + +// CloseConfsForCapacity returns the number of confirmations to wait +// before signaling a cooperative close. Under integration tests, we +// always return 1 to keep tests fast and deterministic. +func CloseConfsForCapacity(capacity btcutil.Amount) uint32 { //nolint:revive + return 1 +} \ No newline at end of file diff --git a/lnwallet/confscale_prod.go b/lnwallet/confscale_prod.go new file mode 100644 index 0000000000..f0a2cd586e --- /dev/null +++ b/lnwallet/confscale_prod.go @@ -0,0 +1,25 @@ +//go:build !integration +// +build !integration + +package lnwallet + +import "github.com/btcsuite/btcd/btcutil" + +// CloseConfsForCapacity returns the number of confirmations to wait +// before signaling a cooperative close, scaled by channel capacity. +// We enforce a minimum of 3 confirmations to provide better reorg protection, +// even for small channels. +func CloseConfsForCapacity(capacity btcutil.Amount) uint32 { //nolint:revive + // For cooperative closes, we don't have a push amount to consider, + // so we pass 0 for the pushAmt parameter. + scaledConfs := uint32(ScaleNumConfs(capacity, 0)) + + // Enforce a minimum of 3 confirmations for reorg safety. + // This protects against shallow reorgs which are more common. + const minCoopCloseConfs = 3 + if scaledConfs < minCoopCloseConfs { + return minCoopCloseConfs + } + + return scaledConfs +} \ No newline at end of file From 609201e28a0bca79e2bde7d1dcee17fb89ea935e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 3 Sep 2025 17:44:40 -0700 Subject: [PATCH 04/16] lnwallet: add tests for new conf scaling helper funcs --- lnwallet/confscale_test.go | 338 +++++++++++++++++++++++++++++++++++++ 1 file changed, 338 insertions(+) create mode 100644 lnwallet/confscale_test.go diff --git a/lnwallet/confscale_test.go b/lnwallet/confscale_test.go new file mode 100644 index 0000000000..786769e451 --- /dev/null +++ b/lnwallet/confscale_test.go @@ -0,0 +1,338 @@ +package lnwallet + +import ( + "testing" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" + "pgregory.net/rapid" +) + +// TestScaleNumConfsProperties tests various properties that ScaleNumConfs +// should satisfy using property-based testing. +func TestScaleNumConfsProperties(t *testing.T) { + t.Parallel() + + // The result should always be bounded between the minimum and maximum + // number of confirmations regardless of input values. + t.Run("bounded_result", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + // Generate random channel amount and push amount. + chanAmt := rapid.Uint64Range( + 0, maxChannelSize*10, + ).Draw(t, "chanAmt") + pushAmtSats := rapid.Uint64Range( + 0, chanAmt, + ).Draw(t, "pushAmtSats") + pushAmt := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmtSats), + ) + + result := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt, + ) + + // Check bounds + require.GreaterOrEqual( + t, result, uint16(minRequiredConfs), + "result should be >= minRequiredConfs", + ) + require.LessOrEqual( + t, result, uint16(maxRequiredConfs), + "result should be <= maxRequiredConfs", + ) + }) + }) + + // Larger channel amounts and push amounts should require equal or more + // confirmations, ensuring the function is monotonically increasing. + t.Run("monotonicity", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + // Generate two channel amounts where amt1 <= amt2. + amt1 := rapid.Uint64Range( + 0, maxChannelSize, + ).Draw(t, "amt1") + amt2 := rapid.Uint64Range( + amt1, maxChannelSize, + ).Draw(t, "amt2") + + // Generate push amounts proportional to channel size. + pushAmt1Sats := rapid.Uint64Range( + 0, amt1, + ).Draw(t, "pushAmt1") + pushAmt2Sats := rapid.Uint64Range( + pushAmt1Sats, amt2, + ).Draw(t, "pushAmt2") + + pushAmt1 := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmt1Sats), + ) + pushAmt2 := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmt2Sats), + ) + + confs1 := ScaleNumConfs(btcutil.Amount(amt1), pushAmt1) + confs2 := ScaleNumConfs(btcutil.Amount(amt2), pushAmt2) + + // Larger or equal stake should require equal or more + // confirmations. + require.GreaterOrEqual( + t, confs2, confs1, + "larger amount should require equal or "+ + "more confirmations", + ) + }) + }) + + // Wumbo channels (those exceeding the max standard channel size) should + // always require the maximum number of confirmations for safety. + t.Run("wumbo_max_confs", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + // Generate wumbo channel amount (above maxChannelSize). + wumboAmt := rapid.Uint64Range( + maxChannelSize+1, maxChannelSize*100, + ).Draw(t, "wumboAmt") + pushAmtSats := rapid.Uint64Range( + 0, wumboAmt, + ).Draw(t, "pushAmtSats") + pushAmt := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmtSats), + ) + + result := ScaleNumConfs( + btcutil.Amount(wumboAmt), pushAmt, + ) + + require.Equal( + t, uint16(maxRequiredConfs), result, + "wumbo channels should always get "+ + "max confirmations", + ) + }) + }) + + // Zero channel amounts should always result in the minimum number of + // confirmations since there's no value at risk. + t.Run("zero_gets_min", func(t *testing.T) { + result := ScaleNumConfs(0, 0) + require.Equal( + t, uint16(minRequiredConfs), result, + "zero amount should get minimum confirmations", + ) + }) + + // The function should be deterministic, always returning the same + // output for the same input values. + t.Run("determinism", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + chanAmt := rapid.Uint64Range( + 0, maxChannelSize*2, + ).Draw(t, "chanAmt") + pushAmtSats := rapid.Uint64Range( + 0, chanAmt, + ).Draw(t, "pushAmtSats") + pushAmt := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmtSats), + ) + + // Call multiple times with same inputs. + result1 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt, + ) + result2 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt, + ) + result3 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt, + ) + + require.Equal( + t, result1, result2, + "function should be deterministic", + ) + require.Equal( + t, result2, result3, + "function should be deterministic", + ) + }) + }) + + // Adding a push amount to a channel should require equal or more + // confirmations compared to the same channel without a push amount. + t.Run("push_amount_effect", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + // Fix channel amount, vary push amount + chanAmt := rapid.Uint64Range( + 1, maxChannelSize, + ).Draw(t, "chanAmt") + pushAmt1Sats := rapid.Uint64Range( + 0, chanAmt/2, + ).Draw(t, "pushAmt1") + pushAmt2Sats := rapid.Uint64Range( + pushAmt1Sats, chanAmt, + ).Draw(t, "pushAmt2") + + pushAmt1 := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmt1Sats), + ) + pushAmt2 := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmt2Sats), + ) + + confs1 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt1, + ) + confs2 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt2, + ) + + // More push amount should require equal or more + // confirmations. + require.GreaterOrEqual( + t, confs2, confs1, + "larger push amount should "+ + "require equal or more confirmations", + ) + }) + }) +} + +// TestScaleNumConfsKnownValues tests ScaleNumConfs with specific known values +// to ensure the scaling formula works as expected. +func TestScaleNumConfsKnownValues(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + chanAmt btcutil.Amount + pushAmt lnwire.MilliSatoshi + expected uint16 + }{ + { + name: "zero amounts", + chanAmt: 0, + pushAmt: 0, + expected: minRequiredConfs, + }, + { + name: "tiny channel", + chanAmt: 1000, + pushAmt: 0, + expected: minRequiredConfs, + }, + { + name: "small channel no push", + chanAmt: 100_000, + pushAmt: 0, + expected: minRequiredConfs, + }, + { + name: "half max channel no push", + chanAmt: maxChannelSize / 2, + pushAmt: 0, + expected: 2, + }, + { + name: "max channel no push", + chanAmt: maxChannelSize, + pushAmt: 0, + expected: maxRequiredConfs, + }, + { + name: "wumbo channel", + chanAmt: maxChannelSize * 2, + pushAmt: 0, + expected: maxRequiredConfs, + }, + { + name: "small channel with push", + chanAmt: 100_000, + pushAmt: lnwire.NewMSatFromSatoshis(50_000), + expected: minRequiredConfs, + }, + { + name: "medium channel with significant push", + chanAmt: maxChannelSize / 4, + pushAmt: lnwire.NewMSatFromSatoshis(maxChannelSize / 4), + expected: 2, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + result := ScaleNumConfs(tc.chanAmt, tc.pushAmt) + + require.Equal( + t, tc.expected, result, + "chanAmt=%d, pushAmt=%d", tc.chanAmt, + tc.pushAmt, + ) + }) + } +} + +// TestFundingConfsForAmounts verifies that FundingConfsForAmounts is a simple +// wrapper around ScaleNumConfs. +func TestFundingConfsForAmounts(t *testing.T) { + t.Parallel() + + rapid.Check(t, func(t *rapid.T) { + chanAmt := rapid.Uint64Range( + 0, maxChannelSize*2, + ).Draw(t, "chanAmt") + pushAmtSats := rapid.Uint64Range( + 0, chanAmt, + ).Draw(t, "pushAmtSats") + pushAmt := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmtSats), + ) + + // Both functions should return the same result. + scaleResult := ScaleNumConfs(btcutil.Amount(chanAmt), pushAmt) + fundingResult := FundingConfsForAmounts( + btcutil.Amount(chanAmt), pushAmt, + ) + + require.Equal( + t, scaleResult, fundingResult, + "FundingConfsForAmounts should return "+ + "same result as ScaleNumConfs", + ) + }) +} + +// TestCloseConfsForCapacity verifies that CloseConfsForCapacity correctly +// wraps ScaleNumConfs with zero push amount and enforces a minimum of 3 +// confirmations for reorg safety. +func TestCloseConfsForCapacity(t *testing.T) { + t.Parallel() + + rapid.Check(t, func(t *rapid.T) { + capacity := rapid.Uint64Range( + 0, maxChannelSize*2, + ).Draw(t, "capacity") + + // CloseConfsForCapacity should be equivalent to ScaleNumConfs + // with 0 push, but with a minimum of 3 confirmations enforced + // for reorg safety. + closeConfs := CloseConfsForCapacity(btcutil.Amount(capacity)) + scaleConfs := ScaleNumConfs(btcutil.Amount(capacity), 0) + + // The result should be at least the scaled value, but with a + // minimum of 3 confirmations. + const minCoopCloseConfs = 3 + expectedConfs := uint32(scaleConfs) + if expectedConfs < minCoopCloseConfs { + expectedConfs = minCoopCloseConfs + } + + require.Equal( + t, expectedConfs, closeConfs, + "CloseConfsForCapacity should match "+ + "ScaleNumConfs with 0 push amount, "+ + "but with minimum of 3 confs", + ) + }) +} From d8adbc7a9bc115d9512f5e8ce3c525252ae45a29 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 3 Sep 2025 17:45:31 -0700 Subject: [PATCH 05/16] peer+rpcserver: use new conf scaling for notifications --- peer/brontide.go | 32 ++++++++++++++++++-------------- rpcserver.go | 16 ++++++++-------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 9191cbb2ee..6fbba297d4 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -4444,14 +4444,19 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { localOut := chanCloser.LocalCloseOutput() remoteOut := chanCloser.RemoteCloseOutput() auxOut := chanCloser.AuxOutputs() - go WaitForChanToClose( - chanCloser.NegotiationHeight(), notifier, errChan, - &chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() { - // Respond to the local subsystem which requested the - // channel closure. - if closeReq != nil { - closeReq.Updates <- &ChannelCloseUpdate{ - ClosingTxid: closingTxid[:], + // Determine the number of confirmations to wait before + // signaling a successful cooperative close, scaled by + // channel capacity (see CloseConfsForCapacity). + numConfs := lnwallet.CloseConfsForCapacity(chanCloser.Channel().Capacity) + + go WaitForChanToClose( + chanCloser.NegotiationHeight(), notifier, errChan, + &chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, numConfs, func() { + // Respond to the local subsystem which requested the + // channel closure. + if closeReq != nil { + closeReq.Updates <- &ChannelCloseUpdate{ + ClosingTxid: closingTxid[:], Success: true, LocalCloseOutput: localOut, RemoteCloseOutput: remoteOut, @@ -4468,16 +4473,15 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { // finally the callback will be executed. If any error is encountered within // the function, then it will be sent over the errChan. func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier, - errChan chan error, chanPoint *wire.OutPoint, - closingTxID *chainhash.Hash, closeScript []byte, cb func()) { + errChan chan error, chanPoint *wire.OutPoint, + closingTxID *chainhash.Hash, closeScript []byte, numConfs uint32, cb func()) { peerLog.Infof("Waiting for confirmation of close of ChannelPoint(%v) "+ "with txid: %v", chanPoint, closingTxID) - // TODO(roasbeef): add param for num needed confs - confNtfn, err := notifier.RegisterConfirmationsNtfn( - closingTxID, closeScript, 1, bestHeight, - ) + confNtfn, err := notifier.RegisterConfirmationsNtfn( + closingTxID, closeScript, numConfs, bestHeight, + ) if err != nil { if errChan != nil { errChan <- err diff --git a/rpcserver.go b/rpcserver.go index d3d3c51801..83330e3081 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2800,14 +2800,14 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, errChan = make(chan error, 1) notifier := r.server.cc.ChainNotifier - go peer.WaitForChanToClose( - uint32(bestHeight), notifier, errChan, chanPoint, - &closingTxid, closingTx.TxOut[0].PkScript, func() { - // Respond to the local subsystem which - // requested the channel closure. - updateChan <- &peer.ChannelCloseUpdate{ - ClosingTxid: closingTxid[:], - Success: true, + go peer.WaitForChanToClose( + uint32(bestHeight), notifier, errChan, chanPoint, + &closingTxid, closingTx.TxOut[0].PkScript, 1, func() { + // Respond to the local subsystem which + // requested the channel closure. + updateChan <- &peer.ChannelCloseUpdate{ + ClosingTxid: closingTxid[:], + Success: true, // Force closure transactions don't have // additional local/remote outputs. } From d32e34217e6bdb952c1a8152906bc15e5a403d0e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:40:13 -0700 Subject: [PATCH 06/16] lncfg: add new dev config option for scaling channel close confs This'll be useful for the set up upcoming itests. --- lncfg/dev.go | 7 +++++++ lncfg/dev_integration.go | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/lncfg/dev.go b/lncfg/dev.go index f048d69b7a..8e0c9dda45 100644 --- a/lncfg/dev.go +++ b/lncfg/dev.go @@ -5,6 +5,7 @@ package lncfg import ( "time" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnwallet/chanfunding" ) @@ -58,3 +59,9 @@ func (d *DevConfig) GetMaxWaitNumBlocksFundingConf() uint32 { func (d *DevConfig) GetUnsafeConnect() bool { return false } + +// ChannelCloseConfs returns the config value for channel close confirmations +// override, which is always None for production build. +func (d *DevConfig) ChannelCloseConfs() fn.Option[uint32] { + return fn.None[uint32]() +} diff --git a/lncfg/dev_integration.go b/lncfg/dev_integration.go index 8ac85f5d9e..05ecdb27be 100644 --- a/lncfg/dev_integration.go +++ b/lncfg/dev_integration.go @@ -5,6 +5,7 @@ package lncfg import ( "time" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnwallet/chanfunding" ) @@ -27,6 +28,7 @@ type DevConfig struct { UnsafeDisconnect bool `long:"unsafedisconnect" description:"Allows the rpcserver to intentionally disconnect from peers with open channels."` MaxWaitNumBlocksFundingConf uint32 `long:"maxwaitnumblocksfundingconf" description:"Maximum blocks to wait for funding confirmation before discarding non-initiated channels."` UnsafeConnect bool `long:"unsafeconnect" description:"Allow the rpcserver to connect to a peer even if there's already a connection."` + ForceChannelCloseConfs uint32 `long:"force-channel-close-confs" description:"Force a specific number of confirmations for channel closes (dev/test only)"` } // ChannelReadyWait returns the config value `ProcessChannelReadyWait`. @@ -71,3 +73,12 @@ func (d *DevConfig) GetMaxWaitNumBlocksFundingConf() uint32 { func (d *DevConfig) GetUnsafeConnect() bool { return d.UnsafeConnect } + +// ChannelCloseConfs returns the forced confirmation count if set, or None if +// the default behavior should be used. +func (d *DevConfig) ChannelCloseConfs() fn.Option[uint32] { + if d.ForceChannelCloseConfs == 0 { + return fn.None[uint32]() + } + return fn.Some(d.ForceChannelCloseConfs) +} From be5d6c24942da3fbcd108cdcc4ce421abe90bae5 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:43:53 -0700 Subject: [PATCH 07/16] multi: add new ChannelCloseConfs param, thread thru as needed In this commit, we add a new param that'll allow us to scale up the number of confirmations before we act on a new close. We'll use this later to improve the current on chain handling logic. --- contractcourt/chain_arbitrator.go | 8 ++++++++ contractcourt/chain_watcher.go | 6 ++++++ peer/brontide.go | 6 ++++++ server.go | 8 +++++--- 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 05eb46a68b..b4b2fa26fd 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -230,6 +230,12 @@ type ChainArbitratorConfig struct { // AuxResolver is an optional interface that can be used to modify the // way contracts are resolved. AuxResolver fn.Option[lnwallet.AuxContractResolver] + + // ChannelCloseConfs is an optional override for the number of + // confirmations required for channel closes. When set, this overrides + // the normal capacity-based scaling. This is only available in + // dev/integration builds for testing purposes. + ChannelCloseConfs fn.Option[uint32] } // ChainArbitrator is a sub-system that oversees the on-chain resolution of all @@ -1138,6 +1144,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error extractStateNumHint: lnwallet.GetStateNumHint, auxLeafStore: c.cfg.AuxLeafStore, auxResolver: c.cfg.AuxResolver, + chanCloseConfs: c.cfg.ChannelCloseConfs, }, ) if err != nil { @@ -1315,6 +1322,7 @@ func (c *ChainArbitrator) loadOpenChannels() error { extractStateNumHint: lnwallet.GetStateNumHint, auxLeafStore: c.cfg.AuxLeafStore, auxResolver: c.cfg.AuxResolver, + chanCloseConfs: c.cfg.ChannelCloseConfs, }, ) if err != nil { diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index 9c566fd6b5..1600aa2d74 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -229,6 +229,12 @@ type chainWatcherConfig struct { // auxResolver is used to supplement contract resolution. auxResolver fn.Option[lnwallet.AuxContractResolver] + + // chanCloseConfs is an optional override for the number of + // confirmations required for channel closes. When set, this overrides + // the normal capacity-based scaling. This is only available in + // dev/integration builds for testing purposes. + chanCloseConfs fn.Option[uint32] } // chainWatcher is a system that's assigned to every active channel. The duty diff --git a/peer/brontide.go b/peer/brontide.go index 6fbba297d4..7d69aeefb9 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -370,6 +370,12 @@ type Config struct { // closure initiated by the remote peer. CoopCloseTargetConfs uint32 + // ChannelCloseConfs is an optional override for the number of + // confirmations required for channel closes. When set, this overrides + // the normal capacity-based scaling. This is only available in + // dev/integration builds for testing purposes. + ChannelCloseConfs fn.Option[uint32] + // ServerPubKey is the serialized, compressed public key of our lnd node. // It is used to determine which policy (channel edge) to pass to the // ChannelLink. diff --git a/server.go b/server.go index 0c2add901c..3bbae4f765 100644 --- a/server.go +++ b/server.go @@ -1361,9 +1361,10 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, return &pc.Incoming }, - AuxLeafStore: implCfg.AuxLeafStore, - AuxSigner: implCfg.AuxSigner, - AuxResolver: implCfg.AuxContractResolver, + AuxLeafStore: implCfg.AuxLeafStore, + AuxSigner: implCfg.AuxSigner, + AuxResolver: implCfg.AuxContractResolver, + ChannelCloseConfs: s.cfg.Dev.ChannelCloseConfs(), }, dbs.ChanStateDB) // Select the configuration and funding parameters for Bitcoin. @@ -4362,6 +4363,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, MaxOutgoingCltvExpiry: s.cfg.MaxOutgoingCltvExpiry, MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation, CoopCloseTargetConfs: s.cfg.CoopCloseTargetConfs, + ChannelCloseConfs: s.cfg.Dev.ChannelCloseConfs(), MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte( s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(), ChannelCommitInterval: s.cfg.ChannelCommitInterval, From 717614962adc4f493a828ed686bad5109cceac0a Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:44:44 -0700 Subject: [PATCH 08/16] peer: send out a notification after the 1st conf, then wait for the rest We wnt to add better handling, but not break any UIs or wallets. So we'll continue to send out a notification after a single confirmation, then send another after things are fully confirmed. --- peer/brontide.go | 46 +++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 7d69aeefb9..a1aa1a2c20 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -4442,27 +4442,35 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { // If this is a locally requested shutdown, update the caller with a // new event detailing the current pending state of this request. if closeReq != nil { + // TODO(roasbeef): don't actually need this? closeReq.Updates <- &PendingUpdate{ - Txid: closingTxid[:], + Txid: closingTxid[:], + IsLocalCloseTx: fn.Some(true), } } localOut := chanCloser.LocalCloseOutput() remoteOut := chanCloser.RemoteCloseOutput() auxOut := chanCloser.AuxOutputs() - // Determine the number of confirmations to wait before - // signaling a successful cooperative close, scaled by - // channel capacity (see CloseConfsForCapacity). - numConfs := lnwallet.CloseConfsForCapacity(chanCloser.Channel().Capacity) - - go WaitForChanToClose( - chanCloser.NegotiationHeight(), notifier, errChan, - &chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, numConfs, func() { - // Respond to the local subsystem which requested the - // channel closure. - if closeReq != nil { - closeReq.Updates <- &ChannelCloseUpdate{ - ClosingTxid: closingTxid[:], + + // Determine the number of confirmations to wait before signaling a + // successful cooperative close, scaled by channel capacity (see + // CloseConfsForCapacity). Check if we have a config override for + // testing purposes. + numConfs := p.cfg.ChannelCloseConfs.UnwrapOrFunc(func() uint32 { + // No override, use normal capacity-based scaling. + return lnwallet.CloseConfsForCapacity(chanCloser.Channel().Capacity) + }) + + // Register for full confirmation to send the final update. + go WaitForChanToClose( + chanCloser.NegotiationHeight(), notifier, errChan, + &chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, numConfs, func() { + // Respond to the local subsystem which requested the + // channel closure. + if closeReq != nil { + closeReq.Updates <- &ChannelCloseUpdate{ + ClosingTxid: closingTxid[:], Success: true, LocalCloseOutput: localOut, RemoteCloseOutput: remoteOut, @@ -4479,15 +4487,15 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { // finally the callback will be executed. If any error is encountered within // the function, then it will be sent over the errChan. func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier, - errChan chan error, chanPoint *wire.OutPoint, - closingTxID *chainhash.Hash, closeScript []byte, numConfs uint32, cb func()) { + errChan chan error, chanPoint *wire.OutPoint, + closingTxID *chainhash.Hash, closeScript []byte, numConfs uint32, cb func()) { peerLog.Infof("Waiting for confirmation of close of ChannelPoint(%v) "+ "with txid: %v", chanPoint, closingTxID) - confNtfn, err := notifier.RegisterConfirmationsNtfn( - closingTxID, closeScript, numConfs, bestHeight, - ) + confNtfn, err := notifier.RegisterConfirmationsNtfn( + closingTxID, closeScript, numConfs, bestHeight, + ) if err != nil { if errChan != nil { errChan <- err From f6f716ab75675c5c20333067695a8be21a689013 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:50:19 -0700 Subject: [PATCH 09/16] contractcourt: update close logic to handle re-orgs of depth n-1, where n is num confs In this commit, we update the close logic to handle re-ogs up to the final amount of confirmations. This is done generically, so we're able to handle events such as: coop close confirm, re-org, breach confirm, re-org, force close confirm, re-org, etc. The upcoming set of new tests will exercise all of these cases. We modify the block beat handling to unify the control flow. As it's possible we get the beat, then see the spend, or the oher way around. --- contractcourt/chain_watcher.go | 298 ++++++++++++++++++++++++++++----- 1 file changed, 258 insertions(+), 40 deletions(-) diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index 1600aa2d74..234fc7c38b 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -88,6 +88,38 @@ type BreachCloseInfo struct { CloseSummary channeldb.ChannelCloseSummary } +// spendConfirmationState represents the state of spend confirmation tracking +// in the closeObserver state machine. We wait for N confirmations before +// processing any spend to protect against shallow reorgs. +type spendConfirmationState uint8 + +const ( + // spendStateNone indicates no spend has been detected yet. + spendStateNone spendConfirmationState = iota + + // spendStatePending indicates a spend has been detected and we're + // waiting for the required number of confirmations. + spendStatePending + + // spendStateConfirmed indicates the spend has reached the required + // confirmations and has been processed. + spendStateConfirmed +) + +// String returns a human-readable representation of the state. +func (s spendConfirmationState) String() string { + switch s { + case spendStateNone: + return "None" + case spendStatePending: + return "Pending" + case spendStateConfirmed: + return "Confirmed" + default: + return "Unknown" + } +} + // CommitSet is a collection of the set of known valid commitments at a given // instant. If ConfCommitKey is set, then the commitment identified by the // HtlcSetKey has hit the chain. This struct will be used to examine all live @@ -652,51 +684,225 @@ func newChainSet(chanState *channeldb.OpenChannel) (*chainSet, error) { } // closeObserver is a dedicated goroutine that will watch for any closes of the -// channel that it's watching on chain. In the event of an on-chain event, the -// close observer will assembled the proper materials required to claim the -// funds of the channel on-chain (if required), then dispatch these as -// notifications to all subscribers. +// channel that it's watching on chain. It implements a state machine to handle +// spend detection and confirmation with reorg protection. The states are: +// +// - None (confNtfn == nil): No spend detected yet, waiting for spend +// notification +// +// - Pending (confNtfn != nil): Spend detected, waiting for N confirmations +// +// - Confirmed: Spend confirmed with N blocks, close has been processed func (c *chainWatcher) closeObserver() { defer c.wg.Done() - defer c.fundingSpendNtfn.Cancel() + + registerForSpend := func() (*chainntnfs.SpendEvent, error) { + fundingPkScript, err := deriveFundingPkScript(c.cfg.chanState) + if err != nil { + return nil, err + } + + heightHint := c.cfg.chanState.DeriveHeightHint() + + return c.cfg.notifier.RegisterSpendNtfn( + &c.cfg.chanState.FundingOutpoint, + fundingPkScript, + heightHint, + ) + } + + spendNtfn := c.fundingSpendNtfn + defer spendNtfn.Cancel() + + // We use these variables to implement a state machine to track the + // state of the spend confirmation process: + // * When confNtfn is nil, we're in state "None" waiting for a spend. + // * When confNtfn is set, we're in state "Pending" waiting for + // confirmations. + // + // After confirmations, we transition to state "Confirmed" and clean up. + var ( + pendingSpend *chainntnfs.SpendDetail + confNtfn *chainntnfs.ConfirmationEvent + ) log.Infof("Close observer for ChannelPoint(%v) active", c.cfg.chanState.FundingOutpoint) + // handleSpendDetection processes a newly detected spend by registering + // for confirmations. Returns the new confNtfn or error. + handleSpendDetection := func( + spend *chainntnfs.SpendDetail, + ) (*chainntnfs.ConfirmationEvent, error) { + + // If we already have a pending spend, check if it's the same + // transaction. This can happen if both the spend notification + // and blockbeat detect the same spend. + if pendingSpend != nil { + if *pendingSpend.SpenderTxHash == *spend.SpenderTxHash { + log.Debugf("ChannelPoint(%v): ignoring "+ + "duplicate spend detection for tx %v", + c.cfg.chanState.FundingOutpoint, + spend.SpenderTxHash) + return confNtfn, nil + } + + // Different spend detected. Cancel existing confNtfn + // and replace with new one. + log.Warnf("ChannelPoint(%v): detected different "+ + "spend tx %v, replacing pending tx %v", + c.cfg.chanState.FundingOutpoint, + spend.SpenderTxHash, + pendingSpend.SpenderTxHash) + + if confNtfn != nil { + confNtfn.Cancel() + } + } + + numConfs := c.requiredConfsForSpend() + txid := spend.SpenderTxHash + + newConfNtfn, err := c.cfg.notifier.RegisterConfirmationsNtfn( + txid, spend.SpendingTx.TxOut[0].PkScript, + numConfs, uint32(spend.SpendingHeight), + ) + if err != nil { + return nil, fmt.Errorf("register confirmations: %w", + err) + } + + log.Infof("ChannelPoint(%v): waiting for %d confirmations "+ + "of spend tx %v", c.cfg.chanState.FundingOutpoint, + numConfs, txid) + + return newConfNtfn, nil + } + for { + // We only listen to confirmation channels when we have a + // pending spend. By setting these to nil when not needed, Go's + // select ignores those cases, effectively implementing our + // state machine. + var ( + confChan <-chan *chainntnfs.TxConfirmation + negativeConfChan <-chan int32 + ) + if confNtfn != nil { + confChan = confNtfn.Confirmed + negativeConfChan = confNtfn.NegativeConf + } + select { - // A new block is received, we will check whether this block - // contains a spending tx that we are interested in. case beat := <-c.BlockbeatChan: log.Debugf("ChainWatcher(%v) received blockbeat %v", c.cfg.chanState.FundingOutpoint, beat.Height()) - // Process the block. - c.handleBlockbeat(beat) - - // If the funding outpoint is spent, we now go ahead and handle - // it. Note that we cannot rely solely on the `block` event - // above to trigger a close event, as deep down, the receiving - // of block notifications and the receiving of spending - // notifications are done in two different goroutines, so the - // expected order: [receive block -> receive spend] is not - // guaranteed . - case spend, ok := <-c.fundingSpendNtfn.Spend: - // If the channel was closed, then this means that the - // notifier exited, so we will as well. + spend := c.handleBlockbeat(beat) + if spend == nil { + continue + } + + // STATE TRANSITION: None -> Pending (from blockbeat). + log.Infof("ChannelPoint(%v): detected spend from "+ + "blockbeat, transitioning to %v", + c.cfg.chanState.FundingOutpoint, + spendStatePending) + + newConfNtfn, err := handleSpendDetection(spend) + if err != nil { + log.Errorf("Unable to handle spend "+ + "detection: %v", err) + return + } + pendingSpend = spend + confNtfn = newConfNtfn + + // STATE TRANSITION: None -> Pending. + // We've detected a spend, but don't process it yet. Instead, + // register for confirmations to protect against shallow reorgs. + case spend, ok := <-spendNtfn.Spend: + if !ok { + return + } + + log.Infof("ChannelPoint(%v): detected spend from "+ + "notification, transitioning to %v", + c.cfg.chanState.FundingOutpoint, + spendStatePending) + + newConfNtfn, err := handleSpendDetection(spend) + if err != nil { + log.Errorf("Unable to handle spend "+ + "detection: %v", err) + return + } + pendingSpend = spend + confNtfn = newConfNtfn + + // STATE TRANSITION: Pending -> Confirmed + // The spend has reached required confirmations. It's now safe + // to process since we've protected against shallow reorgs. + case conf, ok := <-confChan: + if !ok { + log.Errorf("Confirmation channel closed " + + "unexpectedly") + return + } + + log.Infof("ChannelPoint(%v): spend confirmed at "+ + "height %d, transitioning to %v", + c.cfg.chanState.FundingOutpoint, + conf.BlockHeight, spendStateConfirmed) + + err := c.handleCommitSpend(pendingSpend) + if err != nil { + log.Errorf("Failed to handle confirmed "+ + "spend: %v", err) + } + + confNtfn.Cancel() + confNtfn = nil + pendingSpend = nil + + // STATE TRANSITION: Pending -> None + // A reorg removed the spend tx. We reset to initial state and + // wait for ANY new spend (could be the same tx re-mined, or a + // different tx like an RBF replacement). + case reorgDepth, ok := <-negativeConfChan: if !ok { + log.Errorf("Negative conf channel closed " + + "unexpectedly") return } - err := c.handleCommitSpend(spend) + log.Infof("ChannelPoint(%v): spend reorged out at "+ + "depth %d, transitioning back to %v", + c.cfg.chanState.FundingOutpoint, reorgDepth, + spendStateNone) + + confNtfn.Cancel() + confNtfn = nil + pendingSpend = nil + + spendNtfn.Cancel() + var err error + spendNtfn, err = registerForSpend() if err != nil { - log.Errorf("Failed to handle commit spend: %v", - err) + log.Errorf("Unable to re-register for "+ + "spend: %v", err) + return } + log.Infof("ChannelPoint(%v): re-registered for spend "+ + "detection", c.cfg.chanState.FundingOutpoint) + // The chainWatcher has been signalled to exit, so we'll do so // now. case <-c.quit: + if confNtfn != nil { + confNtfn.Cancel() + } return } } @@ -992,6 +1198,18 @@ func (c *chainWatcher) toSelfAmount(tx *wire.MsgTx) btcutil.Amount { return btcutil.Amount(fn.Sum(vals)) } +// requiredConfsForSpend determines the number of confirmations required before +// processing a spend of the funding output. Uses config override if set +// (typically for testing), otherwise scales with channel capacity to balance +// security vs user experience for channels of different sizes. +func (c *chainWatcher) requiredConfsForSpend() uint32 { + return c.cfg.chanCloseConfs.UnwrapOrFunc(func() uint32 { + return lnwallet.CloseConfsForCapacity( + c.cfg.chanState.Capacity, + ) + }) +} + // dispatchCooperativeClose processed a detect cooperative channel closure. // We'll use the spending transaction to locate our output within the // transaction, then clean up the database state. We'll also dispatch a @@ -1009,8 +1227,8 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet localAmt := c.toSelfAmount(broadcastTx) // Once this is known, we'll mark the state as fully closed in the - // database. We can do this as a cooperatively closed channel has all - // its outputs resolved after only one confirmation. + // database. For cooperative closes, we wait for a confirmation depth + // determined by channel capacity before dispatching this event. closeSummary := &channeldb.ChannelCloseSummary{ ChanPoint: c.cfg.chanState.FundingOutpoint, ChainHash: c.cfg.chanState.ChainHash, @@ -1419,9 +1637,10 @@ func (c *chainWatcher) handleCommitSpend( case wire.MaxTxInSequenceNum: fallthrough case mempool.MaxRBFSequence: - // TODO(roasbeef): rare but possible, need itest case for - err := c.dispatchCooperativeClose(commitSpend) - if err != nil { + // This is a cooperative close. Dispatch it directly - the + // confirmation waiting and reorg handling is done in the + // closeObserver state machine before we reach this point. + if err := c.dispatchCooperativeClose(commitSpend); err != nil { return fmt.Errorf("handle coop close: %w", err) } @@ -1526,9 +1745,9 @@ func (c *chainWatcher) chanPointConfirmed() bool { } // handleBlockbeat takes a blockbeat and queries for a spending tx for the -// funding output. If the spending tx is found, it will be handled based on the -// closure type. -func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) { +// funding output. If found, it returns the spend details so closeObserver can +// process it. Returns nil if no spend was detected. +func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) *chainntnfs.SpendDetail { // Notify the chain watcher has processed the block. defer c.NotifyBlockProcessed(beat, nil) @@ -1540,24 +1759,23 @@ func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) { // If the funding output hasn't confirmed in this block, we // will check it again in the next block. if !c.chanPointConfirmed() { - return + return nil } } // Perform a non-blocking read to check whether the funding output was - // spent. + // spent. The actual spend handling is done in closeObserver's state + // machine to avoid blocking the block processing pipeline. spend := c.checkFundingSpend() if spend == nil { log.Tracef("No spend found for ChannelPoint(%v) in block %v", c.cfg.chanState.FundingOutpoint, beat.Height()) - return + return nil } - // The funding output was spent, we now handle it by sending a close - // event to the channel arbitrator. - err := c.handleCommitSpend(spend) - if err != nil { - log.Errorf("Failed to handle commit spend: %v", err) - } + log.Debugf("Detected spend of ChannelPoint(%v) in block %v", + c.cfg.chanState.FundingOutpoint, beat.Height()) + + return spend } From dbc896c72b446fdeba2af865d147fe7d1782ac4b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:50:39 -0700 Subject: [PATCH 10/16] lntest: add new wait for conf helper method to ChainNotifier --- lntest/harness_assertion.go | 5 +++-- lntest/mock/chainnotifier.go | 40 +++++++++++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/lntest/harness_assertion.go b/lntest/harness_assertion.go index 544576bef9..958278d3cc 100644 --- a/lntest/harness_assertion.go +++ b/lntest/harness_assertion.go @@ -18,6 +18,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" @@ -552,8 +553,8 @@ func (h HarnessTest) WaitForChannelCloseEvent( require.NoError(h, err) resp, ok := event.Update.(*lnrpc.CloseStatusUpdate_ChanClose) - require.Truef(h, ok, "expected channel close update, instead got %v", - event.Update) + require.Truef(h, ok, "expected channel close update, instead got %T: %v", + event.Update, spew.Sdump(event.Update)) txid, err := chainhash.NewHash(resp.ChanClose.ClosingTxid) require.NoErrorf(h, err, "wrong format found in closing txid: %v", diff --git a/lntest/mock/chainnotifier.go b/lntest/mock/chainnotifier.go index ddce8defa2..09464ba85c 100644 --- a/lntest/mock/chainnotifier.go +++ b/lntest/mock/chainnotifier.go @@ -1,6 +1,9 @@ package mock import ( + "testing" + "time" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/chainntnfs" @@ -8,9 +11,10 @@ import ( // ChainNotifier is a mock implementation of the ChainNotifier interface. type ChainNotifier struct { - SpendChan chan *chainntnfs.SpendDetail - EpochChan chan *chainntnfs.BlockEpoch - ConfChan chan *chainntnfs.TxConfirmation + SpendChan chan *chainntnfs.SpendDetail + EpochChan chan *chainntnfs.BlockEpoch + ConfChan chan *chainntnfs.TxConfirmation + ConfRegistered chan struct{} } // RegisterConfirmationsNtfn returns a ConfirmationEvent that contains a channel @@ -19,6 +23,14 @@ func (c *ChainNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte, numConfs, heightHint uint32, opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) { + // Signal that a confirmation registration occurred. + if c.ConfRegistered != nil { + select { + case c.ConfRegistered <- struct{}{}: + default: + } + } + return &chainntnfs.ConfirmationEvent{ Confirmed: c.ConfChan, Cancel: func() {}, @@ -61,3 +73,25 @@ func (c *ChainNotifier) Started() bool { func (c *ChainNotifier) Stop() error { return nil } + +// WaitForConfRegistrationAndSend waits for a confirmation registration to occur +// and then sends a confirmation notification. This is a helper function for tests +// that need to ensure the chain watcher has registered for confirmations before +// sending the confirmation. +func (c *ChainNotifier) WaitForConfRegistrationAndSend(t *testing.T) { + t.Helper() + + // Wait for the chain watcher to register for confirmations. + select { + case <-c.ConfRegistered: + case <-time.After(time.Second * 2): + t.Fatalf("timeout waiting for conf registration") + } + + // Send the confirmation to satisfy the confirmation requirement. + select { + case c.ConfChan <- &chainntnfs.TxConfirmation{}: + case <-time.After(time.Second * 1): + t.Fatalf("unable to send confirmation") + } +} From 4e2d08f8c69ffee42ba8bc16f32997a6dbe0335e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:51:00 -0700 Subject: [PATCH 11/16] contractcourt: add new chainWatcherTestHarness We'll use this for all the upcoming tests. --- contractcourt/chain_watcher_test_harness.go | 693 ++++++++++++++++++++ 1 file changed, 693 insertions(+) create mode 100644 contractcourt/chain_watcher_test_harness.go diff --git a/contractcourt/chain_watcher_test_harness.go b/contractcourt/chain_watcher_test_harness.go new file mode 100644 index 0000000000..0b7c0c9741 --- /dev/null +++ b/contractcourt/chain_watcher_test_harness.go @@ -0,0 +1,693 @@ +package contractcourt + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chainio" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/fn/v2" + lnmock "github.com/lightningnetwork/lnd/lntest/mock" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/mock" +) + +// testReporter is a minimal interface for test reporting that is satisfied by +// both *testing.T and *rapid.T, allowing the harness to work with property-based +// tests. +type testReporter interface { + Helper() + Fatalf(format string, args ...any) +} + +// chainWatcherTestHarness provides a test harness for chain watcher tests +// with utilities for simulating spends, confirmations, and reorganizations. +type chainWatcherTestHarness struct { + t testReporter + + // aliceChannel and bobChannel are the test channels. + aliceChannel *lnwallet.LightningChannel + bobChannel *lnwallet.LightningChannel + + // chainWatcher is the chain watcher under test. + chainWatcher *chainWatcher + + // notifier is the mock chain notifier. + notifier *mockChainNotifier + + // chanEvents is the channel event subscription. + chanEvents *ChainEventSubscription + + // currentHeight tracks the current block height. + currentHeight int32 + + // blockbeatProcessed is a channel that signals when a blockbeat has been processed. + blockbeatProcessed chan struct{} +} + +// mockChainNotifier extends the standard mock with additional channels for +// testing cooperative close reorgs. +type mockChainNotifier struct { + *lnmock.ChainNotifier + + // confEvents tracks active confirmation event subscriptions. + confEvents []*mockConfirmationEvent + + // confRegistered is a channel that signals when a new confirmation + // event has been registered. + confRegistered chan struct{} + + // spendEvents tracks active spend event subscriptions. + spendEvents []*chainntnfs.SpendEvent + + // spendRegistered is a channel that signals when a new spend + // event has been registered. + spendRegistered chan struct{} +} + +// mockConfirmationEvent represents a mock confirmation event subscription. +type mockConfirmationEvent struct { + txid chainhash.Hash + numConfs uint32 + confirmedChan chan *chainntnfs.TxConfirmation + negConfChan chan int32 + cancelled bool +} + +// RegisterSpendNtfn creates a new mock spend event. +func (m *mockChainNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, + pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { + + // The base mock already has SpendChan, use that. + spendEvent := &chainntnfs.SpendEvent{ + Spend: m.SpendChan, + Cancel: func() { + // No-op for now. + }, + } + + m.spendEvents = append(m.spendEvents, spendEvent) + + // Signal that a new spend event has been registered. + select { + case m.spendRegistered <- struct{}{}: + default: + } + + return spendEvent, nil +} + +// RegisterConfirmationsNtfn creates a new mock confirmation event. +func (m *mockChainNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, + pkScript []byte, numConfs, heightHint uint32, + opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) { + + mockEvent := &mockConfirmationEvent{ + txid: *txid, + numConfs: numConfs, + confirmedChan: make(chan *chainntnfs.TxConfirmation, 1), + negConfChan: make(chan int32, 1), + } + + m.confEvents = append(m.confEvents, mockEvent) + + // Signal that a new confirmation event has been registered. + select { + case m.confRegistered <- struct{}{}: + default: + } + + return &chainntnfs.ConfirmationEvent{ + Confirmed: mockEvent.confirmedChan, + NegativeConf: mockEvent.negConfChan, + Cancel: func() { + mockEvent.cancelled = true + }, + }, nil +} + +// harnessOpt is a functional option for configuring the test harness. +type harnessOpt func(*harnessConfig) + +// harnessConfig holds configuration for the test harness. +type harnessConfig struct { + requiredConfs fn.Option[uint32] +} + +// withRequiredConfs sets the number of confirmations required for channel closes. +func withRequiredConfs(confs uint32) harnessOpt { + return func(cfg *harnessConfig) { + cfg.requiredConfs = fn.Some(confs) + } +} + +// newChainWatcherTestHarness creates a new test harness for chain watcher tests. +func newChainWatcherTestHarness(t *testing.T, opts ...harnessOpt) *chainWatcherTestHarness { + return newChainWatcherTestHarnessFromReporter(t, t, opts...) +} + +// newChainWatcherTestHarnessFromReporter creates a test harness that works with +// both *testing.T and *rapid.T. The testingT parameter is used for operations +// that specifically require *testing.T (like CreateTestChannels), while reporter +// is used for all test reporting (Helper, Fatalf). +func newChainWatcherTestHarnessFromReporter(testingT *testing.T, + reporter testReporter, opts ...harnessOpt) *chainWatcherTestHarness { + + reporter.Helper() + + // Apply options. + cfg := &harnessConfig{ + requiredConfs: fn.None[uint32](), + } + for _, opt := range opts { + opt(cfg) + } + + // Create test channels. + aliceChannel, bobChannel, err := lnwallet.CreateTestChannels( + testingT, channeldb.SingleFunderTweaklessBit, + ) + if err != nil { + reporter.Fatalf("unable to create test channels: %v", err) + } + + // Create mock notifier. + baseNotifier := &lnmock.ChainNotifier{ + SpendChan: make(chan *chainntnfs.SpendDetail, 1), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation, 1), + } + + notifier := &mockChainNotifier{ + ChainNotifier: baseNotifier, + confEvents: make([]*mockConfirmationEvent, 0), + confRegistered: make(chan struct{}, 10), + spendEvents: make([]*chainntnfs.SpendEvent, 0), + spendRegistered: make(chan struct{}, 10), + } + + // Create chain watcher. + chainWatcher, err := newChainWatcher(chainWatcherConfig{ + chanState: aliceChannel.State(), + notifier: notifier, + signer: aliceChannel.Signer, + extractStateNumHint: lnwallet.GetStateNumHint, + chanCloseConfs: cfg.requiredConfs, + contractBreach: func(retInfo *lnwallet.BreachRetribution) error { + // In tests, we just need to accept the breach notification. + // The actual breach handling is tested elsewhere. + return nil + }, + }) + if err != nil { + reporter.Fatalf("unable to create chain watcher: %v", err) + } + + // Start chain watcher (this will register for spend notification). + err = chainWatcher.Start() + if err != nil { + reporter.Fatalf("unable to start chain watcher: %v", err) + } + + // Subscribe to channel events. + chanEvents := chainWatcher.SubscribeChannelEvents() + + harness := &chainWatcherTestHarness{ + t: reporter, + aliceChannel: aliceChannel, + bobChannel: bobChannel, + chainWatcher: chainWatcher, + notifier: notifier, + chanEvents: chanEvents, + currentHeight: 100, + blockbeatProcessed: make(chan struct{}), + } + + // Wait for the initial spend registration that happens in Start(). + harness.waitForSpendRegistration() + + // Verify BlockbeatChan is initialized. + if chainWatcher.BlockbeatChan == nil { + reporter.Fatalf("BlockbeatChan is nil after initialization") + } + + // Register cleanup. We use the testingT for Cleanup since rapid.T + // may not have this method in the same way. + testingT.Cleanup(func() { + chainWatcher.Stop() + }) + + return harness +} + +// createCoopCloseTx creates a cooperative close transaction with the given +// output value. The transaction will have the proper sequence number to +// indicate it's a cooperative close. +func (h *chainWatcherTestHarness) createCoopCloseTx(outputValue int64) *wire.MsgTx { + return &wire.MsgTx{ + TxIn: []*wire.TxIn{{ + PreviousOutPoint: h.aliceChannel.State().FundingOutpoint, + Sequence: wire.MaxTxInSequenceNum, + }}, + TxOut: []*wire.TxOut{{ + Value: outputValue, + PkScript: []byte{byte(outputValue % 255)}, // Unique script. + }}, + } +} + +// createRemoteUnilateralCloseTx creates a remote unilateral close transaction. +// From Alice's perspective, this is Bob's local commitment transaction. +func (h *chainWatcherTestHarness) createRemoteUnilateralCloseTx() *wire.MsgTx { + return h.bobChannel.State().LocalCommitment.CommitTx +} + +// createLocalForceCloseTx creates a local force close transaction. +// This is Alice's local commitment transaction. +func (h *chainWatcherTestHarness) createLocalForceCloseTx() *wire.MsgTx { + return h.aliceChannel.State().LocalCommitment.CommitTx +} + +// createBreachCloseTx creates a breach (revoked commitment) transaction. +// We advance the channel state, save the commitment, then advance again +// to revoke it. Returns the revoked commitment tx. +func (h *chainWatcherTestHarness) createBreachCloseTx() *wire.MsgTx { + h.t.Helper() + + // To create a revoked commitment, we need to advance the channel state + // at least once. We'll use the test utils helper to add an HTLC and + // force a state transition. + + // Get the current commitment before we advance (this will be revoked). + revokedCommit := h.bobChannel.State().LocalCommitment.CommitTx + + // Add a fake HTLC to advance state. + htlcAmount := lnwire.NewMSatFromSatoshis(10000) + paymentHash := [32]byte{4, 5, 6} + htlc := &lnwire.UpdateAddHTLC{ + ID: 0, + Amount: htlcAmount, + Expiry: uint32(h.currentHeight + 100), + PaymentHash: paymentHash, + } + + // Add HTLC to both channels. + if _, err := h.aliceChannel.AddHTLC(htlc, nil); err != nil { + h.t.Fatalf("unable to add HTLC to alice: %v", err) + } + if _, err := h.bobChannel.ReceiveHTLC(htlc); err != nil { + h.t.Fatalf("unable to add HTLC to bob: %v", err) + } + + // Force state transition using the helper. + if err := lnwallet.ForceStateTransition(h.aliceChannel, h.bobChannel); err != nil { + h.t.Fatalf("unable to force state transition: %v", err) + } + + // Return the revoked commitment (Bob's previous local commitment). + return revokedCommit +} + +// sendSpend sends a spend notification for the given transaction. +func (h *chainWatcherTestHarness) sendSpend(tx *wire.MsgTx) { + h.t.Helper() + + txHash := tx.TxHash() + spend := &chainntnfs.SpendDetail{ + SpenderTxHash: &txHash, + SpendingTx: tx, + SpendingHeight: h.currentHeight, + } + + select { + case h.notifier.SpendChan <- spend: + case <-time.After(time.Second): + h.t.Fatalf("unable to send spend notification") + } +} + +// sendBlockBeat sends a blockbeat to the chain watcher. +// Note: This is not used for cooperative close tests since the chain watcher +// blocks synchronously waiting for confirmations and can't process blockbeats. +func (h *chainWatcherTestHarness) sendBlockBeat() { + h.t.Helper() + + // Create mock blockbeat exactly as the other tests do. + mockBeat := &chainio.MockBlockbeat{} + + // Mock the logger. We don't care how many times it's called as it's + // not critical. + mockBeat.On("logger").Return(log) + + // Mock a fake block height - this is called based on the debuglevel. + mockBeat.On("Height").Return(h.currentHeight).Maybe() + + // Create a channel to signal when blockbeat is processed. + processed := make(chan struct{}) + + // Mock `NotifyBlockProcessed` to signal when done. + mockBeat.On("NotifyBlockProcessed", + nil, h.chainWatcher.quit).Return().Run(func(args mock.Arguments) { + close(processed) + }).Once() + + // Send the blockbeat. + select { + case h.chainWatcher.BlockbeatChan <- mockBeat: + case <-time.After(5 * time.Second): + h.t.Fatalf("unable to send blockbeat") + } + + // Wait for the blockbeat to be processed. + select { + case <-processed: + // Blockbeat processed. + case <-time.After(5 * time.Second): + h.t.Fatalf("blockbeat not processed") + } +} + +// confirmTx sends a confirmation notification for the given transaction. +func (h *chainWatcherTestHarness) confirmTx(tx *wire.MsgTx, height int32) { + h.t.Helper() + + // Find the confirmation event for this transaction. + txHash := tx.TxHash() + var confEvent *mockConfirmationEvent + for _, event := range h.notifier.confEvents { + if event.txid == txHash && !event.cancelled { + confEvent = event + break + } + } + + if confEvent == nil { + // The chain watcher might not have registered for confirmations yet. + // This is not necessarily an error in some test scenarios. + return + } + + // Send confirmation. + select { + case confEvent.confirmedChan <- &chainntnfs.TxConfirmation{ + Tx: tx, + BlockHeight: uint32(height), + }: + case <-time.After(time.Second): + h.t.Fatalf("unable to send confirmation") + } +} + +// triggerReorg sends a negative confirmation (reorg) notification for the +// given transaction with the specified reorg depth. +func (h *chainWatcherTestHarness) triggerReorg(tx *wire.MsgTx, reorgDepth int32) { + h.t.Helper() + + // Find the confirmation event for this transaction. + txHash := tx.TxHash() + var confEvent *mockConfirmationEvent + for _, event := range h.notifier.confEvents { + if event.txid == txHash && !event.cancelled { + confEvent = event + break + } + } + + if confEvent == nil { + // The chain watcher might not have registered for confirmations yet. + // This is not necessarily an error in some test scenarios. + return + } + + // Send negative confirmation. + select { + case confEvent.negConfChan <- reorgDepth: + case <-time.After(time.Second): + h.t.Fatalf("unable to send negative confirmation") + } +} + +// mineBlocks advances the current block height. +func (h *chainWatcherTestHarness) mineBlocks(n int32) { + h.currentHeight += n +} + +// waitForCoopClose waits for a cooperative close event and returns it. +func (h *chainWatcherTestHarness) waitForCoopClose(timeout time.Duration) *CooperativeCloseInfo { + h.t.Helper() + + select { + case coopClose := <-h.chanEvents.CooperativeClosure: + return coopClose + case <-time.After(timeout): + h.t.Fatalf("didn't receive cooperative close event") + return nil + } +} + +// waitForConfRegistration waits for the chain watcher to register for +// confirmation notifications. +func (h *chainWatcherTestHarness) waitForConfRegistration() { + h.t.Helper() + + select { + case <-h.notifier.confRegistered: + // Registration complete. + case <-time.After(2 * time.Second): + // Not necessarily a failure - some tests don't register. + } +} + +// waitForSpendRegistration waits for the chain watcher to register for +// spend notifications. +func (h *chainWatcherTestHarness) waitForSpendRegistration() { + h.t.Helper() + + select { + case <-h.notifier.spendRegistered: + // Registration complete. + case <-time.After(2 * time.Second): + // Not necessarily a failure - some tests don't register. + } +} + +// assertCoopCloseTx asserts that the given cooperative close info matches +// the expected transaction. +func (h *chainWatcherTestHarness) assertCoopCloseTx( + closeInfo *CooperativeCloseInfo, expectedTx *wire.MsgTx) { + + h.t.Helper() + + expectedHash := expectedTx.TxHash() + if closeInfo.ClosingTXID != expectedHash { + h.t.Fatalf("wrong tx confirmed: expected %v, got %v", + expectedHash, closeInfo.ClosingTXID) + } +} + +// assertNoCoopClose asserts that no cooperative close event is received +// within the given timeout. +func (h *chainWatcherTestHarness) assertNoCoopClose(timeout time.Duration) { + h.t.Helper() + + select { + case <-h.chanEvents.CooperativeClosure: + h.t.Fatalf("unexpected cooperative close event") + case <-time.After(timeout): + // Expected timeout. + } +} + +// runCoopCloseFlow runs a complete cooperative close flow including spend, +// optional reorg, and confirmation. This helper coordinates the timing +// between the different events. +func (h *chainWatcherTestHarness) runCoopCloseFlow( + tx *wire.MsgTx, shouldReorg bool, reorgDepth int32, + altTx *wire.MsgTx) *CooperativeCloseInfo { + + h.t.Helper() + + // Send initial spend notification. + // This will trigger handleCommitSpend which will detect the coop close + // and call waitForCoopCloseConfirmation (which blocks). + h.sendSpend(tx) + + // Wait for the chain watcher to register for confirmations. + // This happens inside waitForCoopCloseConfirmation. + h.waitForConfRegistration() + + if shouldReorg { + // Trigger reorg to unblock waitForCoopCloseConfirmation. + h.triggerReorg(tx, reorgDepth) + + // If we have an alternative transaction, send it. + if altTx != nil { + // After reorg, the chain watcher should re-register for + // ANY spend of the funding output. + h.waitForSpendRegistration() + + // Send alternative spend. + h.sendSpend(altTx) + + // Wait for it to register for confirmations. + h.waitForConfRegistration() + + // Confirm alternative transaction to unblock. + h.mineBlocks(1) + h.confirmTx(altTx, h.currentHeight) + } + } else { + // Normal confirmation flow - confirm to unblock waitForCoopCloseConfirmation. + h.mineBlocks(1) + h.confirmTx(tx, h.currentHeight) + } + + // Wait for cooperative close event. + return h.waitForCoopClose(5 * time.Second) +} + +// runMultipleReorgFlow simulates multiple consecutive reorganizations with +// different transactions confirming after each reorg. +func (h *chainWatcherTestHarness) runMultipleReorgFlow(txs []*wire.MsgTx, + reorgDepths []int32) *CooperativeCloseInfo { + + h.t.Helper() + + if len(txs) < 2 { + h.t.Fatalf("need at least 2 transactions for reorg flow") + } + if len(reorgDepths) != len(txs)-1 { + h.t.Fatalf("reorg depths must be one less than transactions") + } + + // Send initial spend. + h.sendSpend(txs[0]) + + // Process each reorg. + for i, depth := range reorgDepths { + // Wait for confirmation registration. + h.waitForConfRegistration() + + // Trigger reorg for current transaction. + h.triggerReorg(txs[i], depth) + + // Wait for re-registration for spend. + h.waitForSpendRegistration() + + // Send next transaction. + h.sendSpend(txs[i+1]) + } + + // Wait for final confirmation registration. + h.waitForConfRegistration() + + // Confirm the final transaction. + finalTx := txs[len(txs)-1] + h.mineBlocks(1) + h.confirmTx(finalTx, h.currentHeight) + + // Wait for cooperative close event. + return h.waitForCoopClose(10 * time.Second) +} + +// waitForRemoteUnilateralClose waits for a remote unilateral close event. +func (h *chainWatcherTestHarness) waitForRemoteUnilateralClose( + timeout time.Duration) *RemoteUnilateralCloseInfo { + + h.t.Helper() + + select { + case remoteClose := <-h.chanEvents.RemoteUnilateralClosure: + return remoteClose + case <-time.After(timeout): + h.t.Fatalf("didn't receive remote unilateral close event") + return nil + } +} + +// waitForLocalUnilateralClose waits for a local unilateral close event. +func (h *chainWatcherTestHarness) waitForLocalUnilateralClose( + timeout time.Duration) *LocalUnilateralCloseInfo { + + h.t.Helper() + + select { + case localClose := <-h.chanEvents.LocalUnilateralClosure: + return localClose + case <-time.After(timeout): + h.t.Fatalf("didn't receive local unilateral close event") + return nil + } +} + +// waitForBreach waits for a breach (contract breach) event. +func (h *chainWatcherTestHarness) waitForBreach( + timeout time.Duration) *BreachCloseInfo { + + h.t.Helper() + + select { + case breach := <-h.chanEvents.ContractBreach: + return breach + case <-time.After(timeout): + h.t.Fatalf("didn't receive contract breach event") + return nil + } +} + +// assertRemoteUnilateralCloseTx asserts that the given remote unilateral close +// info matches the expected transaction. +func (h *chainWatcherTestHarness) assertRemoteUnilateralCloseTx( + closeInfo *RemoteUnilateralCloseInfo, expectedTx *wire.MsgTx) { + + h.t.Helper() + + expectedHash := expectedTx.TxHash() + actualHash := closeInfo.UnilateralCloseSummary.SpendDetail.SpenderTxHash + if *actualHash != expectedHash { + h.t.Fatalf("wrong tx confirmed: expected %v, got %v", + expectedHash, *actualHash) + } +} + +// assertLocalUnilateralCloseTx asserts that the given local unilateral close +// info matches the expected transaction. +func (h *chainWatcherTestHarness) assertLocalUnilateralCloseTx( + closeInfo *LocalUnilateralCloseInfo, expectedTx *wire.MsgTx) { + + h.t.Helper() + + expectedHash := expectedTx.TxHash() + actualHash := closeInfo.LocalForceCloseSummary.CloseTx.TxHash() + if actualHash != expectedHash { + h.t.Fatalf("wrong tx confirmed: expected %v, got %v", + expectedHash, actualHash) + } +} + +// assertBreachTx asserts that the given breach info matches the expected +// transaction. +func (h *chainWatcherTestHarness) assertBreachTx( + breachInfo *BreachCloseInfo, expectedTx *wire.MsgTx) { + + h.t.Helper() + + expectedHash := expectedTx.TxHash() + if breachInfo.CommitHash != expectedHash { + h.t.Fatalf("wrong tx confirmed: expected %v, got %v", + expectedHash, breachInfo.CommitHash) + } +} + +// createChannelCapacity returns a channel capacity suitable for testing +// scaled confirmations. +func createChannelCapacity(scale float64) btcutil.Amount { + // Use the maximum channel size as base. + maxSize := btcutil.Amount(16777215) // From lnwallet constants. + return btcutil.Amount(float64(maxSize) * scale) +} \ No newline at end of file From a706c99718a1548c1d06a34a009d04f20298640c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:51:30 -0700 Subject: [PATCH 12/16] contractcourt: update existing chain watcher tests due to new logic All the tests need to send a confirmation _after_ the spend is detected now. --- contractcourt/chain_watcher_test.go | 49 ++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index 2dc3605d39..c57859ca42 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -12,6 +12,7 @@ import ( "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/input" lnmock "github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lnwallet" @@ -34,16 +35,19 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) { // With the channels created, we'll now create a chain watcher instance // which will be watching for any closes of Alice's channel. + confRegistered := make(chan struct{}, 1) aliceNotifier := &lnmock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail, 1), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), + SpendChan: make(chan *chainntnfs.SpendDetail, 1), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation, 1), + ConfRegistered: confRegistered, } aliceChainWatcher, err := newChainWatcher(chainWatcherConfig{ chanState: aliceChannel.State(), notifier: aliceNotifier, signer: aliceChannel.Signer, extractStateNumHint: lnwallet.GetStateNumHint, + chanCloseConfs: fn.Some(uint32(1)), }) require.NoError(t, err, "unable to create chain watcher") err = aliceChainWatcher.Start() @@ -90,6 +94,11 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) { t.Fatalf("unable to send blockbeat") } + // Wait for the chain watcher to register for confirmations and send + // the confirmation. Since we set chanCloseConfs to 1, one confirmation + // is sufficient. + aliceNotifier.WaitForConfRegistrationAndSend(t) + // We should get a new spend event over the remote unilateral close // event channel. var uniClose *RemoteUnilateralCloseInfo @@ -144,16 +153,19 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) { // With the channels created, we'll now create a chain watcher instance // which will be watching for any closes of Alice's channel. + confRegistered := make(chan struct{}, 1) aliceNotifier := &lnmock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), + SpendChan: make(chan *chainntnfs.SpendDetail), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation), + ConfRegistered: confRegistered, } aliceChainWatcher, err := newChainWatcher(chainWatcherConfig{ chanState: aliceChannel.State(), notifier: aliceNotifier, signer: aliceChannel.Signer, extractStateNumHint: lnwallet.GetStateNumHint, + chanCloseConfs: fn.Some(uint32(1)), }) require.NoError(t, err, "unable to create chain watcher") if err := aliceChainWatcher.Start(); err != nil { @@ -219,6 +231,11 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) { t.Fatalf("unable to send blockbeat") } + // Wait for the chain watcher to register for confirmations and send + // the confirmation. Since we set chanCloseConfs to 1, one confirmation + // is sufficient. + aliceNotifier.WaitForConfRegistrationAndSend(t) + // We should get a new spend event over the remote unilateral close // event channel. var uniClose *RemoteUnilateralCloseInfo @@ -331,10 +348,12 @@ func TestChainWatcherDataLossProtect(t *testing.T) { // With the channels created, we'll now create a chain watcher // instance which will be watching for any closes of Alice's // channel. + confRegistered := make(chan struct{}, 1) aliceNotifier := &lnmock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), + SpendChan: make(chan *chainntnfs.SpendDetail), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation), + ConfRegistered: confRegistered, } aliceChainWatcher, err := newChainWatcher(chainWatcherConfig{ chanState: aliceChanState, @@ -407,6 +426,8 @@ func TestChainWatcherDataLossProtect(t *testing.T) { t.Fatalf("unable to send blockbeat") } + aliceNotifier.WaitForConfRegistrationAndSend(t) + // We should get a new uni close resolution that indicates we // processed the DLP scenario. var uniClose *RemoteUnilateralCloseInfo @@ -532,10 +553,12 @@ func TestChainWatcherLocalForceCloseDetect(t *testing.T) { // With the channels created, we'll now create a chain watcher // instance which will be watching for any closes of Alice's // channel. + confRegistered := make(chan struct{}, 1) aliceNotifier := &lnmock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), + SpendChan: make(chan *chainntnfs.SpendDetail), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation), + ConfRegistered: confRegistered, } aliceChainWatcher, err := newChainWatcher(chainWatcherConfig{ chanState: aliceChanState, @@ -604,6 +627,8 @@ func TestChainWatcherLocalForceCloseDetect(t *testing.T) { t.Fatalf("unable to send blockbeat") } + aliceNotifier.WaitForConfRegistrationAndSend(t) + // We should get a local force close event from Alice as she // should be able to detect the close based on the commitment // outputs. From be476bf5f545dbf3acbacd72382e0d1d80dcf505 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:52:18 -0700 Subject: [PATCH 13/16] contractcourt: add unit tests for rbf re-org cases This set of new tests ensures that if have created N RBF variants of the coop close transaction, that any of then can confirm, and be re-org'd, with us detecting the final spend once it confirms deeploy enough. --- .../chain_watcher_coop_reorg_test.go | 325 ++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 contractcourt/chain_watcher_coop_reorg_test.go diff --git a/contractcourt/chain_watcher_coop_reorg_test.go b/contractcourt/chain_watcher_coop_reorg_test.go new file mode 100644 index 0000000000..9fd9ebc0d8 --- /dev/null +++ b/contractcourt/chain_watcher_coop_reorg_test.go @@ -0,0 +1,325 @@ +package contractcourt + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/wire" +) + +// TestChainWatcherCoopCloseReorg tests that the chain watcher properly handles +// a reorganization during cooperative close confirmation waiting. When a +// cooperative close transaction is reorganized out, the chain watcher should +// re-register for spend notifications and detect an alternative transaction. +func TestChainWatcherCoopCloseReorg(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create two cooperative close transactions with different fees. + tx1 := harness.createCoopCloseTx(5000) + tx2 := harness.createCoopCloseTx(4900) + + // Run cooperative close flow with reorg. + closeInfo := harness.runCoopCloseFlow(tx1, true, 2, tx2) + + // Assert that the second transaction was confirmed. + harness.assertCoopCloseTx(closeInfo, tx2) +} + +// TestChainWatcherCoopCloseSameTransactionAfterReorg tests that if the same +// transaction re-confirms after a reorganization, it is properly handled. +func TestChainWatcherCoopCloseSameTransactionAfterReorg(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create a single cooperative close transaction. + tx := harness.createCoopCloseTx(5000) + + // Run flow: send tx, trigger reorg, re-send same tx. + harness.sendSpend(tx) + + // Wait for confirmation registration and trigger reorg. + harness.waitForConfRegistration() + harness.mineBlocks(2) + harness.triggerReorg(tx, 2) + + // After reorg, wait for re-registration then re-send the same transaction. + harness.waitForSpendRegistration() + harness.sendSpend(tx) + + // Confirm it. + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(tx, harness.currentHeight) + + // Wait for and verify cooperative close. + closeInfo := harness.waitForCoopClose(5 * time.Second) + harness.assertCoopCloseTx(closeInfo, tx) +} + +// TestChainWatcherCoopCloseMultipleReorgs tests handling of multiple +// consecutive reorganizations during cooperative close confirmation. +func TestChainWatcherCoopCloseMultipleReorgs(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create multiple cooperative close transactions with different fees. + txs := []*wire.MsgTx{ + harness.createCoopCloseTx(5000), + harness.createCoopCloseTx(4950), + harness.createCoopCloseTx(4900), + harness.createCoopCloseTx(4850), + } + + // Define reorg depths for each transition. + reorgDepths := []int32{1, 2, 3} + + // Run multiple reorg flow. + closeInfo := harness.runMultipleReorgFlow(txs, reorgDepths) + + // Assert that the final transaction was confirmed. + harness.assertCoopCloseTx(closeInfo, txs[3]) +} + +// TestChainWatcherCoopCloseDeepReorg tests that the chain watcher can handle +// deep reorganizations where the reorg depth exceeds the required number of +// confirmations. +func TestChainWatcherCoopCloseDeepReorg(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create two cooperative close transactions. + tx1 := harness.createCoopCloseTx(5000) + tx2 := harness.createCoopCloseTx(4900) + + // Run with a deep reorg (10 blocks). + closeInfo := harness.runCoopCloseFlow(tx1, true, 10, tx2) + + // Assert that the second transaction was confirmed after deep reorg. + harness.assertCoopCloseTx(closeInfo, tx2) +} + +// TestChainWatcherCoopCloseReorgNoAlternative tests that if a cooperative +// close is reorganized out and no alternative transaction appears, the +// chain watcher continues waiting. +func TestChainWatcherCoopCloseReorgNoAlternative(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create a cooperative close transaction. + tx := harness.createCoopCloseTx(5000) + + // Send spend and wait for confirmation registration. + harness.sendSpend(tx) + harness.waitForConfRegistration() + + // Trigger reorg after some confirmations. + harness.mineBlocks(2) + harness.triggerReorg(tx, 2) + + // Assert no cooperative close event is received. + harness.assertNoCoopClose(2 * time.Second) + + // Now send a new transaction after the timeout. + harness.waitForSpendRegistration() + newTx := harness.createCoopCloseTx(4900) + harness.sendSpend(newTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(newTx, harness.currentHeight) + + // Should receive cooperative close for the new transaction. + closeInfo := harness.waitForCoopClose(5 * time.Second) + harness.assertCoopCloseTx(closeInfo, newTx) +} + +// TestChainWatcherCoopCloseRBFAfterReorg tests that RBF cooperative close +// transactions are properly handled when reorganizations occur. +func TestChainWatcherCoopCloseRBFAfterReorg(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create a series of RBF transactions with increasing fees. + rbfTxs := make([]*wire.MsgTx, 3) + for i := range rbfTxs { + // Each transaction has higher fee (lower output). + outputValue := int64(5000 - i*100) + rbfTxs[i] = harness.createCoopCloseTx(outputValue) + } + + // Send first RBF transaction. + harness.sendSpend(rbfTxs[0]) + harness.waitForConfRegistration() + + // Trigger reorg. + harness.mineBlocks(1) + harness.triggerReorg(rbfTxs[0], 1) + + // Send second RBF transaction after re-registration. + harness.waitForSpendRegistration() + harness.sendSpend(rbfTxs[1]) + harness.waitForConfRegistration() + + // Another reorg. + harness.mineBlocks(1) + harness.triggerReorg(rbfTxs[1], 1) + + // Send final RBF transaction after re-registration. + harness.waitForSpendRegistration() + harness.sendSpend(rbfTxs[2]) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(rbfTxs[2], harness.currentHeight) + + // Should confirm the highest fee transaction. + closeInfo := harness.waitForCoopClose(5 * time.Second) + harness.assertCoopCloseTx(closeInfo, rbfTxs[2]) +} + +// TestChainWatcherCoopCloseScaledConfirmationsWithReorg tests that scaled +// confirmations (based on channel capacity) work correctly with reorgs. +func TestChainWatcherCoopCloseScaledConfirmationsWithReorg(t *testing.T) { + t.Parallel() + + // Test with different channel capacities that require different + // confirmation counts. + testCases := []struct { + name string + capacityScale float64 + expectedConfs uint16 + reorgDepth int32 + }{ + { + name: "small_channel", + capacityScale: 0.1, + expectedConfs: 1, + reorgDepth: 1, + }, + { + name: "medium_channel", + capacityScale: 0.5, + expectedConfs: 3, + reorgDepth: 2, + }, + { + name: "large_channel", + capacityScale: 1.0, + expectedConfs: 6, + reorgDepth: 4, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + // Create harness with specific channel capacity. + harness := newChainWatcherTestHarness(t) + + // Create transactions. + tx1 := harness.createCoopCloseTx(5000) + tx2 := harness.createCoopCloseTx(4900) + + // Run with reorg at different depths based on capacity. + closeInfo := harness.runCoopCloseFlow( + tx1, true, tc.reorgDepth, tx2, + ) + + // Verify correct transaction confirmed. + harness.assertCoopCloseTx(closeInfo, tx2) + }) + } +} + +// TestChainWatcherCoopCloseReorgRaceCondition tests that the chain watcher +// correctly handles race conditions where multiple transactions might be +// in flight during reorganizations. +func TestChainWatcherCoopCloseReorgRaceCondition(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create multiple transactions. + txs := make([]*wire.MsgTx, 5) + for i := range txs { + txs[i] = harness.createCoopCloseTx(int64(5000 - i*50)) + } + + // Rapidly send multiple transactions and reorgs. + for i := 0; i < 3; i++ { + harness.sendSpend(txs[i]) + harness.waitForConfRegistration() + harness.mineBlocks(1) + + // Quick reorg. + harness.triggerReorg(txs[i], 1) + + // Wait for re-registration. + harness.waitForSpendRegistration() + } + + // Eventually send and confirm a final transaction. + finalTx := txs[4] + harness.sendSpend(finalTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(finalTx, harness.currentHeight) + + // Should eventually settle on the final transaction. + closeInfo := harness.waitForCoopClose(10 * time.Second) + harness.assertCoopCloseTx(closeInfo, finalTx) +} + +// TestChainWatcherCoopCloseReorgErrorHandling tests that errors during +// re-registration after reorg are properly handled. +func TestChainWatcherCoopCloseReorgErrorHandling(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create a cooperative close transaction. + tx := harness.createCoopCloseTx(5000) + + // Send spend notification. + harness.sendSpend(tx) + + // Trigger multiple rapid reorgs to stress error handling. + for i := 0; i < 5; i++ { + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.triggerReorg(tx, int32(i+1)) + if i < 4 { + // Re-register for spend after each reorg except the + // last. + harness.waitForSpendRegistration() + harness.sendSpend(tx) + } + } + + // After stress, send a clean transaction. + harness.waitForSpendRegistration() + cleanTx := harness.createCoopCloseTx(4800) + harness.sendSpend(cleanTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(cleanTx, harness.currentHeight) + + // Should still receive the cooperative close. + closeInfo := harness.waitForCoopClose(10 * time.Second) + harness.assertCoopCloseTx(closeInfo, cleanTx) +} From 2666c71f8132b2a94e8b7bb90000b65ece8b022e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:55:26 -0700 Subject: [PATCH 14/16] contractcourt: add generic close re-org tests In this commit, we add a set of generic close re-org tests. The most important test is the property based test, they will randomly confirm transactions, generate a re-org, then assert that eventually we dtect the final version. --- contractcourt/chain_watcher_reorg_test.go | 388 ++++++++++++++++++++++ 1 file changed, 388 insertions(+) create mode 100644 contractcourt/chain_watcher_reorg_test.go diff --git a/contractcourt/chain_watcher_reorg_test.go b/contractcourt/chain_watcher_reorg_test.go new file mode 100644 index 0000000000..34fb71ed9b --- /dev/null +++ b/contractcourt/chain_watcher_reorg_test.go @@ -0,0 +1,388 @@ +package contractcourt + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/wire" + "pgregory.net/rapid" +) + +// closeType represents the type of channel close for testing purposes. +type closeType int + +const ( + // closeTypeCoop represents a cooperative channel close. + closeTypeCoop closeType = iota + + // closeTypeRemoteUnilateral represents a remote unilateral close + // (remote party broadcasting their commitment). + closeTypeRemoteUnilateral + + // closeTypeLocalForce represents a local force close (us broadcasting + // our commitment). + closeTypeLocalForce + + // closeTypeBreach represents a breach (remote party broadcasting a + // revoked commitment). + closeTypeBreach +) + +// String returns a string representation of the close type. +func (c closeType) String() string { + switch c { + case closeTypeCoop: + return "cooperative" + case closeTypeRemoteUnilateral: + return "remote_unilateral" + case closeTypeLocalForce: + return "local_force" + case closeTypeBreach: + return "breach" + default: + return "unknown" + } +} + +// createCloseTx creates a close transaction of the specified type using the +// harness. +func createCloseTx(h *chainWatcherTestHarness, ct closeType, + outputValue int64) *wire.MsgTx { + + switch ct { + case closeTypeCoop: + return h.createCoopCloseTx(outputValue) + case closeTypeRemoteUnilateral: + return h.createRemoteUnilateralCloseTx() + case closeTypeLocalForce: + return h.createLocalForceCloseTx() + case closeTypeBreach: + return h.createBreachCloseTx() + default: + h.t.Fatalf("unknown close type: %v", ct) + return nil + } +} + +// waitForCloseEvent waits for the appropriate close event based on close type. +func waitForCloseEvent(h *chainWatcherTestHarness, ct closeType, + timeout time.Duration) any { + + switch ct { + case closeTypeCoop: + return h.waitForCoopClose(timeout) + case closeTypeRemoteUnilateral: + return h.waitForRemoteUnilateralClose(timeout) + case closeTypeLocalForce: + return h.waitForLocalUnilateralClose(timeout) + case closeTypeBreach: + return h.waitForBreach(timeout) + default: + h.t.Fatalf("unknown close type: %v", ct) + return nil + } +} + +// assertCloseEventTx asserts that the close event matches the expected +// transaction based on close type. +func assertCloseEventTx(h *chainWatcherTestHarness, ct closeType, + event any, expectedTx *wire.MsgTx) { + + switch ct { + case closeTypeCoop: + h.assertCoopCloseTx(event.(*CooperativeCloseInfo), expectedTx) + + case closeTypeRemoteUnilateral: + h.assertRemoteUnilateralCloseTx( + event.(*RemoteUnilateralCloseInfo), expectedTx, + ) + + case closeTypeLocalForce: + h.assertLocalUnilateralCloseTx( + event.(*LocalUnilateralCloseInfo), expectedTx, + ) + + case closeTypeBreach: + h.assertBreachTx(event.(*BreachCloseInfo), expectedTx) + + default: + h.t.Fatalf("unknown close type: %v", ct) + } +} + +// generateAltTxsForReorgs generates alternative transactions for reorg +// scenarios. For commitment-based closes (breach, remote/local force), the same +// tx is reused since we can only have one commitment tx per channel state. For +// coop closes, new transactions with different output values are created. +func generateAltTxsForReorgs(h *chainWatcherTestHarness, ct closeType, + originalTx *wire.MsgTx, numReorgs int, sameTxAtEnd bool) []*wire.MsgTx { + + altTxs := make([]*wire.MsgTx, numReorgs) + + for i := 0; i < numReorgs; i++ { + switch ct { + case closeTypeBreach, closeTypeRemoteUnilateral, + closeTypeLocalForce: + + // Non-coop closes can only have one commitment tx, so + // all reorgs use the same transaction. + altTxs[i] = originalTx + + case closeTypeCoop: + if i == numReorgs-1 && sameTxAtEnd { + // Last reorg goes back to original transaction. + altTxs[i] = originalTx + } else { + // Create different coop close tx with different + // output value to make it unique. + outputValue := int64(5000 - (i+1)*100) + altTxs[i] = createCloseTx(h, ct, outputValue) + } + } + } + + return altTxs +} + +// testReorgProperties is the main property-based test for reorg handling +// across all close types. +// +// The testingT parameter is captured from the outer test function and used +// for operations that require *testing.T (like channel creation), while the +// rapid.T is used for all test reporting and property generation. +func testReorgProperties(testingT *testing.T) func(*rapid.T) { + return func(t *rapid.T) { + // Generate random close type. + allCloseTypes := []closeType{ + closeTypeCoop, + closeTypeRemoteUnilateral, + closeTypeLocalForce, + closeTypeBreach, + } + ct := rapid.SampledFrom(allCloseTypes).Draw(t, "closeType") + + // Generate random number of required confirmations (2-6). We + // use at least 2 so we have room for reorgs during + // confirmation. + requiredConfs := rapid.IntRange(2, 6).Draw(t, "requiredConfs") + + // Generate number of reorgs (1-3 to keep test runtime + // reasonable). + numReorgs := rapid.IntRange(1, 3).Draw(t, "numReorgs") + + // Generate whether the final transaction is the same as the + // original. + sameTxAtEnd := rapid.Bool().Draw(t, "sameTxAtEnd") + + // Log test parameters for debugging. + t.Logf("Testing %s close with %d confs, %d reorgs, "+ + "sameTxAtEnd=%v", + ct, requiredConfs, numReorgs, sameTxAtEnd) + + // Create test harness using both the concrete *testing.T for + // channel creation and the rapid.T for test reporting. + harness := newChainWatcherTestHarnessFromReporter( + testingT, t, withRequiredConfs(uint32(requiredConfs)), + ) + + // Create initial transaction. + tx1 := createCloseTx(harness, ct, 5000) + + // Generate alternative transactions for each reorg. + altTxs := generateAltTxsForReorgs( + harness, ct, tx1, numReorgs, sameTxAtEnd, + ) + + // Send the initial spend. + harness.sendSpend(tx1) + harness.waitForConfRegistration() + + // Execute the set of re-orgs, based on our random sample, we'll + // mine N blocks, do a re-org of size N, then wait for + // detection, and repeat. + for i := 0; i < numReorgs; i++ { + // Generate random reorg depth (1 to requiredConfs-1). + // We cap it to avoid reorging too far back. + reorgDepth := rapid.IntRange( + 1, requiredConfs-1, + ).Draw(t, "reorgDepth") + + // Mine some blocks (but less than required confs). + blocksToMine := rapid.IntRange( + 1, requiredConfs-1, + ).Draw(t, "blocksToMine") + harness.mineBlocks(int32(blocksToMine)) + + // Trigger reorg. + if i == 0 { + harness.triggerReorg( + tx1, int32(reorgDepth), + ) + } else { + harness.triggerReorg( + altTxs[i-1], int32(reorgDepth), + ) + } + + harness.waitForSpendRegistration() + + harness.sendSpend(altTxs[i]) + harness.waitForConfRegistration() + } + + // Mine enough blocks to confirm final transaction. + harness.mineBlocks(1) + finalTx := altTxs[numReorgs-1] + harness.confirmTx(finalTx, harness.currentHeight) + + // Wait for and verify close event. + event := waitForCloseEvent(harness, ct, 10*time.Second) + assertCloseEventTx(harness, ct, event, finalTx) + } +} + +// TestChainWatcherReorgAllCloseTypes runs property-based tests for reorg +// handling across all channel close types. It generates random combinations of: +// - Close type (coop, remote unilateral, local force, breach) +// - Number of confirmations required (2-6) +// - Number of reorgs (1-3) +// - Whether the final tx is same as original or different +func TestChainWatcherReorgAllCloseTypes(t *testing.T) { + t.Parallel() + + rapid.Check(t, testReorgProperties(t)) +} + +// TestRemoteUnilateralCloseWithSingleReorg tests that a remote unilateral +// close is properly handled when a single reorg occurs during confirmation. +func TestRemoteUnilateralCloseWithSingleReorg(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness(t) + + // Create two remote unilateral close transactions. + // Since these are commitment transactions, we can only have one per + // state, so we'll use the current one as tx1. + tx1 := harness.createRemoteUnilateralCloseTx() + + // Advance channel state to get a different commitment. + _ = harness.createBreachCloseTx() + tx2 := harness.createRemoteUnilateralCloseTx() + + // Send initial spend. + harness.sendSpend(tx1) + harness.waitForConfRegistration() + + // Mine a block and trigger reorg. + harness.mineBlocks(1) + harness.triggerReorg(tx1, 1) + + // Send alternative transaction after reorg. + harness.waitForSpendRegistration() + harness.sendSpend(tx2) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(tx2, harness.currentHeight) + + // Verify correct event. + closeInfo := harness.waitForRemoteUnilateralClose(5 * time.Second) + harness.assertRemoteUnilateralCloseTx(closeInfo, tx2) +} + +// TestLocalForceCloseWithMultipleReorgs tests that a local force close is +// properly handled through multiple consecutive reorgs. +func TestLocalForceCloseWithMultipleReorgs(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness(t) + + // For local force close, we can only broadcast our current commitment. + // We'll simulate multiple reorgs where the same tx keeps getting + // reorganized out and re-broadcast. + tx := harness.createLocalForceCloseTx() + + // First spend and reorg. + harness.sendSpend(tx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.triggerReorg(tx, 1) + + // Second spend and reorg. + harness.waitForSpendRegistration() + harness.sendSpend(tx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.triggerReorg(tx, 1) + + // Third spend - this one confirms. + harness.waitForSpendRegistration() + harness.sendSpend(tx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(tx, harness.currentHeight) + + // Verify correct event. + closeInfo := harness.waitForLocalUnilateralClose(5 * time.Second) + harness.assertLocalUnilateralCloseTx(closeInfo, tx) +} + +// TestBreachCloseWithDeepReorg tests that a breach (revoked commitment) is +// properly detected after a deep reorganization. +func TestBreachCloseWithDeepReorg(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness(t) + + // Create a revoked commitment transaction. + revokedTx := harness.createBreachCloseTx() + + // Send spend and wait for confirmation registration. + harness.sendSpend(revokedTx) + harness.waitForConfRegistration() + + // Mine several blocks and then trigger a deep reorg. + harness.mineBlocks(5) + harness.triggerReorg(revokedTx, 5) + + // Re-broadcast same transaction after reorg. + harness.waitForSpendRegistration() + harness.sendSpend(revokedTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(revokedTx, harness.currentHeight) + + // Verify breach detection. + breachInfo := harness.waitForBreach(5 * time.Second) + harness.assertBreachTx(breachInfo, revokedTx) +} + +// TestCoopCloseReorgToForceClose tests the edge case where a cooperative +// close gets reorged out and is replaced by a force close. +func TestCoopCloseReorgToForceClose(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness(t) + + // Create a cooperative close and a force close transaction. + coopTx := harness.createCoopCloseTx(5000) + forceTx := harness.createRemoteUnilateralCloseTx() + + // Send cooperative close. + harness.sendSpend(coopTx) + harness.waitForConfRegistration() + + // Trigger reorg that removes coop close. + harness.mineBlocks(1) + harness.triggerReorg(coopTx, 1) + + // Send force close as alternative. + harness.waitForSpendRegistration() + harness.sendSpend(forceTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(forceTx, harness.currentHeight) + + // Should receive remote unilateral close event, not coop close. + closeInfo := harness.waitForRemoteUnilateralClose(5 * time.Second) + harness.assertRemoteUnilateralCloseTx(closeInfo, forceTx) +} From 29454c2fdfed110835eb14948aa49af163ad8522 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Oct 2025 16:55:48 -0700 Subject: [PATCH 15/16] itest: add new coop close rbf itest This ensures that during the RBF process, if one confirms, a re-org occurs, then another confirms, that we'll properly detect this case. --- itest/list_on_test.go | 4 + itest/lnd_coop_close_rbf_test.go | 175 +++++++++++++++++++++++++++++++ itest/lnd_funding_test.go | 18 +++- 3 files changed, 195 insertions(+), 2 deletions(-) diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 92c6547b3a..ec608c629d 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -727,6 +727,10 @@ var allTestCases = []*lntest.TestCase{ Name: "rbf coop close disconnect", TestFunc: testRBFCoopCloseDisconnect, }, + { + Name: "coop close rbf with reorg", + TestFunc: testCoopCloseRBFWithReorg, + }, { Name: "bump fee low budget", TestFunc: testBumpFeeLowBudget, diff --git a/itest/lnd_coop_close_rbf_test.go b/itest/lnd_coop_close_rbf_test.go index 5f8b15d405..57ebe0003f 100644 --- a/itest/lnd_coop_close_rbf_test.go +++ b/itest/lnd_coop_close_rbf_test.go @@ -1,8 +1,13 @@ package itest import ( + "fmt" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lntest" + "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/stretchr/testify/require" ) @@ -153,3 +158,173 @@ func testRBFCoopCloseDisconnect(ht *lntest.HarnessTest) { // Disconnect Bob from Alice. ht.DisconnectNodes(alice, bob) } + +// testCoopCloseRBFWithReorg tests that when a cooperative close transaction +// is reorganized out during confirmation waiting, the system properly handles +// RBF replacements and re-registration for any spend of the funding output. +func testCoopCloseRBFWithReorg(ht *lntest.HarnessTest) { + // Skip this test for neutrino backend as we can't trigger reorgs. + if ht.IsNeutrinoBackend() { + ht.Skipf("skipping reorg test for neutrino backend") + } + + // Force cooperative close to require 3 confirmations for predictable + // testing. + const requiredConfs = 3 + rbfCoopFlags := []string{ + "--protocol.rbf-coop-close", + "--dev.force-channel-close-confs=3", + } + + // Set the fee estimate to 1sat/vbyte to ensure our RBF attempts work. + ht.SetFeeEstimate(250) + ht.SetFeeEstimateWithConf(250, 6) + + // Create two nodes with enough coins for a 50/50 channel. + cfgs := [][]string{rbfCoopFlags, rbfCoopFlags} + params := lntest.OpenChannelParams{ + Amt: btcutil.Amount(10_000_000), + PushAmt: btcutil.Amount(5_000_000), + } + chanPoints, nodes := ht.CreateSimpleNetwork(cfgs, params) + alice, bob := nodes[0], nodes[1] + chanPoint := chanPoints[0] + + // Initiate cooperative close with initial fee rate of 5 sat/vb. + initialFeeRate := chainfee.SatPerVByte(5) + _, aliceCloseUpdate := ht.CloseChannelAssertPending( + alice, chanPoint, false, + lntest.WithCoopCloseFeeRate(initialFeeRate), + lntest.WithLocalTxNotify(), + ) + + // Verify the initial close transaction is at the expected fee rate. + alicePendingUpdate := aliceCloseUpdate.GetClosePending() + require.NotNil(ht, aliceCloseUpdate) + require.Equal( + ht, int64(initialFeeRate), alicePendingUpdate.FeePerVbyte, + ) + + // Capture the initial close transaction from the mempool. + initialCloseTxid, err := chainhash.NewHash(alicePendingUpdate.Txid) + require.NoError(ht, err) + initialCloseTx := ht.AssertTxInMempool(*initialCloseTxid) + + // Create first RBF replacement before any mining. + firstRbfFeeRate := chainfee.SatPerVByte(10) + _, firstRbfUpdate := ht.CloseChannelAssertPending( + bob, chanPoint, false, + lntest.WithCoopCloseFeeRate(firstRbfFeeRate), + lntest.WithLocalTxNotify(), + ) + + // Capture the first RBF transaction. + firstRbfTxid, err := chainhash.NewHash(firstRbfUpdate.GetClosePending().Txid) + require.NoError(ht, err) + firstRbfTx := ht.AssertTxInMempool(*firstRbfTxid) + + _, bestHeight, err := ht.Miner().Client.GetBestBlock() + require.NoError(ht, err) + + ht.Logf("Current block height: %d", bestHeight) + + // Mine n-1 blocks (2 blocks when requiring 3 confirmations) with the + // first RBF transaction. This is just shy of full confirmation. + block1 := ht.Miner().MineBlockWithTxes( + []*btcutil.Tx{btcutil.NewTx(firstRbfTx)}, + ) + + ht.Logf("Mined block %d with first RBF tx", bestHeight+1) + + block2 := ht.MineEmptyBlocks(1)[0] + + ht.Logf("Mined block %d", bestHeight+2) + + ht.Logf("Re-orging two blocks to remove first RBF tx") + + // Trigger a reorganization that removes the last 2 blocks. This is safe + // because we haven't reached full confirmation yet. + bestBlockHash := block2.Header.BlockHash() + require.NoError( + ht, ht.Miner().Client.InvalidateBlock(&bestBlockHash), + ) + bestBlockHash = block1.Header.BlockHash() + require.NoError( + ht, ht.Miner().Client.InvalidateBlock(&bestBlockHash), + ) + + _, bestHeight, err = ht.Miner().Client.GetBestBlock() + require.NoError(ht, err) + ht.Logf("Re-orged to block height: %d", bestHeight) + + ht.Log("Mining blocks to surpass previous chain") + + // Mine 2 empty blocks to trigger the reorg on the nodes. + ht.MineEmptyBlocks(2) + + _, bestHeight, err = ht.Miner().Client.GetBestBlock() + require.NoError(ht, err) + ht.Logf("Mined blocks to reach height: %d", bestHeight) + + // Now, instead of mining the second RBF, mine the INITIAL transaction + // to test that the system can handle any valid spend of the funding + // output. + block := ht.Miner().MineBlockWithTxes( + []*btcutil.Tx{btcutil.NewTx(initialCloseTx)}, + ) + ht.AssertTxInBlock(block, *initialCloseTxid) + + // Mine additional blocks to reach the required confirmations (3 total). + ht.MineEmptyBlocks(requiredConfs - 1) + + // Both parties should see that the channel is now fully closed on chain + // with the expected closing txid. + expectedClosingTxid := initialCloseTxid.String() + err = wait.NoError(func() error { + req := &lnrpc.ClosedChannelsRequest{} + aliceClosedChans := alice.RPC.ClosedChannels(req) + bobClosedChans := bob.RPC.ClosedChannels(req) + if len(aliceClosedChans.Channels) != 1 { + return fmt.Errorf("alice: expected 1 closed chan, got %d", + len(aliceClosedChans.Channels)) + } + if len(bobClosedChans.Channels) != 1 { + return fmt.Errorf("bob: expected 1 closed chan, got %d", + len(bobClosedChans.Channels)) + } + + // Verify both Alice and Bob have the expected closing txid. + aliceClosedChan := aliceClosedChans.Channels[0] + if aliceClosedChan.ClosingTxHash != expectedClosingTxid { + return fmt.Errorf("alice: expected closing txid %s, "+ + "got %s", + expectedClosingTxid, + aliceClosedChan.ClosingTxHash) + } + if aliceClosedChan.CloseType != + lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE { + return fmt.Errorf("alice: expected cooperative "+ + "close, got %v", + aliceClosedChan.CloseType) + } + + bobClosedChan := bobClosedChans.Channels[0] + if bobClosedChan.ClosingTxHash != expectedClosingTxid { + return fmt.Errorf("bob: expected closing txid %s, "+ + "got %s", + expectedClosingTxid, + bobClosedChan.ClosingTxHash) + } + if bobClosedChan.CloseType != + lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE { + return fmt.Errorf("bob: expected cooperative "+ + "close, got %v", + bobClosedChan.CloseType) + } + + return nil + }, defaultTimeout) + require.NoError(ht, err) + + ht.Logf("Successfully verified closing txid: %s", expectedClosingTxid) +} diff --git a/itest/lnd_funding_test.go b/itest/lnd_funding_test.go index b6734e032d..2c1daf53d2 100644 --- a/itest/lnd_funding_test.go +++ b/itest/lnd_funding_test.go @@ -1272,8 +1272,17 @@ func testChannelFundingWithUnstableUtxos(ht *lntest.HarnessTest) { // Make sure Carol sees her to_remote output from the force close tx. ht.AssertNumPendingSweeps(carol, 1) - // We need to wait for carol initiating the sweep of the to_remote - // output of chanPoint2. + // Wait for Carol's sweep transaction to appear in the mempool. Due to + // async confirmation notifications, there's a race between when the + // sweep is registered and when the sweeper processes the next block. + // The sweeper uses immediate=false, so it broadcasts on the next block + // after registration. Mine an empty block to trigger the broadcast. + ht.MineEmptyBlocks(1) + + // Now the sweep should be in the mempool. + ht.AssertNumTxsInMempool(1) + + // Now we should see the unconfirmed UTXO from the sweep. utxo := ht.AssertNumUTXOsUnconfirmed(carol, 1)[0] // We now try to open channel using the unconfirmed utxo. @@ -1329,6 +1338,11 @@ func testChannelFundingWithUnstableUtxos(ht *lntest.HarnessTest) { // Make sure Carol sees her to_remote output from the force close tx. ht.AssertNumPendingSweeps(carol, 1) + // Mine an empty block to trigger the sweep broadcast (same fix as + // above). + ht.MineEmptyBlocks(1) + ht.AssertNumTxsInMempool(1) + // Wait for the to_remote sweep tx to show up in carol's wallet. ht.AssertNumUTXOsUnconfirmed(carol, 1) From 8b565d64d93df94102de4b6ff0bcf8dc42e49195 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 30 Oct 2025 16:32:57 -0700 Subject: [PATCH 16/16] contractcourt: add sync dispatch fast-path for single confirmation closes In this commit, we add a fast-path optimization to the chain watcher's closeObserver that immediately dispatches close events when only a single confirmation is required (numConfs == 1). This addresses a timing issue with integration tests that were designed around the old synchronous blockbeat behavior, where close events were dispatched immediately upon spend detection. The recent async confirmation architecture (introduced in commit f6f716ab7) properly handles reorgs by waiting for N confirmations before dispatching close events. However, this created a race condition in integration tests that mine blocks synchronously and expect immediate close notifications. With the build tag setting numConfs to 1 for itests, the async confirmation notification could arrive after the test already started waiting for the close event, causing timeouts. We introduce a new handleSpendDispatch method that checks if numConfs == 1 and, if so, immediately calls handleCommitSpend to dispatch the close event synchronously, then returns true to skip the async state machine. This preserves the old behavior for integration tests while maintaining the full async reorg protection for production (where numConfs >= 3). The implementation adds the fast-path check in both spend detection paths (blockbeat and spend notification) to ensure consistent behavior regardless of which detects the spend first. We also update the affected unit tests to remove their expectation of confirmation registration, since the fast-path bypasses that step entirely. This approach optimizes for the integration test scenario without compromising production safety, as the fast-path only activates when a single confirmation is sufficient - a configuration that only exists in the controlled test environment. --- contractcourt/chain_watcher.go | 44 +++++++++++++++++++++++++++++ contractcourt/chain_watcher_test.go | 14 ++++----- 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index 234fc7c38b..9fd70fe2c9 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -693,6 +693,12 @@ func newChainSet(chanState *channeldb.OpenChannel) (*chainSet, error) { // - Pending (confNtfn != nil): Spend detected, waiting for N confirmations // // - Confirmed: Spend confirmed with N blocks, close has been processed +// +// For single-confirmation scenarios (numConfs == 1), we bypass the async state +// machine and immediately dispatch close events upon spend detection. This +// provides synchronous behavior for integration tests which expect immediate +// notifications. For multi-confirmation scenarios (production with numConfs >= 3), +// we use the full async state machine with reorg protection. func (c *chainWatcher) closeObserver() { defer c.wg.Done() @@ -803,6 +809,13 @@ func (c *chainWatcher) closeObserver() { continue } + // FAST PATH: Check if we should dispatch immediately for + // single-confirmation scenarios. + if c.handleSpendDispatch(spend, "blockbeat") { + continue + } + + // ASYNC PATH: Multiple confirmations (production). // STATE TRANSITION: None -> Pending (from blockbeat). log.Infof("ChannelPoint(%v): detected spend from "+ "blockbeat, transitioning to %v", @@ -826,6 +839,13 @@ func (c *chainWatcher) closeObserver() { return } + // FAST PATH: Check if we should dispatch immediately for + // single-confirmation scenarios. + if c.handleSpendDispatch(spend, "spend notification") { + continue + } + + // ASYNC PATH: Multiple confirmations (production). log.Infof("ChannelPoint(%v): detected spend from "+ "notification, transitioning to %v", c.cfg.chanState.FundingOutpoint, @@ -1582,6 +1602,30 @@ func deriveFundingPkScript(chanState *channeldb.OpenChannel) ([]byte, error) { return fundingPkScript, nil } +// handleSpendDispatch processes a detected spend. For single-confirmation +// scenarios (numConfs == 1), it immediately dispatches the close event and +// returns true. For multi-confirmation scenarios, it returns false, indicating +// the caller should proceed with the async state machine. +func (c *chainWatcher) handleSpendDispatch(spend *chainntnfs.SpendDetail, + source string) bool { + + numConfs := c.requiredConfsForSpend() + if numConfs == 1 { + log.Infof("ChannelPoint(%v): single confirmation mode, "+ + "dispatching immediately from %s", + c.cfg.chanState.FundingOutpoint, source) + + err := c.handleCommitSpend(spend) + if err != nil { + log.Errorf("Failed to handle commit spend: %v", err) + } + + return true + } + + return false +} + // handleCommitSpend takes a spending tx of the funding output and handles the // channel close based on the closure type. func (c *chainWatcher) handleCommitSpend( diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index c57859ca42..8275886a14 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -94,10 +94,9 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) { t.Fatalf("unable to send blockbeat") } - // Wait for the chain watcher to register for confirmations and send - // the confirmation. Since we set chanCloseConfs to 1, one confirmation - // is sufficient. - aliceNotifier.WaitForConfRegistrationAndSend(t) + // With chanCloseConfs set to 1, the fast-path dispatches immediately + // without confirmation registration. The close event should arrive + // directly after processing the blockbeat. // We should get a new spend event over the remote unilateral close // event channel. @@ -231,10 +230,9 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) { t.Fatalf("unable to send blockbeat") } - // Wait for the chain watcher to register for confirmations and send - // the confirmation. Since we set chanCloseConfs to 1, one confirmation - // is sufficient. - aliceNotifier.WaitForConfRegistrationAndSend(t) + // With chanCloseConfs set to 1, the fast-path dispatches immediately + // without confirmation registration. The close event should arrive + // directly after processing the blockbeat. // We should get a new spend event over the remote unilateral close // event channel.