@@ -71,7 +71,7 @@ use lightning::ln::peer_handler;
7171use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
7272use lightning:: ln:: msgs:: ChannelMessageHandler ;
7373
74- use std:: task;
74+ use std:: { task, thread } ;
7575use std:: net:: SocketAddr ;
7676use std:: sync:: { Arc , Mutex } ;
7777use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
@@ -113,6 +113,11 @@ struct Connection {
113113 // send into any read_blocker to wake the reading future back up and set read_paused back to
114114 // false.
115115 read_blocker : Option < oneshot:: Sender < ( ) > > ,
116+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
117+ // are sure we won't call any more read/write PeerManager functions with the same connection.
118+ // This is set to true if we're in such a condition (with disconnect checked before with the
119+ // top-level mutex held) and false when we can return.
120+ block_disconnect_socket : bool ,
116121 read_paused : bool ,
117122 disconnect_state : DisconnectionState ,
118123 id : u64 ,
@@ -130,20 +135,29 @@ impl Connection {
130135 } }
131136 }
132137
138+ macro_rules! prepare_read_write_call {
139+ ( ) => { {
140+ let mut us_lock = us. lock( ) . unwrap( ) ;
141+ if us_lock. disconnect_state == DisconnectionState :: RLTriggeredDisconnect {
142+ shutdown_socket!( "disconnect_socket() call from RL" ) ;
143+ }
144+ us_lock. block_disconnect_socket = true ;
145+ } }
146+ }
147+
133148 // Whenever we want to block on reading or waiting for reading to resume, we have to
134149 // at least select with the write_avail_receiver, which is used by the
135150 // SocketDescriptor to wake us up if we need to shut down the socket or if we need
136151 // to generate a write_buffer_space_avail call.
137152 macro_rules! select_write_ev {
138153 ( $v: expr) => { {
139154 assert!( $v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
140- if us. lock( ) . unwrap( ) . disconnect_state == DisconnectionState :: RLTriggeredDisconnect {
141- shutdown_socket!( "disconnect_socket() call from RL" ) ;
142- }
155+ prepare_read_write_call!( ) ;
143156 if let Err ( e) = peer_manager. write_buffer_space_avail( & mut SocketDescriptor :: new( us. clone( ) ) ) {
144157 us. lock( ) . unwrap( ) . disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
145158 shutdown_socket!( e) ;
146159 }
160+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
147161 } }
148162 }
149163
@@ -176,6 +190,7 @@ impl Connection {
176190 v = write_avail_receiver. recv( ) => select_write_ev!( v) ,
177191 }
178192 }
193+ prepare_read_write_call!( ) ;
179194 match peer_manager. read_event( & mut SocketDescriptor :: new( Arc :: clone( & us) ) , & buf[ 0 ..len] ) {
180195 Ok ( pause_read) => {
181196 if pause_read {
@@ -197,6 +212,7 @@ impl Connection {
197212 shutdown_socket!( e)
198213 } ,
199214 }
215+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
200216 } ,
201217 Err ( e) => {
202218 println!( "Connection closed: {}" , e) ;
@@ -205,6 +221,7 @@ impl Connection {
205221 } ,
206222 }
207223 }
224+ us. lock ( ) . unwrap ( ) . block_disconnect_socket = false ;
208225 let writer_option = us. lock ( ) . unwrap ( ) . writer . take ( ) ;
209226 if let Some ( mut writer) = writer_option {
210227 // If the socket is already closed, shutdown() will fail, so just ignore it.
@@ -234,7 +251,7 @@ impl Connection {
234251
235252 ( reader, receiver,
236253 Arc :: new ( Mutex :: new ( Self {
237- writer : Some ( writer) , event_notify, write_avail,
254+ writer : Some ( writer) , event_notify, write_avail, block_disconnect_socket : false ,
238255 read_blocker : None , read_paused : false , disconnect_state : DisconnectionState :: NeedDisconnectEvent ,
239256 id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
240257 } ) ) )
@@ -423,15 +440,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
423440 }
424441
425442 fn disconnect_socket ( & mut self ) {
426- let mut us = self . conn . lock ( ) . unwrap ( ) ;
427- us. disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
428- us. read_paused = true ;
429- // Wake up the sending thread, assuming it is still alive
430- let _ = us. write_avail . try_send ( ( ) ) ;
431- // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
432- // read task is about to call a PeerManager function (eg read_event or write_event).
433- // Ideally we need to release the us lock and block until we have confirmation from the
434- // read task that it has broken out of its main loop.
443+ {
444+ let mut us = self . conn . lock ( ) . unwrap ( ) ;
445+ us. disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
446+ us. read_paused = true ;
447+ // Wake up the sending thread, assuming it is still alive
448+ let _ = us. write_avail . try_send ( ( ) ) ;
449+ // Happy-path return:
450+ if !us. block_disconnect_socket { return ; }
451+ }
452+ while self . conn . lock ( ) . unwrap ( ) . block_disconnect_socket {
453+ thread:: yield_now ( ) ;
454+ }
435455 }
436456}
437457impl Clone for SocketDescriptor {
0 commit comments