@@ -1149,8 +1149,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
11491149
11501150#[ cfg( test) ]
11511151mod tests {
1152+ use secp256k1:: Signature ;
1153+ use bitcoin:: BitcoinHash ;
1154+ use bitcoin:: network:: constants:: Network ;
1155+ use bitcoin:: blockdata:: constants:: genesis_block;
11521156 use ln:: peer_handler:: { PeerManager , MessageHandler , SocketDescriptor } ;
11531157 use ln:: msgs;
1158+ use ln:: features:: ChannelFeatures ;
11541159 use util:: events;
11551160 use util:: test_utils;
11561161 use util:: logger:: Logger ;
@@ -1161,7 +1166,9 @@ mod tests {
11611166 use rand:: { thread_rng, Rng } ;
11621167
11631168 use std;
1169+ use std:: cmp:: min;
11641170 use std:: sync:: { Arc , Mutex } ;
1171+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
11651172
11661173 #[ derive( Clone ) ]
11671174 struct FileDescriptor {
@@ -1199,7 +1206,7 @@ mod tests {
11991206 chan_handlers
12001207 }
12011208
1202- fn create_network < ' a > ( peer_count : usize , chan_handlers : & ' a Vec < test_utils:: TestChannelMessageHandler > ) -> Vec < PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > > {
1209+ fn create_network < ' a > ( peer_count : usize , chan_handlers : & ' a Vec < test_utils:: TestChannelMessageHandler > , routing_handlers : Option < & ' a Vec < Arc < msgs :: RoutingMessageHandler > > > ) -> Vec < PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > > {
12031210 let mut peers = Vec :: new ( ) ;
12041211 let mut rng = thread_rng ( ) ;
12051212 let logger : Arc < Logger > = Arc :: new ( test_utils:: TestLogger :: new ( ) ) ;
@@ -1208,20 +1215,23 @@ mod tests {
12081215
12091216 for i in 0 ..peer_count {
12101217 let router = test_utils:: TestRoutingMessageHandler :: new ( ) ;
1218+ let router = if let Some ( routers) = routing_handlers { routers[ i] . clone ( ) } else {
1219+ Arc :: new ( router)
1220+ } ;
12111221 let node_id = {
12121222 let mut key_slice = [ 0 ; 32 ] ;
12131223 rng. fill_bytes ( & mut key_slice) ;
12141224 SecretKey :: from_slice ( & key_slice) . unwrap ( )
12151225 } ;
1216- let msg_handler = MessageHandler { chan_handler : & chan_handlers[ i] , route_handler : Arc :: new ( router) } ;
1226+ let msg_handler = MessageHandler { chan_handler : & chan_handlers[ i] , route_handler : router } ;
12171227 let peer = PeerManager :: new ( msg_handler, node_id, & ephemeral_bytes, Arc :: clone ( & logger) ) ;
12181228 peers. push ( peer) ;
12191229 }
12201230
12211231 peers
12221232 }
12231233
1224- fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > ) {
1234+ fn establish_connection < ' a > ( peer_a : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > , peer_b : & PeerManager < FileDescriptor , & ' a test_utils:: TestChannelMessageHandler > ) -> ( FileDescriptor , FileDescriptor ) {
12251235 let secp_ctx = Secp256k1 :: new ( ) ;
12261236 let a_id = PublicKey :: from_secret_key ( & secp_ctx, & peer_a. our_node_secret ) ;
12271237 let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
@@ -1231,6 +1241,7 @@ mod tests {
12311241 assert_eq ! ( peer_a. read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
12321242 assert_eq ! ( peer_b. read_event( & mut fd_b, & fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . unwrap( ) , false ) ;
12331243 assert_eq ! ( peer_a. read_event( & mut fd_a, & fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . unwrap( ) , false ) ;
1244+ ( fd_a. clone ( ) , fd_b. clone ( ) )
12341245 }
12351246
12361247 #[ test]
@@ -1239,7 +1250,7 @@ mod tests {
12391250 // push a DisconnectPeer event to remove the node flagged by id
12401251 let chan_handlers = create_chan_handlers ( 2 ) ;
12411252 let chan_handler = test_utils:: TestChannelMessageHandler :: new ( ) ;
1242- let mut peers = create_network ( 2 , & chan_handlers) ;
1253+ let mut peers = create_network ( 2 , & chan_handlers, None ) ;
12431254 establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
12441255 assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
12451256
@@ -1256,11 +1267,12 @@ mod tests {
12561267 peers[ 0 ] . process_events ( ) ;
12571268 assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
12581269 }
1270+
12591271 #[ test]
1260- fn test_timer_tick_occured ( ) {
1272+ fn test_timer_tick_occurred ( ) {
12611273 // Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
12621274 let chan_handlers = create_chan_handlers ( 2 ) ;
1263- let peers = create_network ( 2 , & chan_handlers) ;
1275+ let peers = create_network ( 2 , & chan_handlers, None ) ;
12641276 establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
12651277 assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
12661278
@@ -1272,4 +1284,140 @@ mod tests {
12721284 peers[ 0 ] . timer_tick_occured ( ) ;
12731285 assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
12741286 }
1287+
1288+ pub struct TestRoutingMessageHandler {
1289+ pub chan_upds_recvd : AtomicUsize ,
1290+ pub chan_anns_recvd : AtomicUsize ,
1291+ pub chan_anns_sent : AtomicUsize ,
1292+ }
1293+
1294+ impl TestRoutingMessageHandler {
1295+ pub fn new ( ) -> Self {
1296+ TestRoutingMessageHandler {
1297+ chan_upds_recvd : AtomicUsize :: new ( 0 ) ,
1298+ chan_anns_recvd : AtomicUsize :: new ( 0 ) ,
1299+ chan_anns_sent : AtomicUsize :: new ( 0 ) ,
1300+ }
1301+ }
1302+
1303+ }
1304+ impl msgs:: RoutingMessageHandler for TestRoutingMessageHandler {
1305+ fn handle_node_announcement ( & self , _msg : & msgs:: NodeAnnouncement ) -> Result < bool , msgs:: LightningError > {
1306+ Err ( msgs:: LightningError { err : "" , action : msgs:: ErrorAction :: IgnoreError } )
1307+ }
1308+ fn handle_channel_announcement ( & self , _msg : & msgs:: ChannelAnnouncement ) -> Result < bool , msgs:: LightningError > {
1309+ self . chan_anns_recvd . fetch_add ( 1 , Ordering :: AcqRel ) ;
1310+ Err ( msgs:: LightningError { err : "" , action : msgs:: ErrorAction :: IgnoreError } )
1311+ }
1312+ fn handle_channel_update ( & self , _msg : & msgs:: ChannelUpdate ) -> Result < bool , msgs:: LightningError > {
1313+ self . chan_upds_recvd . fetch_add ( 1 , Ordering :: AcqRel ) ;
1314+ Err ( msgs:: LightningError { err : "" , action : msgs:: ErrorAction :: IgnoreError } )
1315+ }
1316+ fn handle_htlc_fail_channel_update ( & self , _update : & msgs:: HTLCFailChannelUpdate ) { }
1317+ fn get_next_channel_announcements ( & self , starting_point : u64 , batch_amount : u8 ) -> Vec < ( msgs:: ChannelAnnouncement , msgs:: ChannelUpdate , msgs:: ChannelUpdate ) > {
1318+ let mut chan_anns = Vec :: new ( ) ;
1319+ let batch_amt = batch_amount as u64 ;
1320+ const TOTAL_ANNS : u64 = 100 ;
1321+ let num_anns_sent = self . chan_anns_sent . load ( Ordering :: Acquire ) as u64 ;
1322+ let end: u64 = if starting_point > 0 { min ( starting_point + batch_amt, TOTAL_ANNS - num_anns_sent) } else { batch_amount. into ( ) } ;
1323+ for j in starting_point..end {
1324+ let chan_upd_1 = get_dummy_channel_update ( j) ;
1325+ let chan_upd_2 = get_dummy_channel_update ( j) ;
1326+ let chan_ann = get_dummy_channel_announcement ( j) ;
1327+
1328+ chan_anns. push ( ( chan_ann, chan_upd_1, chan_upd_2) ) ;
1329+ }
1330+
1331+ self . chan_anns_sent . fetch_add ( chan_anns. len ( ) , Ordering :: AcqRel ) ;
1332+ chan_anns
1333+ }
1334+
1335+ fn get_next_node_announcements ( & self , _starting_point : Option < & PublicKey > , _batch_amount : u8 ) -> Vec < msgs:: NodeAnnouncement > {
1336+ Vec :: new ( )
1337+ }
1338+
1339+ fn should_request_full_sync ( & self , _node_id : & PublicKey ) -> bool {
1340+ true
1341+ }
1342+ }
1343+
1344+ fn get_dummy_channel_announcement ( short_chan_id : u64 ) -> msgs:: ChannelAnnouncement {
1345+ use secp256k1:: ffi:: Signature as FFISignature ;
1346+ let secp_ctx = Secp256k1 :: new ( ) ;
1347+ let network = Network :: Testnet ;
1348+ let node_1_privkey = SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ;
1349+ let node_2_privkey = SecretKey :: from_slice ( & [ 41 ; 32 ] ) . unwrap ( ) ;
1350+ let node_1_btckey = SecretKey :: from_slice ( & [ 40 ; 32 ] ) . unwrap ( ) ;
1351+ let node_2_btckey = SecretKey :: from_slice ( & [ 39 ; 32 ] ) . unwrap ( ) ;
1352+ let unsigned_ann = msgs:: UnsignedChannelAnnouncement {
1353+ features : ChannelFeatures :: supported ( ) ,
1354+ chain_hash : genesis_block ( network) . header . bitcoin_hash ( ) ,
1355+ short_channel_id : short_chan_id,
1356+ node_id_1 : PublicKey :: from_secret_key ( & secp_ctx, & node_1_privkey) ,
1357+ node_id_2 : PublicKey :: from_secret_key ( & secp_ctx, & node_2_privkey) ,
1358+ bitcoin_key_1 : PublicKey :: from_secret_key ( & secp_ctx, & node_1_btckey) ,
1359+ bitcoin_key_2 : PublicKey :: from_secret_key ( & secp_ctx, & node_2_btckey) ,
1360+ excess_data : Vec :: new ( ) ,
1361+ } ;
1362+
1363+ msgs:: ChannelAnnouncement {
1364+ node_signature_1 : Signature :: from ( FFISignature :: new ( ) ) ,
1365+ node_signature_2 : Signature :: from ( FFISignature :: new ( ) ) ,
1366+ bitcoin_signature_1 : Signature :: from ( FFISignature :: new ( ) ) ,
1367+ bitcoin_signature_2 : Signature :: from ( FFISignature :: new ( ) ) ,
1368+ contents : unsigned_ann,
1369+ }
1370+ }
1371+
1372+ fn get_dummy_channel_update ( short_chan_id : u64 ) -> msgs:: ChannelUpdate {
1373+ use secp256k1:: ffi:: Signature as FFISignature ;
1374+ let network = Network :: Testnet ;
1375+ msgs:: ChannelUpdate {
1376+ signature : Signature :: from ( FFISignature :: new ( ) ) ,
1377+ contents : msgs:: UnsignedChannelUpdate {
1378+ chain_hash : genesis_block ( network) . header . bitcoin_hash ( ) ,
1379+ short_channel_id : short_chan_id,
1380+ timestamp : 0 ,
1381+ flags : 0 ,
1382+ cltv_expiry_delta : 0 ,
1383+ htlc_minimum_msat : 0 ,
1384+ fee_base_msat : 0 ,
1385+ fee_proportional_millionths : 0 ,
1386+ excess_data : vec ! [ ] ,
1387+ }
1388+ }
1389+ }
1390+
1391+ #[ test]
1392+ fn test_do_attempt_write_data ( ) {
1393+ // Create 2 peers with custom TestRoutingMessageHandlers and connect them.
1394+ let chan_handlers = create_chan_handlers ( 2 ) ;
1395+ let mut routing_handlers: Vec < Arc < msgs:: RoutingMessageHandler > > = Vec :: new ( ) ;
1396+ let mut routing_handlers_concrete: Vec < Arc < TestRoutingMessageHandler > > = Vec :: new ( ) ;
1397+ for _ in 0 ..2 {
1398+ let routing_handler = Arc :: new ( TestRoutingMessageHandler :: new ( ) ) ;
1399+ routing_handlers. push ( routing_handler. clone ( ) ) ;
1400+ routing_handlers_concrete. push ( routing_handler. clone ( ) ) ;
1401+ }
1402+ let peers = create_network ( 2 , & chan_handlers, Some ( & routing_handlers) ) ;
1403+
1404+ // By calling establish_connect, we trigger do_attempt_write_data between
1405+ // the peers. Previously this function would mistakenly enter an infinite loop
1406+ // when there were more channel messages available than could fit into a peer's
1407+ // buffer. This issue would now be detected by this test (because we use custom
1408+ // RoutingMessageHandlers that intentionally return more channel messages
1409+ // than can fit into a peer's buffer).
1410+ let ( mut fd_a, mut fd_b) = establish_connection ( & peers[ 0 ] , & peers[ 1 ] ) ;
1411+
1412+ // Make each peer to read the messages that the other peer just wrote to them.
1413+ peers[ 1 ] . read_event ( & mut fd_b, & fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ) . unwrap ( ) ;
1414+ peers[ 0 ] . read_event ( & mut fd_a, & fd_b. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ) . unwrap ( ) ;
1415+
1416+ // Check that each peer has received the expected number of channel updates and channel
1417+ // announcements.
1418+ assert_eq ! ( routing_handlers_concrete[ 0 ] . clone( ) . chan_upds_recvd. load( Ordering :: Acquire ) , 100 ) ;
1419+ assert_eq ! ( routing_handlers_concrete[ 0 ] . clone( ) . chan_anns_recvd. load( Ordering :: Acquire ) , 50 ) ;
1420+ assert_eq ! ( routing_handlers_concrete[ 1 ] . clone( ) . chan_upds_recvd. load( Ordering :: Acquire ) , 100 ) ;
1421+ assert_eq ! ( routing_handlers_concrete[ 1 ] . clone( ) . chan_anns_recvd. load( Ordering :: Acquire ) , 50 ) ;
1422+ }
12751423}
0 commit comments