Skip to content

Commit 3bc8b99

Browse files
authored
harness: improve robustness of tests (#1194)
* improve robustness of `should_update_load_epoch_when_balancer_restart_2_monoliths`
* slightly improve `should_prioritize_same_region_ws` robustness
* refactor harness discoverer to make it easier to debug
* fix harness discoverer sometimes listening on a port that is already being used
* improve some error messages
* use ipv6 localhost to check port availability
* minor refactor
* automatically try to restart the balancer if starting it the first time fails
* make `route_ws_to_correct_monolith_race` much less flaky
* improve reliability of `should_not_unload_rooms_when_balancer_restart`
* format
1 parent 6a5498d commit 3bc8b99

File tree

6 files changed

+180
-80
lines changed

6 files changed

+180
-80
lines changed

crates/harness-tests/src/routing.rs

+22-5
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ async fn route_ws_to_correct_monolith_race(ctx: &mut TestRunner) {
138138

139139
for i in 0..100 {
140140
let room_name = format!("foo{}", i);
141+
println!("iteration: {}", room_name);
141142
m.load_room(room_name.clone()).await;
142143

143144
let mut client = Client::new(ctx).unwrap();
@@ -153,19 +154,30 @@ async fn route_ws_to_correct_monolith_race(ctx: &mut TestRunner) {
153154
},
154155
result = tokio::time::timeout(Duration::from_secs(1), dummy.wait_recv()) => {
155156
result.expect("msg recv timeout");
157+
println!("dummy received message");
158+
tokio::time::timeout(Duration::from_millis(100), dummy.wait_recv()).await.expect("dummy never received unload message"); // wait for unload message
156159
continue; // because we are waiting for the client to reconnect
157160
},
158161
_ = client.wait_for_disconnect() => {
159-
println!("client disconnected, retrying");
162+
println!("client disconnected, retrying =====================================");
160163
client.join(room_name.clone()).await;
161164
continue;
162165
}
163166
};
164167
}
165168

166169
let recvd = m.collect_recv();
167-
assert_eq!(recvd.len(), 1);
168-
assert!(matches!(recvd[0], MsgB2M::Join(_)));
170+
assert_eq!(
171+
recvd.len(),
172+
1,
173+
"expected exactly one message, got {:?}",
174+
recvd
175+
);
176+
if let MsgB2M::Join(m) = &recvd[0] {
177+
assert_eq!(m.room, room_name.into());
178+
} else {
179+
panic!("expected join message, got {:?}", recvd[0])
180+
}
169181
m.clear_recv();
170182
dummy.clear_recv();
171183
}
@@ -237,7 +249,12 @@ async fn unicast_messaging(ctx: &mut TestRunner) {
237249
let res = vec![c1.recv().await, c2.recv().await];
238250
let oks: Vec<_> = res.iter().filter(|r| r.is_ok()).collect();
239251

240-
assert_eq!(oks.len(), 1);
252+
assert_eq!(
253+
oks.len(),
254+
1,
255+
"expected exactly one ok result, got these messages: {:?}",
256+
oks
257+
);
241258
assert_eq!(oks[0].as_ref().unwrap().to_string(), "{}");
242259
}
243260

