Skip to content

Commit d2f82fe

Browse files
Test that do_attempt_write_data does not infinitely loop
when all the channel messages can't fit into the buffer. Adds a test for PR lightningdevkit#550.
1 parent a5e0834 commit d2f82fe

File tree

1 file changed

+165
-6
lines changed

1 file changed

+165
-6
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 165 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
343343
($msg: expr) => {
344344
{
345345
log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
346+
println!("VMW: len of peer.pending_outbound_buffer: {}", peer.pending_outbound_buffer.len());
346347
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg)[..]));
347348
}
348349
}
@@ -353,6 +354,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
353354
match peer.sync_status {
354355
InitSyncTracker::NoSyncRequested => {},
355356
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
357+
println!("VMW: peer.pending_outbound_buffer.len() before calculating step: {}", peer.pending_outbound_buffer.len());
356358
let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
357359
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
358360
for &(ref announce, ref update_a, ref update_b) in all_messages.iter() {
@@ -482,6 +484,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
482484
{
483485
log_trace!(self, "Encoding and sending message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
484486
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(&$msg)[..]));
487+
println!("VMW in encode_and_send_msg: len of peer.pending_outbound_buffer: {}", peer.pending_outbound_buffer.len());
485488
peers.peers_needing_send.insert(peer_descriptor.clone());
486489
}
487490
}
@@ -953,6 +956,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
953956
//TODO: Do whatever we're gonna do for handling dropped messages
954957
});
955958
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
959+
println!("VMW: len of peer.pending_outbound_buffer: {}", peer.pending_outbound_buffer.len());
956960
self.do_attempt_write_data(&mut descriptor, peer);
957961
},
958962
MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
@@ -976,6 +980,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
976980
}
977981
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
978982
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_update_msg[..]));
983+
println!("VMW: len of peer.pending_outbound_buffer: {}", peer.pending_outbound_buffer.len());
979984
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
980985
}
981986
}
@@ -1006,6 +1011,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
10061011
continue
10071012
}
10081013
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_msg[..]));
1014+
println!("VMW: len of peer.pending_outbound_buffer post BroadcastChannelUpdate: {}", peer.pending_outbound_buffer.len());
10091015
self.do_attempt_write_data(&mut (*descriptor).clone(), peer);
10101016
}
10111017
}
@@ -1149,8 +1155,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
11491155

11501156
#[cfg(test)]
11511157
mod tests {
1158+
use secp256k1::Signature;
1159+
use bitcoin::BitcoinHash;
1160+
use bitcoin::network::constants::Network;
1161+
use bitcoin::blockdata::constants::genesis_block;
11521162
use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
11531163
use ln::msgs;
1164+
use ln::features::ChannelFeatures;
11541165
use util::events;
11551166
use util::test_utils;
11561167
use util::logger::Logger;
@@ -1162,6 +1173,7 @@ mod tests {
11621173

11631174
use std;
11641175
use std::sync::{Arc, Mutex};
1176+
use std::sync::atomic::{AtomicUsize, Ordering};
11651177

11661178
#[derive(Clone)]
11671179
struct FileDescriptor {
@@ -1199,7 +1211,7 @@ mod tests {
11991211
chan_handlers
12001212
}
12011213

1202-
fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec<test_utils::TestChannelMessageHandler>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>> {
1214+
fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec<test_utils::TestChannelMessageHandler>, routing_handlers: Option<&'a Vec<Arc<msgs::RoutingMessageHandler>>>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>> {
12031215
let mut peers = Vec::new();
12041216
let mut rng = thread_rng();
12051217
let logger : Arc<Logger> = Arc::new(test_utils::TestLogger::new());
@@ -1208,29 +1220,37 @@ mod tests {
12081220

12091221
for i in 0..peer_count {
12101222
let router = test_utils::TestRoutingMessageHandler::new();
1223+
let router = if let Some(routers) = routing_handlers { routers[i].clone() } else {
1224+
Arc::new(router)
1225+
};
12111226
let node_id = {
12121227
let mut key_slice = [0;32];
12131228
rng.fill_bytes(&mut key_slice);
12141229
SecretKey::from_slice(&key_slice).unwrap()
12151230
};
1216-
let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: Arc::new(router) };
1231+
let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: router };
12171232
let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, Arc::clone(&logger));
12181233
peers.push(peer);
12191234
}
12201235

12211236
peers
12221237
}
12231238

1224-
fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>) {
1239+
fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler>) -> (FileDescriptor, FileDescriptor) {
12251240
let secp_ctx = Secp256k1::new();
12261241
let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
12271242
let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
12281243
let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
12291244
let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone()).unwrap();
12301245
peer_a.new_inbound_connection(fd_a.clone()).unwrap();
1246+
println!("doing peer_a read event");
12311247
assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
1248+
println!("just did peer_a read event");
12321249
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
1250+
println!("just did peer_b read event");
12331251
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
1252+
println!("just did peer_a read event");
1253+
(fd_a.clone(), fd_b.clone())
12341254
}
12351255

