@@ -123,8 +123,12 @@ pub trait ManyChannelMonitor: Send + Sync {
123123 fn add_update_monitor ( & self , funding_txo : OutPoint , monitor : ChannelMonitor ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
124124
125125 /// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
126- /// with success or failure backward
127- fn fetch_pending_htlc_updated ( & self ) -> Vec < HTLCUpdate > ;
126+ /// with success or failure.
127+ ///
128+ /// You should probably just call through to
129+ /// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
130+ /// the full list.
131+ fn get_and_clear_pending_htlcs_updated ( & self ) -> Vec < HTLCUpdate > ;
128132}
129133
130134/// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
@@ -146,7 +150,6 @@ pub struct SimpleManyChannelMonitor<Key> {
146150 chain_monitor : Arc < ChainWatchInterface > ,
147151 broadcaster : Arc < BroadcasterInterface > ,
148152 pending_events : Mutex < Vec < events:: Event > > ,
149- pending_htlc_updated : Mutex < HashMap < PaymentHash , Vec < ( HTLCSource , Option < PaymentPreimage > ) > > > ,
150153 logger : Arc < Logger > ,
151154 fee_estimator : Arc < FeeEstimator >
152155}
@@ -155,11 +158,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelM
155158 fn block_connected ( & self , header : & BlockHeader , height : u32 , txn_matched : & [ & Transaction ] , _indexes_of_txn_matched : & [ u32 ] ) {
156159 let block_hash = header. bitcoin_hash ( ) ;
157160 let mut new_events: Vec < events:: Event > = Vec :: with_capacity ( 0 ) ;
158- let mut htlc_updated_infos = Vec :: new ( ) ;
159161 {
160162 let mut monitors = self . monitors . lock ( ) . unwrap ( ) ;
161163 for monitor in monitors. values_mut ( ) {
162- let ( txn_outputs, spendable_outputs, mut htlc_updated ) = monitor. block_connected ( txn_matched, height, & block_hash, & * self . broadcaster , & * self . fee_estimator ) ;
164+ let ( txn_outputs, spendable_outputs) = monitor. block_connected ( txn_matched, height, & block_hash, & * self . broadcaster , & * self . fee_estimator ) ;
163165 if spendable_outputs. len ( ) > 0 {
164166 new_events. push ( events:: Event :: SpendableOutputs {
165167 outputs : spendable_outputs,
@@ -171,35 +173,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelM
171173 self . chain_monitor . install_watch_outpoint ( ( txid. clone ( ) , idx as u32 ) , & output. script_pubkey ) ;
172174 }
173175 }
174- htlc_updated_infos. append ( & mut htlc_updated) ;
175- }
176- }
177- {
178- // ChannelManager will just need to fetch pending_htlc_updated and pass state backward
179- let mut pending_htlc_updated = self . pending_htlc_updated . lock ( ) . unwrap ( ) ;
180- for htlc in htlc_updated_infos. drain ( ..) {
181- match pending_htlc_updated. entry ( htlc. 2 ) {
182- hash_map:: Entry :: Occupied ( mut e) => {
183- // In case of reorg we may have htlc outputs solved in a different way so
184- // we prefer to keep claims but don't store duplicate updates for a given
185- // (payment_hash, HTLCSource) pair.
186- let mut existing_claim = false ;
187- e. get_mut ( ) . retain ( |htlc_data| {
188- if htlc. 0 == htlc_data. 0 {
189- if htlc_data. 1 . is_some ( ) {
190- existing_claim = true ;
191- true
192- } else { false }
193- } else { true }
194- } ) ;
195- if !existing_claim {
196- e. get_mut ( ) . push ( ( htlc. 0 , htlc. 1 ) ) ;
197- }
198- }
199- hash_map:: Entry :: Vacant ( e) => {
200- e. insert ( vec ! [ ( htlc. 0 , htlc. 1 ) ] ) ;
201- }
202- }
203176 }
204177 }
205178 let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
@@ -224,7 +197,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static> SimpleManyChannelMonitor<Key>
224197 chain_monitor,
225198 broadcaster,
226199 pending_events : Mutex :: new ( Vec :: new ( ) ) ,
227- pending_htlc_updated : Mutex :: new ( HashMap :: new ( ) ) ,
228200 logger,
229201 fee_estimator : feeest,
230202 } ;
@@ -272,17 +244,10 @@ impl ManyChannelMonitor for SimpleManyChannelMonitor<OutPoint> {
272244 }
273245 }
274246
275- fn fetch_pending_htlc_updated ( & self ) -> Vec < HTLCUpdate > {
276- let mut updated = self . pending_htlc_updated . lock ( ) . unwrap ( ) ;
277- let mut pending_htlcs_updated = Vec :: with_capacity ( updated. len ( ) ) ;
278- for ( k, v) in updated. drain ( ) {
279- for htlc_data in v {
280- pending_htlcs_updated. push ( HTLCUpdate {
281- payment_hash : k,
282- payment_preimage : htlc_data. 1 ,
283- source : htlc_data. 0 ,
284- } ) ;
285- }
247+ fn get_and_clear_pending_htlcs_updated ( & self ) -> Vec < HTLCUpdate > {
248+ let mut pending_htlcs_updated = Vec :: new ( ) ;
249+ for chan in self . monitors . lock ( ) . unwrap ( ) . values_mut ( ) {
250+ pending_htlcs_updated. append ( & mut chan. get_and_clear_pending_htlcs_updated ( ) ) ;
286251 }
287252 pending_htlcs_updated
288253 }
@@ -604,6 +569,8 @@ pub struct ChannelMonitor {
604569
605570 payment_preimages : HashMap < PaymentHash , PaymentPreimage > ,
606571
572+ pending_htlcs_updated : HashMap < PaymentHash , Vec < ( HTLCSource , Option < PaymentPreimage > ) > > ,
573+
607574 destination_script : Script ,
608575 // Thanks to data loss protection, we may be able to claim our non-htlc funds
609576 // back, this is the script we have to spend from but we need to
@@ -708,6 +675,7 @@ impl PartialEq for ChannelMonitor {
708675 self . current_remote_commitment_number != other. current_remote_commitment_number ||
709676 self . current_local_signed_commitment_tx != other. current_local_signed_commitment_tx ||
710677 self . payment_preimages != other. payment_preimages ||
678+ self . pending_htlcs_updated != other. pending_htlcs_updated ||
711679 self . destination_script != other. destination_script ||
712680 self . to_remote_rescue != other. to_remote_rescue ||
713681 self . pending_claim_requests != other. pending_claim_requests ||
@@ -761,6 +729,8 @@ impl ChannelMonitor {
761729 current_remote_commitment_number : 1 << 48 ,
762730
763731 payment_preimages : HashMap :: new ( ) ,
732+ pending_htlcs_updated : HashMap :: new ( ) ,
733+
764734 destination_script : destination_script,
765735 to_remote_rescue : None ,
766736
@@ -1117,6 +1087,22 @@ impl ChannelMonitor {
11171087 res
11181088 }
11191089
1090+ /// Get the list of HTLCs who's status has been updated on chain. This should be called by
1091+ /// ChannelManager via ManyChannelMonitor::fetch_pending_htlcs_updated().
1092+ pub fn get_and_clear_pending_htlcs_updated ( & mut self ) -> Vec < HTLCUpdate > {
1093+ let mut pending_htlcs_updated = Vec :: with_capacity ( self . pending_htlcs_updated . len ( ) ) ;
1094+ for ( k, v) in self . pending_htlcs_updated . drain ( ) {
1095+ for htlc_data in v {
1096+ pending_htlcs_updated. push ( HTLCUpdate {
1097+ payment_hash : k,
1098+ payment_preimage : htlc_data. 1 ,
1099+ source : htlc_data. 0 ,
1100+ } ) ;
1101+ }
1102+ }
1103+ pending_htlcs_updated
1104+ }
1105+
11201106 /// Serializes into a vec, with various modes for the exposed pub fns
11211107 fn write < W : Writer > ( & self , writer : & mut W , for_local_storage : bool ) -> Result < ( ) , :: std:: io:: Error > {
11221108 //TODO: We still write out all the serialization here manually instead of using the fancy
@@ -1284,6 +1270,16 @@ impl ChannelMonitor {
12841270 writer. write_all ( & payment_preimage. 0 [ ..] ) ?;
12851271 }
12861272
1273+ writer. write_all ( & byte_utils:: be64_to_array ( self . pending_htlcs_updated . len ( ) as u64 ) ) ?;
1274+ for ( payment_hash, data) in self . pending_htlcs_updated . iter ( ) {
1275+ writer. write_all ( & payment_hash. 0 [ ..] ) ?;
1276+ writer. write_all ( & byte_utils:: be64_to_array ( data. len ( ) as u64 ) ) ?;
1277+ for & ( ref source, ref payment_preimage) in data. iter ( ) {
1278+ source. write ( writer) ?;
1279+ write_option ! ( payment_preimage) ;
1280+ }
1281+ }
1282+
12871283 self . last_block_hash . write ( writer) ?;
12881284 self . destination_script . write ( writer) ?;
12891285 if let Some ( ( ref to_remote_script, ref local_key) ) = self . to_remote_rescue {
@@ -2334,11 +2330,38 @@ impl ChannelMonitor {
23342330 }
23352331 }
23362332
2337- fn block_connected ( & mut self , txn_matched : & [ & Transaction ] , height : u32 , block_hash : & Sha256dHash , broadcaster : & BroadcasterInterface , fee_estimator : & FeeEstimator ) -> ( Vec < ( Sha256dHash , Vec < TxOut > ) > , Vec < SpendableOutputDescriptor > , Vec < ( HTLCSource , Option < PaymentPreimage > , PaymentHash ) > ) {
2333+ fn append_htlc_updated ( & mut self , mut htlc_updated_infos : Vec < ( HTLCSource , Option < PaymentPreimage > , PaymentHash ) > ) {
2334+ // ChannelManager will just need to fetch pending_htlcs_updated and pass state backward
2335+ for htlc in htlc_updated_infos. drain ( ..) {
2336+ match self . pending_htlcs_updated . entry ( htlc. 2 ) {
2337+ hash_map:: Entry :: Occupied ( mut e) => {
2338+ // In case of reorg we may have htlc outputs solved in a different way so
2339+ // we prefer to keep claims but don't store duplicate updates for a given
2340+ // (payment_hash, HTLCSource) pair.
2341+ let mut existing_claim = false ;
2342+ e. get_mut ( ) . retain ( |htlc_data| {
2343+ if htlc. 0 == htlc_data. 0 {
2344+ if htlc_data. 1 . is_some ( ) {
2345+ existing_claim = true ;
2346+ true
2347+ } else { false }
2348+ } else { true }
2349+ } ) ;
2350+ if !existing_claim {
2351+ e. get_mut ( ) . push ( ( htlc. 0 , htlc. 1 ) ) ;
2352+ }
2353+ }
2354+ hash_map:: Entry :: Vacant ( e) => {
2355+ e. insert ( vec ! [ ( htlc. 0 , htlc. 1 ) ] ) ;
2356+ }
2357+ }
2358+ }
2359+ }
2360+
2361+ fn block_connected ( & mut self , txn_matched : & [ & Transaction ] , height : u32 , block_hash : & Sha256dHash , broadcaster : & BroadcasterInterface , fee_estimator : & FeeEstimator ) -> ( Vec < ( Sha256dHash , Vec < TxOut > ) > , Vec < SpendableOutputDescriptor > ) {
23382362 log_trace ! ( self , "Block {} at height {} connected with {} txn matched" , block_hash, height, txn_matched. len( ) ) ;
23392363 let mut watch_outputs = Vec :: new ( ) ;
23402364 let mut spendable_outputs = Vec :: new ( ) ;
2341- let mut htlc_updated = Vec :: new ( ) ;
23422365 let mut bump_candidates = HashSet :: new ( ) ;
23432366 for tx in txn_matched {
23442367 if tx. input . len ( ) == 1 {
@@ -2397,10 +2420,8 @@ impl ChannelMonitor {
23972420 // While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
23982421 // can also be resolved in a few other ways which can have more than one output. Thus,
23992422 // we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
2400- let mut updated = self . is_resolving_htlc_output ( & tx, height) ;
2401- if updated. len ( ) > 0 {
2402- htlc_updated. append ( & mut updated) ;
2403- }
2423+ let htlcs_updated = self . is_resolving_htlc_output ( & tx, height) ;
2424+ self . append_htlc_updated ( htlcs_updated) ;
24042425
24052426 // Scan all input to verify is one of the outpoint spent is of interest for us
24062427 let mut claimed_outputs_material = Vec :: new ( ) ;
@@ -2523,7 +2544,7 @@ impl ChannelMonitor {
25232544 } ,
25242545 OnchainEvent :: HTLCUpdate { htlc_update } => {
25252546 log_trace ! ( self , "HTLC {} failure update has got enough confirmations to be passed upstream" , log_bytes!( ( htlc_update. 1 ) . 0 ) ) ;
2526- htlc_updated . push ( ( htlc_update. 0 , None , htlc_update. 1 ) ) ;
2547+ self . append_htlc_updated ( vec ! [ ( htlc_update. 0 , None , htlc_update. 1 ) ] ) ;
25272548 } ,
25282549 OnchainEvent :: ContentiousOutpoint { outpoint, .. } => {
25292550 self . claimable_outpoints . remove ( & outpoint) ;
@@ -2552,7 +2573,7 @@ impl ChannelMonitor {
25522573 }
25532574 }
25542575 self . last_block_hash = block_hash. clone ( ) ;
2555- ( watch_outputs, spendable_outputs, htlc_updated )
2576+ ( watch_outputs, spendable_outputs)
25562577 }
25572578
25582579 fn block_disconnected ( & mut self , height : u32 , block_hash : & Sha256dHash , broadcaster : & BroadcasterInterface , fee_estimator : & FeeEstimator ) {
@@ -3140,6 +3161,20 @@ impl<R: ::std::io::Read> ReadableArgs<R, Arc<Logger>> for (Sha256dHash, ChannelM
31403161 }
31413162 }
31423163
3164+ let pending_htlcs_updated_len: u64 = Readable :: read ( reader) ?;
3165+ let mut pending_htlcs_updated = HashMap :: with_capacity ( cmp:: min ( pending_htlcs_updated_len as usize , MAX_ALLOC_SIZE / ( 32 + 8 * 3 ) ) ) ;
3166+ for _ in 0 ..pending_htlcs_updated_len {
3167+ let payment_hash: PaymentHash = Readable :: read ( reader) ?;
3168+ let htlcs_len: u64 = Readable :: read ( reader) ?;
3169+ let mut htlcs = Vec :: with_capacity ( cmp:: min ( htlcs_len as usize , MAX_ALLOC_SIZE / 64 ) ) ;
3170+ for _ in 0 ..htlcs_len {
3171+ htlcs. push ( ( Readable :: read ( reader) ?, Readable :: read ( reader) ?) ) ;
3172+ }
3173+ if let Some ( _) = pending_htlcs_updated. insert ( payment_hash, htlcs) {
3174+ return Err ( DecodeError :: InvalidValue ) ;
3175+ }
3176+ }
3177+
31433178 let last_block_hash: Sha256dHash = Readable :: read ( reader) ?;
31443179 let destination_script = Readable :: read ( reader) ?;
31453180 let to_remote_rescue = match <u8 as Readable < R > >:: read ( reader) ? {
@@ -3226,6 +3261,7 @@ impl<R: ::std::io::Read> ReadableArgs<R, Arc<Logger>> for (Sha256dHash, ChannelM
32263261 current_remote_commitment_number,
32273262
32283263 payment_preimages,
3264+ pending_htlcs_updated,
32293265
32303266 destination_script,
32313267 to_remote_rescue,
0 commit comments