Skip to content

More fixes for loopback networking #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,13 @@ impl Builder {
panic!("Maximum message latency must be greater than minimum.");
}

let world = World::new(self.link.clone(), rng, self.ip_version.iter());
let world = World::new(
self.link.clone(),
rng,
self.ip_version.iter(),
self.config.tick,
);

Sim::new(self.config.clone(), world)
}
}
Expand Down
17 changes: 15 additions & 2 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::{
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::Duration,
};

use bytes::{Buf, Bytes};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
runtime::Handle,
sync::{mpsc, oneshot},
time::sleep,
};
Expand Down Expand Up @@ -281,8 +281,21 @@ impl WriteHalf {
}

fn send_loopback(src: SocketAddr, dst: SocketAddr, message: Protocol) {
// Check for a runtime before spawning as this code is hit in the drop path
// as streams attempt to send FINs.
// TODO: Investigate drop ordering within the Sim to ensure things are unrolling
// as expected.
if Handle::try_current().is_err() {
return;
}

tokio::spawn(async move {
sleep(Duration::from_micros(1)).await;
// FIXME: Forces delivery on the next step which better aligns with the
// remote networking behavior.
// https://github.com/tokio-rs/turmoil/issues/132
let tick_duration = World::current(|world| world.tick_duration);
sleep(tick_duration).await;

World::current(|world| {
if let Err(rst) =
world
Expand Down
8 changes: 6 additions & 2 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::{
cmp,
io::{self, Error, ErrorKind, Result},
net::SocketAddr,
time::Duration,
};

/// A simulated UDP socket.
Expand Down Expand Up @@ -304,7 +303,12 @@ impl UdpSocket {

fn send_loopback(src: SocketAddr, dst: SocketAddr, message: Protocol) {
tokio::spawn(async move {
sleep(Duration::from_micros(1)).await;
// FIXME: Forces delivery on the next step which better aligns with the
// remote networking behavior.
// https://github.com/tokio-rs/turmoil/issues/132
let tick_duration = World::current(|world| world.tick_duration);
sleep(tick_duration).await;

World::current(|world| {
world
.current_host_mut()
Expand Down
11 changes: 8 additions & 3 deletions src/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct Sim<'a> {

/// Simulation elapsed time
elapsed: Duration,

steps: usize,
}

impl<'a> Sim<'a> {
Expand All @@ -44,6 +46,7 @@ impl<'a> Sim<'a> {
rts: IndexMap::new(),
since_epoch,
elapsed: Duration::ZERO,
steps: 1, // bumped after each step
}
}

Expand Down Expand Up @@ -325,8 +328,9 @@ impl<'a> Sim<'a> {
///
/// Returns whether or not all clients have completed.
pub fn step(&mut self) -> Result<bool> {
let tick = self.config.tick;
tracing::debug!("step {}", self.steps);

let tick = self.config.tick;
let mut is_finished = true;

// Tick the networking, processing messages. This is done before
Expand Down Expand Up @@ -376,11 +380,12 @@ impl<'a> Sim<'a> {
}

self.elapsed += tick;
self.steps += 1;

if self.elapsed > self.config.duration && !is_finished {
return Err(format!(
"Ran for {:?} without completing",
self.config.duration
"Ran for duration: {:?} steps: {} without completing",
self.config.duration, self.steps,
))?;
}

Expand Down
6 changes: 6 additions & 0 deletions src/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ pub(crate) struct World {
/// Random number generator used for all decisions. To make execution
/// determinstic, reuse the same seed.
pub(crate) rng: Box<dyn RngCore>,

/// Run duration for each host on every step.
// TODO: Remove this once we've cleaned up the loopback implementation hacks
pub(crate) tick_duration: Duration,
}

scoped_thread_local!(static CURRENT: RefCell<World>);
Expand All @@ -38,13 +42,15 @@ impl World {
link: config::Link,
rng: Box<dyn RngCore>,
addrs: IpVersionAddrIter,
tick_duration: Duration,
) -> World {
World {
hosts: IndexMap::new(),
topology: Topology::new(link),
dns: Dns::new(addrs),
current: None,
rng,
tick_duration,
}
}

Expand Down
37 changes: 36 additions & 1 deletion tests/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,12 +844,47 @@ fn bind_addr_in_use() -> Result {
sim.run()
}

#[test]
fn loopback_delivery() -> Result {
let mut sim = Builder::new()
.tick_duration(Duration::from_millis(100))
.build();

sim.client("host", async {
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 1738)).await?;

tokio::spawn(async move {
let (mut acc, _) = listener.accept().await.unwrap();

tokio::time::sleep(Duration::from_millis(50)).await;
_ = acc.write_u128(9).await;
});

let mut s = TcpStream::connect((Ipv4Addr::LOCALHOST, 1738)).await?;
assert_eq!(9, s.read_u128().await?);

Ok(())
});

// syn
assert!(!sim.step()?);
// write
assert!(!sim.step()?);
// write delivered
assert!(sim.step()?);

Ok(())
}

fn run_localhost_test(
ip_version: IpVersion,
bind_addr: SocketAddr,
connect_addr: SocketAddr,
) -> Result {
let mut sim = Builder::new().ip_version(ip_version).build();
let mut sim = Builder::new()
.tick_duration(Duration::from_millis(100))
.ip_version(ip_version)
.build();
let expected = [0, 1, 7, 3, 8];
sim.client("client", async move {
let listener = TcpListener::bind(bind_addr).await?;
Expand Down
35 changes: 34 additions & 1 deletion tests/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,39 @@ fn bind_addr_in_use() -> Result {
sim.run()
}

#[test]
fn loopback_delivery() -> Result {
let mut sim = Builder::new()
.tick_duration(Duration::from_millis(100))
.build();

sim.client("host", async {
let socket = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).await?;

tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;

_ = socket
.send_to(&[1, 7, 3, 8], SocketAddr::from((Ipv4Addr::LOCALHOST, 1738)))
.await;
});

let socket = UdpSocket::bind((Ipv4Addr::LOCALHOST, 1738)).await?;
let mut buf: [u8; 4] = [0; 4];
socket.recv_from(&mut buf).await?;
assert_eq!([1, 7, 3, 8], buf);

Ok(())
});

// write
assert!(!sim.step()?);
// write delivered
assert!(sim.step()?);

Ok(())
}

fn run_localhost_test(
ip_version: IpVersion,
bind_addr: SocketAddr,
Expand All @@ -479,7 +512,7 @@ fn run_localhost_test(
socket.send_to(&expected, peer).await.unwrap();
});

let mut buf = [0; 5];
let mut buf: [u8; 5] = [0; 5];
let bind_addr = SocketAddr::new(bind_addr.ip(), 0);
let socket = UdpSocket::bind(bind_addr).await?;
socket.send_to(&expected, connect_addr).await?;
Expand Down