Skip to content

Commit

Permalink
Merge branch 'master' into balancer-rework
Browse files Browse the repository at this point in the history
  • Loading branch information
dyc3 authored Dec 18, 2023
2 parents a6276b4 + 7431ab3 commit 9635e00
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 93 deletions.
76 changes: 72 additions & 4 deletions crates/harness-tests/src/routing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use harness::{BehaviorTrackClients, Client, MockRespParts, Monolith, MonolithBuilder, TestRunner};
use ott_balancer_protocol::monolith::M2BRoomMsg;
use ott_balancer_protocol::monolith::{M2BRoomMsg, MsgB2M};
use serde_json::value::RawValue;
use test_context::test_context;

Expand Down Expand Up @@ -136,7 +136,7 @@ async fn route_ws_to_correct_monolith_race(ctx: &mut TestRunner) {
m.show().await;
tokio::time::sleep(Duration::from_millis(200)).await; // ensure that the monoliths are fully connected before sending the room load message

for i in 0..20 {
for i in 0..100 {
let room_name = format!("foo{}", i);
m.load_room(room_name.clone()).await;

Expand All @@ -151,6 +151,10 @@ async fn route_ws_to_correct_monolith_race(ctx: &mut TestRunner) {
result.expect("msg recv timeout");
break;
},
result = tokio::time::timeout(Duration::from_secs(1), dummy.wait_recv()) => {
result.expect("msg recv timeout");
continue; // because we are waiting for the client to reconnect
},
_ = client.wait_for_disconnect() => {
println!("client disconnected, retrying");
client.join(room_name.clone()).await;
Expand All @@ -161,10 +165,9 @@ async fn route_ws_to_correct_monolith_race(ctx: &mut TestRunner) {

let recvd = m.collect_recv();
assert_eq!(recvd.len(), 1);
assert!(matches!(recvd[0], MsgB2M::Join(_)));
m.clear_recv();
dummy.clear_recv();

tokio::time::sleep(Duration::from_millis(100)).await;
}
}

Expand Down Expand Up @@ -237,3 +240,68 @@ async fn unicast_messaging(ctx: &mut TestRunner) {
assert_eq!(oks.len(), 1);
assert_eq!(oks[0].as_ref().unwrap().to_string(), "{}");
}

#[test_context(TestRunner)]
#[tokio::test]
async fn should_prioritize_same_region_http(ctx: &mut TestRunner) {
ctx.set_region("foo").await;

let mut m1 = MonolithBuilder::new()
.region("foo")
.add_mock_http_json(
"/api/foo",
MockRespParts::default(),
serde_json::json!({
"data": "foo",
}),
)
.build(ctx)
.await;

let mut m2 = MonolithBuilder::new().region("bar").build(ctx).await;
let mut m3 = MonolithBuilder::new().region("bar").build(ctx).await;
let mut m4 = MonolithBuilder::new().region("bar").build(ctx).await;

m1.show().await;
m2.show().await;
m3.show().await;
m4.show().await;

tokio::time::sleep(Duration::from_millis(200)).await;

reqwest::get(ctx.url("/api/foo"))
.await
.expect("http request failed");

let reqs1 = m1.collect_mock_http();
assert_eq!(reqs1.len(), 1);
}

#[test_context(TestRunner)]
#[tokio::test]
async fn should_prioritize_same_region_ws(ctx: &mut TestRunner) {
ctx.set_region("foo").await;

let mut m1 = MonolithBuilder::new().region("foo").build(ctx).await;
let mut m2 = MonolithBuilder::new().region("bar").build(ctx).await;
let mut m3 = MonolithBuilder::new().region("bar").build(ctx).await;
let mut m4 = MonolithBuilder::new().region("bar").build(ctx).await;

m1.show().await;
m2.show().await;
m3.show().await;
m4.show().await;

tokio::time::sleep(Duration::from_millis(200)).await;

let mut c1 = Client::new(ctx).unwrap();
c1.join("foo").await;

tokio::time::timeout(Duration::from_millis(100), m1.wait_recv())
.await
.expect("timeout waiting for join message");

let recvd = m1.collect_recv();
assert_eq!(recvd.len(), 1);
assert!(matches!(recvd[0], MsgB2M::Join(_)));
}
46 changes: 39 additions & 7 deletions crates/harness/src/test_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::{collections::HashMap, net::SocketAddr, sync::Arc};

use tokio::process::{Child, Command};

Expand All @@ -23,6 +23,25 @@ impl TestRunner {
self.spawn_options.port
}

/// The region that the balancer is configured to use. If `None`, the balancer will use the default region.
pub fn region(&self) -> Option<&str> {
self.spawn_options.region.as_deref()
}

/// Set the region that the balancer should use.
/// **This will also restart the balancer.**
pub async fn set_region(&mut self, region: impl AsRef<str>) {
self.spawn_options.region = Some(region.as_ref().to_owned());
self.restart_balancer().await;
}

/// Clear the region that the balancer should use.
/// **This will also restart the balancer.**
pub async fn clear_region(&mut self) {
self.spawn_options.region = None;
self.restart_balancer().await;
}

/// Kill the balancer and start a new one with the same configuration.
pub async fn restart_balancer(&mut self) {
println!("restarting balancer");
Expand All @@ -35,6 +54,17 @@ impl TestRunner {

/// Spawn a new balancer and wait for it to be ready.
async fn spawn_balancer(opts: &BalancerSpawnOptions) -> Child {
let mut envs: HashMap<_, _> = HashMap::from_iter([
("BALANCER_PORT", format!("{}", opts.port)),
(
"BALANCER_DISCOVERY",
format!("{{method=\"harness\", port={}}}", opts.harness_port),
),
]);
if let Some(region) = &opts.region {
envs.insert("BALANCER_REGION", region.clone());
}

let child = Command::new("cargo")
.args([
"run",
Expand All @@ -44,11 +74,7 @@ impl TestRunner {
"--log-level",
"debug",
])
.env("BALANCER_PORT", format!("{}", opts.port))
.env(
"BALANCER_DISCOVERY",
format!("{{method=\"harness\", port={}}}", opts.harness_port),
)
.envs(envs)
.spawn()
.expect("Failed to start balancer");

Expand Down Expand Up @@ -100,7 +126,11 @@ impl AsyncTestContext for TestRunner {
}
}

let opts = BalancerSpawnOptions { port, harness_port };
let opts = BalancerSpawnOptions {
port,
harness_port,
region: None,
};

let child = Self::spawn_balancer(&opts).await;

Expand All @@ -123,9 +153,11 @@ impl AsyncTestContext for TestRunner {
}
}

#[derive(Debug, Clone)]
struct BalancerSpawnOptions {
pub port: u16,
pub harness_port: u16,
pub region: Option<String>,
}

#[cfg(test)]
Expand Down
40 changes: 35 additions & 5 deletions crates/ott-balancer-bin/src/balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, instrument, trace, warn};

use crate::client::ClientLink;
use crate::config::BalancerConfig;
use crate::monolith::Room;
use crate::room::RoomLocator;
use crate::{
Expand Down Expand Up @@ -361,17 +362,42 @@ impl BalancerContext {

/// When loading a room, call this to select the best monolith to load it on.
pub fn select_monolith(&self) -> anyhow::Result<&BalancerMonolith> {
let selected = self
.monoliths
.values()
.min_by(|x, y| x.rooms().len().cmp(&y.rooms().len()));
fn cmp(x: &BalancerMonolith, y: &BalancerMonolith) -> std::cmp::Ordering {
x.rooms().len().cmp(&y.rooms().len())
}

let in_region = self
.monoliths_by_region
.get(BalancerConfig::get().region.as_str());
if let Some(in_region) = in_region {
let selected = in_region
.iter()
.flat_map(|id| self.monoliths.get(id))
.min_by(|x, y| cmp(x, y));
if let Some(s) = selected {
return Ok(s);
}
}
let selected = self.monoliths.values().min_by(|x, y| cmp(x, y));
match selected {
Some(s) => Ok(s),
None => anyhow::bail!("no monoliths available"),
}
}

pub fn random_monolith(&self) -> anyhow::Result<&BalancerMonolith> {
let in_region = self
.monoliths_by_region
.get(BalancerConfig::get().region.as_str());
if let Some(in_region) = in_region {
let selected = in_region.iter().choose(&mut rand::thread_rng());
if let Some(s) = selected {
if let Some(m) = self.monoliths.get(s) {
return Ok(m);
}
}
}

let selected = self
.monoliths
.values()
Expand Down Expand Up @@ -409,7 +435,11 @@ pub async fn join_client(
None => {
// the room is not loaded, randomly select a monolith
let selected = ctx_write.select_monolith()?;
debug!("room is not loaded, selected monolith: {:?}", selected.id());
debug!(
"room is not loaded, selected monolith: {:?} (region: {:?})",
selected.id(),
selected.region()
);
(selected.id(), true)
}
};
Expand Down
1 change: 1 addition & 0 deletions crates/ott-balancer-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn main() -> anyhow::Result<()> {
.with(filter_layer)
.with(fmt_layer)
.init();
info!("Loaded config: {:?}", config);

let (discovery_tx, discovery_rx) = tokio::sync::mpsc::channel(2);

Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"patch-package": "^7.0.1",
"postinstall-postinstall": "^2.1.0",
"ts-essentials": "^9.3.0",
"typescript": "5.1.6",
"typescript": "5.3.3",
"validator": "^13.7.0",
"winston": "^3.10.0"
},
Expand All @@ -47,8 +47,8 @@
"@types/node": "^16.11.7",
"@types/uuid": "^3.4.0",
"@types/validator": "^13.1.3",
"@typescript-eslint/eslint-plugin": "6.0.0",
"@typescript-eslint/parser": "6.0.0",
"@typescript-eslint/eslint-plugin": "6.14.0",
"@typescript-eslint/parser": "6.14.0",
"concurrently": "7.6.0",
"cypress": "12.14.0",
"cypress-iframe": "^1.0.1",
Expand All @@ -66,4 +66,4 @@
"ts-jest": "29.1.1",
"ts-node": "^10.9.1"
}
}
}
Loading

0 comments on commit 9635e00

Please sign in to comment.