@@ -297,7 +314,7 @@ async fn should_prioritize_same_region_ws(ctx: &mut TestRunner) {
297314
let mut c1 = Client::new(ctx).unwrap();
298315
c1.join("foo").await;
299316

300-
tokio::time::timeout(Duration::from_millis(100), m1.wait_recv())
317+
tokio::time::timeout(Duration::from_millis(200), m1.wait_recv())
301318
.await
302319
.expect("timeout waiting for join message");
303320

crates/harness-tests/src/state.rs

+16-6
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,19 @@ async fn should_not_unload_rooms_when_balancer_restart(ctx: &mut TestRunner) {
6666

6767
m.show().await;
6868
c1.join("foo").await;
69-
m.wait_recv().await;
69+
tokio::time::timeout(Duration::from_millis(200), m.wait_recv())
70+
.await
71+
.expect("m did not receive join message (pre restart)");
7072
m.clear_recv();
7173

7274
ctx.restart_balancer().await;
7375

7476
m.wait_for_balancer_connect().await;
7577
c1.disconnect().await;
7678
c1.join("foo").await;
77-
m.wait_recv().await;
79+
tokio::time::timeout(Duration::from_millis(200), m.wait_recv())
80+
.await
81+
.expect("m did not receive join message (post restart)");
7882
m.gossip().await;
7983
let _ = tokio::time::timeout(Duration::from_millis(200), m.wait_recv()).await;
8084

@@ -103,15 +107,19 @@ async fn should_update_load_epoch_when_balancer_restart_2_monoliths(ctx: &mut Te
103107

104108
m.show().await;
105109
c1.join("foo").await;
106-
m.wait_recv().await;
110+
tokio::time::timeout(Duration::from_millis(200), m.wait_recv())
111+
.await
112+
.expect("m did not receive join message (pre restart)");
107113
m.clear_recv();
108114

109115
ctx.restart_balancer().await;
116+
c1.disconnect().await;
110117

111118
m.wait_for_balancer_connect().await;
112-
c1.disconnect().await;
113119
c1.join("foo").await;
114-
m.wait_recv().await;
120+
tokio::time::timeout(Duration::from_millis(200), m.wait_recv())
121+
.await
122+
.expect("m did not receive join message (post restart)");
115123
m.gossip().await;
116124
let _ = tokio::time::timeout(Duration::from_millis(200), m.wait_recv()).await;
117125

@@ -121,7 +129,9 @@ async fn should_update_load_epoch_when_balancer_restart_2_monoliths(ctx: &mut Te
121129
.await;
122130
m2.show().await;
123131
m2.load_room("foo").await;
124-
m2.wait_recv().await;
132+
tokio::time::timeout(Duration::from_millis(200), m2.wait_recv())
133+
.await
134+
.expect("m2 did not receive any message");
125135

126136
let recv = m2.collect_recv();
127137
for msg in recv {

crates/harness/src/test_runner.rs

+45-19
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,13 @@ impl TestRunner {
4949
warn!("restart_balancer: Failed to kill balancer: {:?}", result);
5050
}
5151

52-
self.child = Self::spawn_balancer(&self.spawn_options).await;
52+
self.child = Self::spawn_balancer(&self.spawn_options)
53+
.await
54+
.expect("failed to respawn balancer");
5355
}
5456

5557
/// Spawn a new balancer and wait for it to be ready.
56-
async fn spawn_balancer(opts: &BalancerSpawnOptions) -> Child {
58+
async fn spawn_balancer(opts: &BalancerSpawnOptions) -> anyhow::Result<Child> {
5759
let mut envs: HashMap<_, _> = HashMap::from_iter([
5860
("BALANCER_PORT", format!("{}", opts.port)),
5961
(
@@ -65,7 +67,12 @@ impl TestRunner {
6567
envs.insert("BALANCER_REGION", region.clone());
6668
}
6769

68-
let child = Command::new("cargo")
70+
let client = reqwest::Client::builder()
71+
.timeout(std::time::Duration::from_millis(100))
72+
.build()
73+
.expect("failed to build request client");
74+
75+
let mut child = Command::new("cargo")
6976
.args([
7077
"run",
7178
"-p",
@@ -77,12 +84,8 @@ impl TestRunner {
7784
.envs(envs)
7885
.spawn()
7986
.expect("Failed to start balancer");
80-
8187
loop {
82-
let client = reqwest::Client::builder()
83-
.timeout(std::time::Duration::from_millis(100))
84-
.build()
85-
.expect("failed to build request client");
88+
println!("waiting for balancer to start");
8689
match client
8790
.get(&format!("http://localhost:{}/api/status", opts.port))
8891
.send()
@@ -94,12 +97,19 @@ impl TestRunner {
9497
}
9598
}
9699
Err(_) => {
100+
match child.try_wait() {
101+
Ok(Some(status)) => {
102+
anyhow::bail!("Exited with status {}", status);
103+
}
104+
Ok(None) => {} // process is still running
105+
Err(e) => anyhow::bail!("Error waiting for balancer to start: {}", e),
106+
}
97107
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
98108
}
99109
}
100110
}
101111

102-
child
112+
Ok(child)
103113
}
104114

105115
/// Create a URL that points to the balancer. This creates URLs that clients should connect to when making HTTP requests.
@@ -117,22 +127,38 @@ impl AsyncTestContext for TestRunner {
117127
async fn setup() -> Self {
118128
let mut port;
119129
let mut harness_port;
130+
131+
let mut opts;
132+
let child;
133+
let mut attempts = 0;
120134
loop {
135+
if attempts > 5 {
136+
panic!("Failed to find an unused port after 5 attempts");
137+
}
121138
port = random_unused_port();
122139
harness_port = random_unused_port();
123140
// Ensure that the harness port is different from the balancer port.
124-
if port != harness_port {
125-
break;
141+
if port == harness_port {
142+
continue;
126143
}
127-
}
128-
129-
let opts = BalancerSpawnOptions {
130-
port,
131-
harness_port,
132-
region: None,
133-
};
144+
attempts += 1;
145+
146+
opts = BalancerSpawnOptions {
147+
port,
148+
harness_port,
149+
region: None,
150+
};
151+
152+
child = match Self::spawn_balancer(&opts).await {
153+
Ok(child) => child,
154+
Err(e) => {
155+
println!("Failed to spawn balancer: {}", e);
156+
continue;
157+
}
158+
};
134159

135-
let child = Self::spawn_balancer(&opts).await;
160+
break;
161+
}
136162

137163
let (_provider_task, monolith_add_tx, monolith_remove_tx) =
138164
crate::provider::DiscoveryProvider::connect(harness_port).await;

crates/harness/src/util.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ pub(crate) fn random_unused_port() -> u16 {
1111
}
1212

1313
pub(crate) fn port_is_available(port: u16) -> bool {
14-
std::net::TcpListener::bind(("localhost", port)).is_ok()
14+
std::net::TcpListener::bind(("::1", port)).is_ok()
1515
}

0 commit comments

Comments
 (0)