@@ -70,9 +70,9 @@ use lightning::ln::peer_handler;
7070use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
7171use lightning:: ln:: msgs:: ChannelMessageHandler ;
7272
73- use std:: task;
73+ use std:: { task, thread } ;
7474use std:: net:: SocketAddr ;
75- use std:: sync:: { Arc , Mutex } ;
75+ use std:: sync:: { Arc , Mutex , MutexGuard } ;
7676use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
7777use std:: time:: Duration ;
7878use std:: hash:: Hash ;
@@ -101,29 +101,31 @@ struct Connection {
101101 // socket. To wake it up (without otherwise changing its state, we can push a value into this
102102 // Sender.
103103 read_waker : mpsc:: Sender < ( ) > ,
104+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
105+ // are sure we won't call any more read/write PeerManager functions with the same connection.
106+ // This is set to true if we're in such a condition (with disconnect checked before with the
107+ // top-level mutex held) and false when we can return.
108+ block_disconnect_socket : bool ,
104109 read_paused : bool ,
105110 rl_requested_disconnect : bool ,
106111 id : u64 ,
107112}
108113impl Connection {
114+ fn event_trigger ( us : & mut MutexGuard < Self > ) {
115+ match us. event_notify . try_send ( ( ) ) {
116+ Ok ( _) => { } ,
117+ Err ( mpsc:: error:: TrySendError :: Full ( _) ) => {
118+ // Ignore full errors as we just need the user to poll after this point, so if they
119+ // haven't received the last send yet, it doesn't matter.
120+ } ,
121+ _ => panic ! ( )
122+ }
123+ }
109124 async fn schedule_read < CMH : ChannelMessageHandler + ' static > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > > > , us : Arc < Mutex < Self > > , mut reader : io:: ReadHalf < TcpStream > , mut read_wake_receiver : mpsc:: Receiver < ( ) > , mut write_avail_receiver : mpsc:: Receiver < ( ) > ) {
110125 let peer_manager_ref = peer_manager. clone ( ) ;
111126 // 8KB is nice and big but also should never cause any issues with stack overflowing.
112127 let mut buf = [ 0 ; 8192 ] ;
113128
114- macro_rules! event_occurred {
115- ( $lock: expr) => { {
116- match $lock. event_notify. try_send( ( ) ) {
117- Ok ( _) => { } ,
118- Err ( mpsc:: error:: TrySendError :: Full ( _) ) => {
119- // Ignore full errors as we just need the user to poll after this point, so if they
120- // haven't received the last send yet, it doesn't matter.
121- } ,
122- _ => panic!( )
123- }
124- } }
125- }
126-
127129 // If Rust-Lightning tells us to disconnect, we don't, then, call socket_disconnect() after
128130 // closing the socket, but if the socket closes on its own (ie our counterparty does it), we do.
129131 let need_disconnect_event;
@@ -136,48 +138,60 @@ impl Connection {
136138 } }
137139 }
138140
141+ macro_rules! prepare_read_write_call {
142+ ( ) => { {
143+ let mut us_lock = us. lock( ) . unwrap( ) ;
144+ if us_lock. rl_requested_disconnect {
145+ shutdown_socket!( "disconnect_socket() call from RL" , false ) ;
146+ }
147+ us_lock. block_disconnect_socket = true ;
148+ } }
149+ }
150+
139151 let read_paused = us. lock ( ) . unwrap ( ) . read_paused ;
140152 tokio:: select! {
141153 v = write_avail_receiver. recv( ) => {
142154 assert!( v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
143- if us. lock( ) . unwrap( ) . rl_requested_disconnect {
144- shutdown_socket!( "disconnect_socket() call from RL" , false ) ;
145- }
155+ prepare_read_write_call!( ) ;
146156 if let Err ( e) = peer_manager. write_buffer_space_avail( & mut SocketDescriptor :: new( us. clone( ) ) ) {
147157 shutdown_socket!( e, false ) ;
148158 }
159+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
149160 } ,
150161 _ = read_wake_receiver. recv( ) => { } ,
151162 read = reader. read( & mut buf) , if !read_paused => match read {
152163 Ok ( 0 ) => shutdown_socket!( "Connection closed" , true ) ,
153164 Ok ( len) => {
154- if us. lock( ) . unwrap( ) . rl_requested_disconnect {
155- shutdown_socket!( "disconnect_socket() call from RL" , false ) ;
156- }
165+ prepare_read_write_call!( ) ;
157166 let read_res = peer_manager. read_event( & mut SocketDescriptor :: new( Arc :: clone( & us) ) , & buf[ 0 ..len] ) ;
167+ let mut us_lock = us. lock( ) . unwrap( ) ;
158168 match read_res {
159169 Ok ( pause_read) => {
160- let mut us_lock = us. lock( ) . unwrap( ) ;
161170 if pause_read {
162171 us_lock. read_paused = true ;
163172 }
164- event_occurred! ( us_lock) ;
173+ Self :: event_trigger ( & mut us_lock) ;
165174 } ,
166175 Err ( e) => shutdown_socket!( e, false ) ,
167176 }
177+ us_lock. block_disconnect_socket = false ;
168178 } ,
169179 Err ( e) => shutdown_socket!( e, true ) ,
170180 } ,
171181 }
172182 }
173- let writer_option = us. lock ( ) . unwrap ( ) . writer . take ( ) ;
183+ let writer_option = {
184+ let mut us_lock = us. lock ( ) . unwrap ( ) ;
185+ us_lock. block_disconnect_socket = false ;
186+ us_lock. writer . take ( )
187+ } ;
174188 if let Some ( mut writer) = writer_option {
175189 // If the socket is already closed, shutdown() will fail, so just ignore it.
176190 let _ = writer. shutdown ( ) . await ;
177191 }
178192 if need_disconnect_event {
179193 peer_manager_ref. socket_disconnected ( & SocketDescriptor :: new ( Arc :: clone ( & us) ) ) ;
180- event_occurred ! ( us. lock( ) . unwrap( ) ) ;
194+ Self :: event_trigger ( & mut us. lock ( ) . unwrap ( ) ) ;
181195 }
182196 }
183197
@@ -196,8 +210,8 @@ impl Connection {
196210
197211 ( reader, write_receiver, read_receiver,
198212 Arc :: new ( Mutex :: new ( Self {
199- writer : Some ( writer) , event_notify, write_avail, read_waker,
200- read_paused : false , rl_requested_disconnect : false ,
213+ writer : Some ( writer) , event_notify, write_avail, read_waker, read_paused : false ,
214+ block_disconnect_socket : false , rl_requested_disconnect : false ,
201215 id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
202216 } ) ) )
203217 }
@@ -384,15 +398,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
384398 }
385399
386400 fn disconnect_socket ( & mut self ) {
387- let mut us = self . conn . lock ( ) . unwrap ( ) ;
388- us. rl_requested_disconnect = true ;
389- us. read_paused = true ;
390- // Wake up the sending thread, assuming it is still alive
391- let _ = us. write_avail . try_send ( ( ) ) ;
392- // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
393- // read task is about to call a PeerManager function (eg read_event or write_event).
394- // Ideally we need to release the us lock and block until we have confirmation from the
395- // read task that it has broken out of its main loop.
401+ {
402+ let mut us = self . conn . lock ( ) . unwrap ( ) ;
403+ us. rl_requested_disconnect = true ;
404+ us. read_paused = true ;
405+ // Wake up the sending thread, assuming it is still alive
406+ let _ = us. write_avail . try_send ( ( ) ) ;
407+ // Happy-path return:
408+ if !us. block_disconnect_socket { return ; }
409+ }
410+ while self . conn . lock ( ) . unwrap ( ) . block_disconnect_socket {
411+ thread:: yield_now ( ) ;
412+ }
396413 }
397414}
398415impl Clone for SocketDescriptor {
0 commit comments