Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ enum NodeError {

dictionary NodeStatus {
boolean is_running;
boolean is_listening;
BestBlock current_best_block;
u64? latest_lightning_wallet_sync_timestamp;
u64? latest_onchain_wallet_sync_timestamp;
Expand Down
3 changes: 0 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::default::Default;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, Once, RwLock};
use std::time::SystemTime;
use std::{fmt, fs};
Expand Down Expand Up @@ -1133,7 +1132,6 @@ fn build_with_store_internal(
}

// Initialize the status fields.
let is_listening = Arc::new(AtomicBool::new(false));
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(metrics) => Arc::new(RwLock::new(metrics)),
Err(e) => {
Expand Down Expand Up @@ -1734,7 +1732,6 @@ fn build_with_store_internal(
peer_store,
payment_store,
is_running,
is_listening,
node_metrics,
om_mailbox,
async_payments_role,
Expand Down
96 changes: 52 additions & 44 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ mod wallet;

use std::default::Default;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -189,7 +188,6 @@ pub struct Node {
peer_store: Arc<PeerStore<Arc<Logger>>>,
payment_store: Arc<PaymentStore>,
is_running: Arc<RwLock<bool>>,
is_listening: Arc<AtomicBool>,
node_metrics: Arc<RwLock<NodeMetrics>>,
om_mailbox: Option<Arc<OnionMessageMailbox>>,
async_payments_role: Option<AsyncPaymentsRole>,
Expand Down Expand Up @@ -293,9 +291,7 @@ impl Node {
if let Some(listening_addresses) = &self.config.listening_addresses {
// Setup networking
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
let mut stop_listen = self.stop_sender.subscribe();
let listening_logger = Arc::clone(&self.logger);
let listening_indicator = Arc::clone(&self.is_listening);

let mut bind_addrs = Vec::with_capacity(listening_addresses.len());

Expand All @@ -313,45 +309,62 @@ impl Node {
bind_addrs.extend(resolved_address);
}

self.runtime.spawn_cancellable_background_task(async move {
{
let listener =
tokio::net::TcpListener::bind(&*bind_addrs).await
.unwrap_or_else(|e| {
log_error!(listening_logger, "Failed to bind to listen addresses/ports - is something else already listening on it?: {}", e);
panic!(
"Failed to bind to listen address/port - is something else already listening on it?",
);
});

listening_indicator.store(true, Ordering::Release);

loop {
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
tokio::select! {
_ = stop_listen.changed() => {
log_debug!(
listening_logger,
"Stopping listening to inbound connections."
let logger = Arc::clone(&listening_logger);
let listeners = self.runtime.block_on(async move {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to block if we want to return an error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I see the point. I guess we need to if we want to abort on error for now, but we'll probably make start async soonish anyways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternative is to use non-tokio binds here.

let mut listeners = Vec::new();

// Try to bind to all addresses
for addr in &*bind_addrs {
match tokio::net::TcpListener::bind(addr).await {
Ok(listener) => {
log_trace!(logger, "Listener bound to {}", addr);
listeners.push(listener);
},
Err(e) => {
log_error!(
logger,
"Failed to bind to {}: {} - is something else already listening?",
addr,
e
);
break;
}
res = listener.accept() => {
let tcp_stream = res.unwrap().0;
tokio::spawn(async move {
lightning_net_tokio::setup_inbound(
Arc::clone(&peer_mgr),
tcp_stream.into_std().unwrap(),
)
.await;
});
}
return Err(Error::InvalidSocketAddress);
},
}
}
}

listening_indicator.store(false, Ordering::Release);
});
Ok(listeners)
})?;

for listener in listeners {
let logger = Arc::clone(&listening_logger);
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
let mut stop_listen = self.stop_sender.subscribe();
let runtime = Arc::clone(&self.runtime);
self.runtime.spawn_cancellable_background_task(async move {
loop {
tokio::select! {
_ = stop_listen.changed() => {
log_debug!(
logger,
"Stopping listening to inbound connections."
);
break;
}
res = listener.accept() => {
let tcp_stream = res.unwrap().0;
let peer_mgr = Arc::clone(&peer_mgr);
runtime.spawn_cancellable_background_task(async move {
lightning_net_tokio::setup_inbound(
Arc::clone(&peer_mgr),
tcp_stream.into_std().unwrap(),
)
.await;
});
}
}
}
});
}
}

// Regularly reconnect to persisted peers.
Expand Down Expand Up @@ -666,7 +679,6 @@ impl Node {
/// Returns the status of the [`Node`].
pub fn status(&self) -> NodeStatus {
let is_running = *self.is_running.read().unwrap();
let is_listening = self.is_listening.load(Ordering::Acquire);
let current_best_block = self.channel_manager.current_best_block().into();
let locked_node_metrics = self.node_metrics.read().unwrap();
let latest_lightning_wallet_sync_timestamp =
Expand All @@ -684,7 +696,6 @@ impl Node {

NodeStatus {
is_running,
is_listening,
current_best_block,
latest_lightning_wallet_sync_timestamp,
latest_onchain_wallet_sync_timestamp,
Expand Down Expand Up @@ -1495,9 +1506,6 @@ impl Drop for Node {
pub struct NodeStatus {
/// Indicates whether the [`Node`] is running.
pub is_running: bool,
/// Indicates whether the [`Node`] is listening for incoming connections on the addresses
/// configured via [`Config::listening_addresses`].
pub is_listening: bool,
/// The best block to which our Lightning wallet is currently synced.
pub current_best_block: BestBlock,
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced
Expand Down
2 changes: 1 addition & 1 deletion tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ pub(crate) fn random_storage_path() -> PathBuf {

pub(crate) fn random_port() -> u16 {
let mut rng = thread_rng();
rng.gen_range(5000..65535)
rng.gen_range(5000..32768)
}

pub(crate) fn random_listening_addresses() -> Vec<SocketAddress> {
Expand Down
24 changes: 15 additions & 9 deletions tests/integration_tests_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,21 @@ fn sign_verify_msg() {
assert!(node.verify_signature(msg, sig.as_str(), &pkey));
}

#[test]
fn connection_multi_listen() {
let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
let chain_source = TestChainSource::Esplora(&electrsd);
let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false);

let node_id_b = node_b.node_id();

let node_addrs_b = node_b.listening_addresses().unwrap();
for node_addr_b in &node_addrs_b {
node_a.connect(node_id_b, node_addr_b.clone(), false).unwrap();
node_a.disconnect(node_id_b).unwrap();
}
}

#[test]
fn connection_restart_behavior() {
do_connection_restart_behavior(true);
Expand All @@ -832,11 +847,6 @@ fn do_connection_restart_behavior(persist: bool) {
let node_id_b = node_b.node_id();

let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();

while !node_b.status().is_listening {
std::thread::sleep(std::time::Duration::from_millis(10));
}

node_a.connect(node_id_b, node_addr_b, persist).unwrap();

let peer_details_a = node_a.list_peers().first().unwrap().clone();
Expand Down Expand Up @@ -886,10 +896,6 @@ fn concurrent_connections_succeed() {
let node_id_b = node_b.node_id();
let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone();

while !node_b.status().is_listening {
std::thread::sleep(std::time::Duration::from_millis(10));
}

let mut handles = Vec::new();
for _ in 0..10 {
let thread_node = Arc::clone(&node_a);
Expand Down
Loading