12361256
#[test]
@@ -1239,7 +1259,7 @@ mod tests {
12391259
// push a DisconnectPeer event to remove the node flagged by id
12401260
let chan_handlers = create_chan_handlers(2);
12411261
let chan_handler = test_utils::TestChannelMessageHandler::new();
1242-
let mut peers = create_network(2, &chan_handlers);
1262+
let mut peers = create_network(2, &chan_handlers, None);
12431263
establish_connection(&peers[0], &peers[1]);
12441264
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
12451265

@@ -1256,11 +1276,12 @@ mod tests {
12561276
peers[0].process_events();
12571277
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
12581278
}
1279+
12591280
#[test]
1260-
fn test_timer_tick_occured(){
1281+
fn test_timer_tick_occurred() {
12611282
// Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
12621283
let chan_handlers = create_chan_handlers(2);
1263-
let peers = create_network(2, &chan_handlers);
1284+
let peers = create_network(2, &chan_handlers, None);
12641285
establish_connection(&peers[0], &peers[1]);
12651286
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
12661287

@@ -1272,4 +1293,142 @@ mod tests {
12721293
peers[0].timer_tick_occured();
12731294
assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
12741295
}
1296+
1297+
pub struct TestRoutingMessageHandler {
1298+
pub chan_upds_recvd: AtomicUsize,
1299+
pub chan_anns_recvd: AtomicUsize,
1300+
}
1301+
1302+
impl TestRoutingMessageHandler {
1303+
pub fn new() -> Self {
1304+
TestRoutingMessageHandler {
1305+
chan_upds_recvd: AtomicUsize::new(0),
1306+
chan_anns_recvd: AtomicUsize::new(0),
1307+
}
1308+
}
1309+
1310+
}
1311+
impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
1312+
fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
1313+
Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
1314+
}
1315+
fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
1316+
self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
1317+
Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
1318+
}
1319+
fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
1320+
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
1321+
Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
1322+
}
1323+
fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
1324+
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, msgs::ChannelUpdate,msgs::ChannelUpdate)> {
1325+
println!("VMW: starting_point: {}, batch_amt: {}", starting_point, batch_amount);
1326+
let mut chan_anns = Vec::new();
1327+
let batch_amt = batch_amount as u64;
1328+
let end: u64 = if starting_point > 0 { starting_point + batch_amt - 1 } else { batch_amount.into() };
1329+
for j in starting_point..end {
1330+
let chan_upd_1 = get_dummy_channel_update(j);
1331+
let chan_upd_2 = get_dummy_channel_update(j);
1332+
let chan_ann = get_dummy_channel_announcement(j);
1333+
1334+
chan_anns.push((chan_ann, chan_upd_1, chan_upd_2));
1335+
}
1336+
1337+
chan_anns
1338+
}
1339+
1340+
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
1341+
Vec::new()
1342+
}
1343+
1344+
fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
1345+
true
1346+
}
1347+
}
1348+
1349+
fn get_dummy_channel_announcement(short_chan_id: u64) -> msgs::ChannelAnnouncement {
1350+
use secp256k1::ffi::Signature as FFISignature;
1351+
let secp_ctx = Secp256k1::new();
1352+
let network = Network::Testnet;
1353+
let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1354+
let node_2_privkey = SecretKey::from_slice(&[41; 32]).unwrap();
1355+
let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap();
1356+
let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap();
1357+
let unsigned_ann = msgs::UnsignedChannelAnnouncement {
1358+
features: ChannelFeatures::supported(),
1359+
chain_hash: genesis_block(network).header.bitcoin_hash(),
1360+
short_channel_id: short_chan_id,
1361+
node_id_1: PublicKey::from_secret_key(&secp_ctx, &node_1_privkey),
1362+
node_id_2: PublicKey::from_secret_key(&secp_ctx, &node_2_privkey),
1363+
bitcoin_key_1: PublicKey::from_secret_key(&secp_ctx, &node_1_btckey),
1364+
bitcoin_key_2: PublicKey::from_secret_key(&secp_ctx, &node_2_btckey),
1365+
excess_data: Vec::new(),
1366+
};
1367+
1368+
msgs::ChannelAnnouncement {
1369+
node_signature_1: Signature::from(FFISignature::new()),
1370+
node_signature_2: Signature::from(FFISignature::new()),
1371+
bitcoin_signature_1: Signature::from(FFISignature::new()),
1372+
bitcoin_signature_2: Signature::from(FFISignature::new()),
1373+
contents: unsigned_ann,
1374+
}
1375+
}
1376+
1377+
fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
1378+
use secp256k1::ffi::Signature as FFISignature;
1379+
let network = Network::Testnet;
1380+
msgs::ChannelUpdate {
1381+
signature: Signature::from(FFISignature::new()),
1382+
contents: msgs::UnsignedChannelUpdate {
1383+
chain_hash: genesis_block(network).header.bitcoin_hash(),
1384+
short_channel_id: short_chan_id,
1385+
timestamp: 0,
1386+
flags: 0,
1387+
cltv_expiry_delta: 0,
1388+
htlc_minimum_msat: 0,
1389+
fee_base_msat: 0,
1390+
fee_proportional_millionths: 0,
1391+
excess_data: vec![],
1392+
}
1393+
}
1394+
}
1395+
1396+
#[test]
1397+
fn test_do_attempt_write_data() {
1398+
// Create 2 peers with custom TestRoutingMessageHandlers and connect them.
1399+
let chan_handlers = create_chan_handlers(2);
1400+
let mut routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = Vec::new();
1401+
let mut routing_handlers_concrete: Vec<Arc<TestRoutingMessageHandler>> = Vec::new();
1402+
for _ in 0..2 {
1403+
let routing_handler = Arc::new(TestRoutingMessageHandler::new());
1404+
routing_handlers.push(routing_handler.clone());
1405+
routing_handlers_concrete.push(routing_handler.clone());
1406+
}
1407+
let peers = create_network(2, &chan_handlers, Some(&routing_handlers));
1408+
1409+
// By calling establish_connect, we trigger do_attempt_write_data between
1410+
// the peers. Previously this function would mistakenly enter an infinite loop
1411+
// when there were more channel messages available than could fit into a peer's
1412+
// buffer. This issue would now be detected by this test (because we use custom
1413+
// RoutingMessageHandlers that intentionally return more channel messages
1414+
// than can fit into a peer's buffer).
1415+
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
1416+
1417+
// Make each peer to read the messages that the other peer just wrote to them.
1418+
println!("VMW IN TEST: doing peer_b read_event");
1419+
peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
1420+
println!("VMW IN TEST: just did peer_b read event, doing peer_a read_event");
1421+
peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
1422+
println!("VMW IN TEST: just did peer_a read_event");
1423+
1424+
// Check that each peer has received the expected number of channel updates and channel
1425+
// announcements. peer_a sends one fewer channel announcement to peer_b because, at the
1426+
// time of graph sync, peer_a already has an Init message for peer_b in its buffer of
1427+
// messages to send to peer_b (and the number of messages sent is dependent on the size
1428+
// of said buffer; see how `steps` is calculated in do_attempt_write_data).
1429+
assert_eq!(routing_handlers_concrete[0].clone().chan_upds_recvd.load(Ordering::Acquire), 8);
1430+
assert_eq!(routing_handlers_concrete[0].clone().chan_anns_recvd.load(Ordering::Acquire), 4);
1431+
assert_eq!(routing_handlers_concrete[1].clone().chan_upds_recvd.load(Ordering::Acquire), 6);
1432+
assert_eq!(routing_handlers_concrete[1].clone().chan_anns_recvd.load(Ordering::Acquire), 3);
1433+
}
12751434
}

0 commit comments

Comments
 (0)