@@ -1075,31 +1075,34 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
10751075 let peers = & mut peers. peers ;
10761076
10771077 peers. retain ( |descriptor, peer| {
1078- if peer. awaiting_pong == true {
1078+ if peer. awaiting_pong {
10791079 peers_needing_send. remove ( descriptor) ;
10801080 match peer. their_node_id {
10811081 Some ( node_id) => {
10821082 node_id_to_descriptor. remove ( & node_id) ;
10831083 self . message_handler . chan_handler . peer_disconnected ( & node_id, true ) ;
1084- } ,
1084+ }
10851085 None => { }
10861086 }
1087+ return false ;
1088+ }
1089+
1090+ if !peer. channel_encryptor . is_ready_for_encryption ( ) {
1091+ // The peer needs to complete its handshake before we can exchange messages
1092+ return true ;
10871093 }
10881094
10891095 let ping = msgs:: Ping {
10901096 ponglen : 0 ,
10911097 byteslen : 64 ,
10921098 } ;
1093- peer. pending_outbound_buffer . push_back ( encode_msg ! ( & ping) ) ;
1099+ peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( & ping) ) ) ;
1100+
10941101 let mut descriptor_clone = descriptor. clone ( ) ;
10951102 self . do_attempt_write_data ( & mut descriptor_clone, peer) ;
10961103
1097- if peer. awaiting_pong {
1098- false // Drop the peer
1099- } else {
1100- peer. awaiting_pong = true ;
1101- true
1102- }
1104+ peer. awaiting_pong = true ;
1105+ true
11031106 } ) ;
11041107 }
11051108 }
@@ -1118,15 +1121,29 @@ mod tests {
11181121
11191122 use rand:: { thread_rng, Rng } ;
11201123
1121- use std:: sync:: { Arc } ;
1124+ use std;
1125+ use std:: sync:: { Arc , Mutex } ;
11221126
1123- #[ derive( PartialEq , Eq , Clone , Hash ) ]
1127+ #[ derive( Clone ) ]
11241128 struct FileDescriptor {
11251129 fd : u16 ,
1130+ outbound_data : Arc < Mutex < Vec < u8 > > > ,
1131+ }
1132+ impl PartialEq for FileDescriptor {
1133+ fn eq ( & self , other : & Self ) -> bool {
1134+ self . fd == other. fd
1135+ }
1136+ }
1137+ impl Eq for FileDescriptor { }
1138+ impl std:: hash:: Hash for FileDescriptor {
1139+ fn hash < H : std:: hash:: Hasher > ( & self , hasher : & mut H ) {
1140+ self . fd . hash ( hasher)
1141+ }
11261142 }
11271143
11281144 impl SocketDescriptor for FileDescriptor {
11291145 fn send_data ( & mut self , data : & [ u8 ] , _resume_read : bool ) -> usize {
1146+ self . outbound_data . lock ( ) . unwrap ( ) . extend_from_slice ( data) ;
11301147 data. len ( )
11311148 }
11321149
@@ -1167,10 +1184,14 @@ mod tests {
11671184
11681185 fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > ) {
11691186 let secp_ctx = Secp256k1 :: new ( ) ;
1170- let their_id = PublicKey :: from_secret_key ( & secp_ctx, & peer_b. our_node_secret ) ;
1171- let fd = FileDescriptor { fd : 1 } ;
1172- peer_a. new_inbound_connection ( fd. clone ( ) ) . unwrap ( ) ;
1173- peer_a. peers . lock ( ) . unwrap ( ) . node_id_to_descriptor . insert ( their_id, fd. clone ( ) ) ;
1187+ let a_id = PublicKey :: from_secret_key ( & secp_ctx, & peer_a. our_node_secret ) ;
1188+ let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
1189+ let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
1190+ let initial_data = peer_b. new_outbound_connection ( a_id, fd_b. clone ( ) ) . unwrap ( ) ;
1191+ peer_a. new_inbound_connection ( fd_a. clone ( ) ) . unwrap ( ) ;
1192+ assert_eq ! ( peer_a. read_event( & mut fd_a, initial_data) . unwrap( ) , false ) ;
1193+ assert_eq ! ( peer_b. read_event( & mut fd_b, fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . unwrap( ) , false ) ;
1194+ assert_eq ! ( peer_a. read_event( & mut fd_a, fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . unwrap( ) , false ) ;
11741195 }
11751196
11761197 #[ test]
0 commit comments