@@ -88,6 +88,38 @@ type BreachCloseInfo struct {
8888 CloseSummary channeldb.ChannelCloseSummary
8989}
9090
91+ // spendConfirmationState represents the state of spend confirmation tracking
92+ // in the closeObserver state machine. We wait for N confirmations before
93+ // processing any spend to protect against shallow reorgs.
94+ type spendConfirmationState uint8
95+
96+ const (
97+ // spendStateNone indicates no spend has been detected yet.
98+ spendStateNone spendConfirmationState = iota
99+
100+ // spendStatePending indicates a spend has been detected and we're
101+ // waiting for the required number of confirmations.
102+ spendStatePending
103+
104+ // spendStateConfirmed indicates the spend has reached the required
105+ // confirmations and has been processed.
106+ spendStateConfirmed
107+ )
108+
109+ // String returns a human-readable representation of the state.
110+ func (s spendConfirmationState ) String () string {
111+ switch s {
112+ case spendStateNone :
113+ return "None"
114+ case spendStatePending :
115+ return "Pending"
116+ case spendStateConfirmed :
117+ return "Confirmed"
118+ default :
119+ return "Unknown"
120+ }
121+ }
122+
91123// CommitSet is a collection of the set of known valid commitments at a given
92124// instant. If ConfCommitKey is set, then the commitment identified by the
93125// HtlcSetKey has hit the chain. This struct will be used to examine all live
@@ -652,51 +684,225 @@ func newChainSet(chanState *channeldb.OpenChannel) (*chainSet, error) {
652684}
653685
654686// closeObserver is a dedicated goroutine that will watch for any closes of the
655- // channel that it's watching on chain. In the event of an on-chain event, the
656- // close observer will assembled the proper materials required to claim the
657- // funds of the channel on-chain (if required), then dispatch these as
658- // notifications to all subscribers.
687+ // channel that it's watching on chain. It implements a state machine to handle
688+ // spend detection and confirmation with reorg protection. The states are:
689+ //
690+ // - None (confNtfn == nil): No spend detected yet, waiting for spend
691+ // notification
692+ //
693+ // - Pending (confNtfn != nil): Spend detected, waiting for N confirmations
694+ //
695+ // - Confirmed: Spend confirmed with N blocks, close has been processed
659696func (c * chainWatcher ) closeObserver () {
660697 defer c .wg .Done ()
661- defer c .fundingSpendNtfn .Cancel ()
698+
699+ registerForSpend := func () (* chainntnfs.SpendEvent , error ) {
700+ fundingPkScript , err := deriveFundingPkScript (c .cfg .chanState )
701+ if err != nil {
702+ return nil , err
703+ }
704+
705+ heightHint := c .cfg .chanState .DeriveHeightHint ()
706+
707+ return c .cfg .notifier .RegisterSpendNtfn (
708+ & c .cfg .chanState .FundingOutpoint ,
709+ fundingPkScript ,
710+ heightHint ,
711+ )
712+ }
713+
714+ spendNtfn := c .fundingSpendNtfn
715+ defer spendNtfn .Cancel ()
716+
717+ // We use these variables to implement a state machine to track the
718+ // state of the spend confirmation process:
719+ // * When confNtfn is nil, we're in state "None" waiting for a spend.
720+ // * When confNtfn is set, we're in state "Pending" waiting for
721+ // confirmations.
722+ //
723+ // After confirmations, we transition to state "Confirmed" and clean up.
724+ var (
725+ pendingSpend * chainntnfs.SpendDetail
726+ confNtfn * chainntnfs.ConfirmationEvent
727+ )
662728
663729 log .Infof ("Close observer for ChannelPoint(%v) active" ,
664730 c .cfg .chanState .FundingOutpoint )
665731
732+ // handleSpendDetection processes a newly detected spend by registering
733+ // for confirmations. Returns the new confNtfn or error.
734+ handleSpendDetection := func (
735+ spend * chainntnfs.SpendDetail ,
736+ ) (* chainntnfs.ConfirmationEvent , error ) {
737+
738+ // If we already have a pending spend, check if it's the same
739+ // transaction. This can happen if both the spend notification
740+ // and blockbeat detect the same spend.
741+ if pendingSpend != nil {
742+ if * pendingSpend .SpenderTxHash == * spend .SpenderTxHash {
743+ log .Debugf ("ChannelPoint(%v): ignoring " +
744+ "duplicate spend detection for tx %v" ,
745+ c .cfg .chanState .FundingOutpoint ,
746+ spend .SpenderTxHash )
747+ return confNtfn , nil
748+ }
749+
750+ // Different spend detected. Cancel existing confNtfn
751+ // and replace with new one.
752+ log .Warnf ("ChannelPoint(%v): detected different " +
753+ "spend tx %v, replacing pending tx %v" ,
754+ c .cfg .chanState .FundingOutpoint ,
755+ spend .SpenderTxHash ,
756+ pendingSpend .SpenderTxHash )
757+
758+ if confNtfn != nil {
759+ confNtfn .Cancel ()
760+ }
761+ }
762+
763+ numConfs := c .requiredConfsForSpend ()
764+ txid := spend .SpenderTxHash
765+
766+ newConfNtfn , err := c .cfg .notifier .RegisterConfirmationsNtfn (
767+ txid , spend .SpendingTx .TxOut [0 ].PkScript ,
768+ numConfs , uint32 (spend .SpendingHeight ),
769+ )
770+ if err != nil {
771+ return nil , fmt .Errorf ("register confirmations: %w" ,
772+ err )
773+ }
774+
775+ log .Infof ("ChannelPoint(%v): waiting for %d confirmations " +
776+ "of spend tx %v" , c .cfg .chanState .FundingOutpoint ,
777+ numConfs , txid )
778+
779+ return newConfNtfn , nil
780+ }
781+
666782 for {
783+ // We only listen to confirmation channels when we have a
784+ // pending spend. By setting these to nil when not needed, Go's
785+ // select ignores those cases, effectively implementing our
786+ // state machine.
787+ var (
788+ confChan <- chan * chainntnfs.TxConfirmation
789+ negativeConfChan <- chan int32
790+ )
791+ if confNtfn != nil {
792+ confChan = confNtfn .Confirmed
793+ negativeConfChan = confNtfn .NegativeConf
794+ }
795+
667796 select {
668- // A new block is received, we will check whether this block
669- // contains a spending tx that we are interested in.
670797 case beat := <- c .BlockbeatChan :
671798 log .Debugf ("ChainWatcher(%v) received blockbeat %v" ,
672799 c .cfg .chanState .FundingOutpoint , beat .Height ())
673800
674- // Process the block.
675- c .handleBlockbeat (beat )
676-
677- // If the funding outpoint is spent, we now go ahead and handle
678- // it. Note that we cannot rely solely on the `block` event
679- // above to trigger a close event, as deep down, the receiving
680- // of block notifications and the receiving of spending
681- // notifications are done in two different goroutines, so the
682- // expected order: [receive block -> receive spend] is not
683- // guaranteed .
684- case spend , ok := <- c .fundingSpendNtfn .Spend :
685- // If the channel was closed, then this means that the
686- // notifier exited, so we will as well.
801+ spend := c .handleBlockbeat (beat )
802+ if spend == nil {
803+ continue
804+ }
805+
806+ // STATE TRANSITION: None -> Pending (from blockbeat).
807+ log .Infof ("ChannelPoint(%v): detected spend from " +
808+ "blockbeat, transitioning to %v" ,
809+ c .cfg .chanState .FundingOutpoint ,
810+ spendStatePending )
811+
812+ newConfNtfn , err := handleSpendDetection (spend )
813+ if err != nil {
814+ log .Errorf ("Unable to handle spend " +
815+ "detection: %v" , err )
816+ return
817+ }
818+ pendingSpend = spend
819+ confNtfn = newConfNtfn
820+
821+ // STATE TRANSITION: None -> Pending.
822+ // We've detected a spend, but don't process it yet. Instead,
823+ // register for confirmations to protect against shallow reorgs.
824+ case spend , ok := <- spendNtfn .Spend :
825+ if ! ok {
826+ return
827+ }
828+
829+ log .Infof ("ChannelPoint(%v): detected spend from " +
830+ "notification, transitioning to %v" ,
831+ c .cfg .chanState .FundingOutpoint ,
832+ spendStatePending )
833+
834+ newConfNtfn , err := handleSpendDetection (spend )
835+ if err != nil {
836+ log .Errorf ("Unable to handle spend " +
837+ "detection: %v" , err )
838+ return
839+ }
840+ pendingSpend = spend
841+ confNtfn = newConfNtfn
842+
843+ // STATE TRANSITION: Pending -> Confirmed
844+ // The spend has reached required confirmations. It's now safe
845+ // to process since we've protected against shallow reorgs.
846+ case conf , ok := <- confChan :
847+ if ! ok {
848+ log .Errorf ("Confirmation channel closed " +
849+ "unexpectedly" )
850+ return
851+ }
852+
853+ log .Infof ("ChannelPoint(%v): spend confirmed at " +
854+ "height %d, transitioning to %v" ,
855+ c .cfg .chanState .FundingOutpoint ,
856+ conf .BlockHeight , spendStateConfirmed )
857+
858+ err := c .handleCommitSpend (pendingSpend )
859+ if err != nil {
860+ log .Errorf ("Failed to handle confirmed " +
861+ "spend: %v" , err )
862+ }
863+
864+ confNtfn .Cancel ()
865+ confNtfn = nil
866+ pendingSpend = nil
867+
868+ // STATE TRANSITION: Pending -> None
869+ // A reorg removed the spend tx. We reset to initial state and
870+ // wait for ANY new spend (could be the same tx re-mined, or a
871+ // different tx like an RBF replacement).
872+ case reorgDepth , ok := <- negativeConfChan :
687873 if ! ok {
874+ log .Errorf ("Negative conf channel closed " +
875+ "unexpectedly" )
688876 return
689877 }
690878
691- err := c .handleCommitSpend (spend )
879+ log .Infof ("ChannelPoint(%v): spend reorged out at " +
880+ "depth %d, transitioning back to %v" ,
881+ c .cfg .chanState .FundingOutpoint , reorgDepth ,
882+ spendStateNone )
883+
884+ confNtfn .Cancel ()
885+ confNtfn = nil
886+ pendingSpend = nil
887+
888+ spendNtfn .Cancel ()
889+ var err error
890+ spendNtfn , err = registerForSpend ()
692891 if err != nil {
693- log .Errorf ("Failed to handle commit spend: %v" ,
694- err )
892+ log .Errorf ("Unable to re-register for " +
893+ "spend: %v" , err )
894+ return
695895 }
696896
897+ log .Infof ("ChannelPoint(%v): re-registered for spend " +
898+ "detection" , c .cfg .chanState .FundingOutpoint )
899+
697900 // The chainWatcher has been signalled to exit, so we'll do so
698901 // now.
699902 case <- c .quit :
903+ if confNtfn != nil {
904+ confNtfn .Cancel ()
905+ }
700906 return
701907 }
702908 }
@@ -992,6 +1198,18 @@ func (c *chainWatcher) toSelfAmount(tx *wire.MsgTx) btcutil.Amount {
9921198 return btcutil .Amount (fn .Sum (vals ))
9931199}
9941200
1201+ // requiredConfsForSpend determines the number of confirmations required before
1202+ // processing a spend of the funding output. Uses config override if set
1203+ // (typically for testing), otherwise scales with channel capacity to balance
1204+ // security vs user experience for channels of different sizes.
1205+ func (c * chainWatcher ) requiredConfsForSpend () uint32 {
1206+ return c .cfg .chanCloseConfs .UnwrapOrFunc (func () uint32 {
1207+ return lnwallet .CloseConfsForCapacity (
1208+ c .cfg .chanState .Capacity ,
1209+ )
1210+ })
1211+ }
1212+
9951213// dispatchCooperativeClose processed a detect cooperative channel closure.
9961214// We'll use the spending transaction to locate our output within the
9971215// transaction, then clean up the database state. We'll also dispatch a
@@ -1009,8 +1227,8 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet
10091227 localAmt := c .toSelfAmount (broadcastTx )
10101228
10111229 // Once this is known, we'll mark the state as fully closed in the
1012- // database. We can do this as a cooperatively closed channel has all
1013- // its outputs resolved after only one confirmation .
1230+ // database. For cooperative closes, we wait for a confirmation depth
1231+ // determined by channel capacity before dispatching this event .
10141232 closeSummary := & channeldb.ChannelCloseSummary {
10151233 ChanPoint : c .cfg .chanState .FundingOutpoint ,
10161234 ChainHash : c .cfg .chanState .ChainHash ,
@@ -1419,9 +1637,10 @@ func (c *chainWatcher) handleCommitSpend(
14191637 case wire .MaxTxInSequenceNum :
14201638 fallthrough
14211639 case mempool .MaxRBFSequence :
1422- // TODO(roasbeef): rare but possible, need itest case for
1423- err := c .dispatchCooperativeClose (commitSpend )
1424- if err != nil {
1640+ // This is a cooperative close. Dispatch it directly - the
1641+ // confirmation waiting and reorg handling is done in the
1642+ // closeObserver state machine before we reach this point.
1643+ if err := c .dispatchCooperativeClose (commitSpend ); err != nil {
14251644 return fmt .Errorf ("handle coop close: %w" , err )
14261645 }
14271646
@@ -1526,9 +1745,9 @@ func (c *chainWatcher) chanPointConfirmed() bool {
15261745}
15271746
15281747// handleBlockbeat takes a blockbeat and queries for a spending tx for the
1529- // funding output. If the spending tx is found, it will be handled based on the
1530- // closure type .
1531- func (c * chainWatcher ) handleBlockbeat (beat chainio.Blockbeat ) {
1748+ // funding output. If found, it returns the spend details so closeObserver can
1749+ // process it. Returns nil if no spend was detected .
1750+ func (c * chainWatcher ) handleBlockbeat (beat chainio.Blockbeat ) * chainntnfs. SpendDetail {
15321751 // Notify the chain watcher has processed the block.
15331752 defer c .NotifyBlockProcessed (beat , nil )
15341753
@@ -1540,24 +1759,23 @@ func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) {
15401759 // If the funding output hasn't confirmed in this block, we
15411760 // will check it again in the next block.
15421761 if ! c .chanPointConfirmed () {
1543- return
1762+ return nil
15441763 }
15451764 }
15461765
15471766 // Perform a non-blocking read to check whether the funding output was
1548- // spent.
1767+ // spent. The actual spend handling is done in closeObserver's state
1768+ // machine to avoid blocking the block processing pipeline.
15491769 spend := c .checkFundingSpend ()
15501770 if spend == nil {
15511771 log .Tracef ("No spend found for ChannelPoint(%v) in block %v" ,
15521772 c .cfg .chanState .FundingOutpoint , beat .Height ())
15531773
1554- return
1774+ return nil
15551775 }
15561776
1557- // The funding output was spent, we now handle it by sending a close
1558- // event to the channel arbitrator.
1559- err := c .handleCommitSpend (spend )
1560- if err != nil {
1561- log .Errorf ("Failed to handle commit spend: %v" , err )
1562- }
1777+ log .Debugf ("Detected spend of ChannelPoint(%v) in block %v" ,
1778+ c .cfg .chanState .FundingOutpoint , beat .Height ())
1779+
1780+ return spend
15631781}
0 commit comments