Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/config/nat_timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ impl Default for RelayTimeouts {
}

/// Default time to wait for the peer to acknowledge stream data after a send.
const DEFAULT_SEND_ACK_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_SEND_ACK_TIMEOUT: Duration = Duration::from_millis(500);

/// Fast-network send ACK timeout.
const FAST_SEND_ACK_TIMEOUT: Duration = Duration::from_millis(500);
/// Fast-network send ACK timeout (halved from default, matching the fast profile pattern).
const FAST_SEND_ACK_TIMEOUT: Duration = Duration::from_millis(250);

/// Master timeout configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
20 changes: 10 additions & 10 deletions src/connection_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ pub struct StrategyConfig {
impl Default for StrategyConfig {
fn default() -> Self {
Self {
ipv4_timeout: Duration::from_secs(5),
ipv6_timeout: Duration::from_secs(5),
holepunch_timeout: Duration::from_secs(15),
relay_timeout: Duration::from_secs(30),
max_holepunch_rounds: 3,
ipv4_timeout: Duration::from_secs(2),
ipv6_timeout: Duration::from_secs(2),
holepunch_timeout: Duration::from_secs(3),
relay_timeout: Duration::from_secs(10),
max_holepunch_rounds: 2,
ipv6_enabled: true,
relay_enabled: true,
coordinator: None,
Expand Down Expand Up @@ -484,11 +484,11 @@ mod tests {
#[test]
fn test_default_config() {
let config = StrategyConfig::default();
assert_eq!(config.ipv4_timeout, Duration::from_secs(5));
assert_eq!(config.ipv6_timeout, Duration::from_secs(5));
assert_eq!(config.holepunch_timeout, Duration::from_secs(15));
assert_eq!(config.relay_timeout, Duration::from_secs(30));
assert_eq!(config.max_holepunch_rounds, 3);
assert_eq!(config.ipv4_timeout, Duration::from_secs(2));
assert_eq!(config.ipv6_timeout, Duration::from_secs(2));
assert_eq!(config.holepunch_timeout, Duration::from_secs(3));
assert_eq!(config.relay_timeout, Duration::from_secs(10));
assert_eq!(config.max_holepunch_rounds, 2);
assert!(config.ipv6_enabled);
assert!(config.relay_enabled);
}
Expand Down
23 changes: 12 additions & 11 deletions src/p2p_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ const EVENT_CHANNEL_CAPACITY: usize = 256;
/// event-driven reader-exit detection.
const STALE_REAPER_INTERVAL: Duration = Duration::from_secs(10);

/// Quick direct connection attempt after a failed hole-punch round.
/// If the target's outgoing packets created a NAT binding, a QUIC handshake
/// through the pinhole needs only 1-2 RTTs (~600ms at 300ms worst-case RTT).
const POST_HOLEPUNCH_DIRECT_RETRY_TIMEOUT: Duration = Duration::from_secs(1);

use crate::SHUTDOWN_DRAIN_TIMEOUT;

/// Extract the raw SPKI (SubjectPublicKeyInfo) bytes from a QUIC connection's
Expand Down Expand Up @@ -1544,7 +1549,8 @@ impl P2pEndpoint {
// the target's outgoing packets even though our
// try_hole_punch didn't detect the connection.
if let Ok(Ok(peer_conn)) =
timeout(Duration::from_secs(3), self.connect(target)).await
timeout(POST_HOLEPUNCH_DIRECT_RETRY_TIMEOUT, self.connect(target))
.await
{
info!("✓ Post-hole-punch direct connect succeeded to {}", target);
return Ok((
Expand Down Expand Up @@ -1578,7 +1584,8 @@ impl P2pEndpoint {
Err(_) => {
// Same: try a quick direct connect after timeout
if let Ok(Ok(peer_conn)) =
timeout(Duration::from_secs(3), self.connect(target)).await
timeout(POST_HOLEPUNCH_DIRECT_RETRY_TIMEOUT, self.connect(target))
.await
{
info!("✓ Post-hole-punch direct connect succeeded to {}", target);
return Ok((
Expand Down Expand Up @@ -1816,7 +1823,8 @@ impl P2pEndpoint {
// Poll for the connection to appear. The target node will receive
// the relayed PUNCH_ME_NOW and initiate a QUIC connection to us,
// which gets accepted by saorsa-core's transport handler.
let deadline = tokio::time::Instant::now() + Duration::from_secs(15);
// No internal deadline — the outer strategy.holepunch_timeout()
// cancels this future when it expires.
let mut poll_count = 0u32;

loop {
Expand Down Expand Up @@ -1873,21 +1881,14 @@ impl P2pEndpoint {
}
}

// Wait briefly then re-check, or timeout
// Wait briefly then re-check; the outer timeout cancels us on expiry
tokio::select! {
_ = self.inner.connection_notify().notified() => {
debug!("try_hole_punch: connection_notify fired for {}", target);
}
_ = self.shutdown.cancelled() => {
return Err(EndpointError::ShuttingDown);
}
_ = tokio::time::sleep_until(deadline) => {
info!(
"try_hole_punch: TIMEOUT after 15s for {} (polled {} times)",
target, poll_count
);
return Err(EndpointError::Timeout);
}
// Wake periodically to drive session and re-check connections
_ = tokio::time::sleep(Duration::from_millis(500)) => {}
}
Expand Down
4 changes: 0 additions & 4 deletions tests/constrained_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,6 @@ fn test_peer_connection_transport_addr() {
authenticated: true,
connected_at: Instant::now(),
last_activity: Instant::now(),
last_health_ping_sent: None,
last_health_pong_received: None,
};
assert_eq!(peer_conn_udp.remote_addr, udp_addr);
assert_eq!(
Expand All @@ -776,8 +774,6 @@ fn test_peer_connection_transport_addr() {
authenticated: false,
connected_at: Instant::now(),
last_activity: Instant::now(),
last_health_ping_sent: None,
last_health_pong_received: None,
};
assert_eq!(peer_conn_ble.remote_addr, ble_addr);
assert_eq!(
Expand Down
65 changes: 23 additions & 42 deletions tests/simultaneous_connect_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

#![allow(clippy::unwrap_used, clippy::expect_used)]

use saorsa_transport::{ConnectionHealth, Node};
use saorsa_transport::Node;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use tokio::time::timeout;
Expand Down Expand Up @@ -375,10 +375,9 @@ async fn test_connect_after_failure_succeeds() {
// Phase 1.3: Phantom Connection Detection & Recovery Tests
// ============================================================================

/// Test that two connected nodes have healthy connection status after
/// the health check PING/PONG cycle runs.
/// Test that two connected nodes report as connected via is_connected.
#[tokio::test]
async fn test_connection_health_status() {
async fn test_connection_status() {
let node_a = create_localhost_node().await;
let node_b = create_localhost_node().await;

Expand All @@ -400,58 +399,42 @@ async fn test_connection_health_status() {

let conn_addr = remote_socket_addr(&conn);

// Immediately after connect, health should be Healthy (not yet probed)
let health = node_a.connection_health(&conn_addr).await;
assert_eq!(
health,
Some(ConnectionHealth::Healthy),
"Newly connected peer should be Healthy"
);

// Wait for at least one health check cycle (30s reaper interval + margin)
// The reaper will send a PING, and the reader task on the remote end
// responds with a PONG.
tokio::time::sleep(Duration::from_secs(35)).await;

// After one cycle, the peer should still be Healthy (PONG received)
let health_after = node_a.connection_health(&conn_addr).await;
// Immediately after connect, node_a should see the peer as connected
assert!(
matches!(
health_after,
Some(ConnectionHealth::Healthy) | Some(ConnectionHealth::Checking)
),
"After one health cycle, peer should be Healthy or Checking, got {:?}",
health_after
node_a.is_connected(&conn_addr).await,
"Newly connected peer should be reported as connected"
);

// node_a should still have exactly 1 peer (not evicted)
// node_a should have exactly 1 peer
let peers = node_a.connected_peers().await;
assert_eq!(
peers.len(),
1,
"Healthy connection should not be evicted, but got {} peers",
"Should have exactly 1 connected peer, but got {} peers",
peers.len()
);

node_a.shutdown().await;
node_b.shutdown().await;
}

/// Test that connection_health returns None for unknown peers.
/// Test that is_connected returns false for unknown peers.
#[tokio::test]
async fn test_connection_health_unknown_peer() {
async fn test_connection_status_unknown_peer() {
let node = create_localhost_node().await;

let unknown_addr: SocketAddr = "127.0.0.1:59999".parse().unwrap();
let health = node.connection_health(&unknown_addr).await;
assert_eq!(health, None, "Unknown peer should return None");
assert!(
!node.is_connected(&unknown_addr).await,
"Unknown peer should not be reported as connected"
);

node.shutdown().await;
}

/// Test that after disconnect, connection_health returns None.
/// Test that after disconnect, is_connected returns false.
#[tokio::test]
async fn test_connection_health_after_disconnect() {
async fn test_connection_status_after_disconnect() {
let node_a = create_localhost_node().await;
let node_b = create_localhost_node().await;

Expand All @@ -472,9 +455,9 @@ async fn test_connection_health_after_disconnect() {
let conn_addr = remote_socket_addr(&conn);

// Verify connected
assert_eq!(
node_a.connection_health(&conn_addr).await,
Some(ConnectionHealth::Healthy),
assert!(
node_a.is_connected(&conn_addr).await,
"Peer should be connected"
);

// Disconnect
Expand All @@ -483,12 +466,10 @@ async fn test_connection_health_after_disconnect() {
.await
.expect("disconnect should succeed");

// After disconnect, health should be None
let health = node_a.connection_health(&conn_addr).await;
assert_eq!(
health, None,
"Disconnected peer should return None, got {:?}",
health
// After disconnect, should no longer be connected
assert!(
!node_a.is_connected(&conn_addr).await,
"Disconnected peer should not be reported as connected"
);

node_a.shutdown().await;
Expand Down
Loading