Skip to content

Commit

Permalink
balancer: prioritize routing new rooms to monoliths in the same region (
Browse files Browse the repository at this point in the history
#1182)

* balancer: prioritize routing new rooms to monoliths in the same region

* fix lints

* fix some potential test flakiness
  • Loading branch information
dyc3 authored Dec 18, 2023
1 parent ea90d9d commit 7431ab3
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 12 deletions.
65 changes: 65 additions & 0 deletions crates/harness-tests/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,68 @@ async fn unicast_messaging(ctx: &mut TestRunner) {
assert_eq!(oks.len(), 1);
assert_eq!(oks[0].as_ref().unwrap().to_string(), "{}");
}

#[test_context(TestRunner)]
#[tokio::test]
async fn should_prioritize_same_region_http(ctx: &mut TestRunner) {
ctx.set_region("foo").await;

let mut m1 = MonolithBuilder::new()
.region("foo")
.add_mock_http_json(
"/api/foo",
MockRespParts::default(),
serde_json::json!({
"data": "foo",
}),
)
.build(ctx)
.await;

let mut m2 = MonolithBuilder::new().region("bar").build(ctx).await;
let mut m3 = MonolithBuilder::new().region("bar").build(ctx).await;
let mut m4 = MonolithBuilder::new().region("bar").build(ctx).await;

m1.show().await;
m2.show().await;
m3.show().await;
m4.show().await;

tokio::time::sleep(Duration::from_millis(200)).await;

reqwest::get(ctx.url("/api/foo"))
.await
.expect("http request failed");

let reqs1 = m1.collect_mock_http();
assert_eq!(reqs1.len(), 1);
}

#[test_context(TestRunner)]
#[tokio::test]
async fn should_prioritize_same_region_ws(ctx: &mut TestRunner) {
ctx.set_region("foo").await;

let mut m1 = MonolithBuilder::new().region("foo").build(ctx).await;
let mut m2 = MonolithBuilder::new().region("bar").build(ctx).await;
let mut m3 = MonolithBuilder::new().region("bar").build(ctx).await;
let mut m4 = MonolithBuilder::new().region("bar").build(ctx).await;

m1.show().await;
m2.show().await;
m3.show().await;
m4.show().await;

tokio::time::sleep(Duration::from_millis(200)).await;

let mut c1 = Client::new(ctx).unwrap();
c1.join("foo").await;

tokio::time::timeout(Duration::from_millis(100), m1.wait_recv())
.await
.expect("timeout waiting for join message");

let recvd = m1.collect_recv();
assert_eq!(recvd.len(), 1);
assert!(matches!(recvd[0], MsgB2M::Join(_)));
}
46 changes: 39 additions & 7 deletions crates/harness/src/test_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::{collections::HashMap, net::SocketAddr, sync::Arc};

use tokio::process::{Child, Command};

Expand All @@ -23,6 +23,25 @@ impl TestRunner {
self.spawn_options.port
}

/// Returns the region stored in the balancer's spawn options, borrowed as a `&str`.
///
/// `None` means no region has been configured, in which case the balancer falls back to
/// its default region.
pub fn region(&self) -> Option<&str> {
    let configured = &self.spawn_options.region;
    configured.as_deref()
}

/// Configure the region the balancer should run with.
///
/// The region is copied into the spawn options as an owned `String`.
/// **Calling this restarts the balancer** so that the new value takes effect.
pub async fn set_region(&mut self, region: impl AsRef<str>) {
    let region_name = String::from(region.as_ref());
    self.spawn_options.region = Some(region_name);
    self.restart_balancer().await;
}

/// Remove any configured region from the spawn options.
///
/// **Calling this restarts the balancer** so that it runs without an explicit region.
pub async fn clear_region(&mut self) {
    let spawn_options = &mut self.spawn_options;
    spawn_options.region = None;
    self.restart_balancer().await;
}

/// Kill the balancer and start a new one with the same configuration.
pub async fn restart_balancer(&mut self) {
println!("restarting balancer");
Expand All @@ -35,6 +54,17 @@ impl TestRunner {

/// Spawn a new balancer and wait for it to be ready.
async fn spawn_balancer(opts: &BalancerSpawnOptions) -> Child {
let mut envs: HashMap<_, _> = HashMap::from_iter([
("BALANCER_PORT", format!("{}", opts.port)),
(
"BALANCER_DISCOVERY",
format!("{{method=\"harness\", port={}}}", opts.harness_port),
),
]);
if let Some(region) = &opts.region {
envs.insert("BALANCER_REGION", region.clone());
}

let child = Command::new("cargo")
.args([
"run",
Expand All @@ -44,11 +74,7 @@ impl TestRunner {
"--log-level",
"debug",
])
.env("BALANCER_PORT", format!("{}", opts.port))
.env(
"BALANCER_DISCOVERY",
format!("{{method=\"harness\", port={}}}", opts.harness_port),
)
.envs(envs)
.spawn()
.expect("Failed to start balancer");

Expand Down Expand Up @@ -100,7 +126,11 @@ impl AsyncTestContext for TestRunner {
}
}

let opts = BalancerSpawnOptions { port, harness_port };
let opts = BalancerSpawnOptions {
port,
harness_port,
region: None,
};

let child = Self::spawn_balancer(&opts).await;

Expand All @@ -123,9 +153,11 @@ impl AsyncTestContext for TestRunner {
}
}

/// Settings used when spawning a balancer process for a test run.
#[derive(Clone, Debug)]
struct BalancerSpawnOptions {
    /// Port passed to the balancer as `BALANCER_PORT`.
    pub port: u16,
    /// Port of the test harness, passed to the balancer in its discovery configuration.
    pub harness_port: u16,
    /// Region passed to the balancer as `BALANCER_REGION`; when `None` the variable is not set.
    pub region: Option<String>,
}

#[cfg(test)]
Expand Down
40 changes: 35 additions & 5 deletions crates/ott-balancer-bin/src/balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, instrument, trace, warn};

use crate::config::BalancerConfig;
use crate::monolith::Room;
use crate::room::RoomLocator;
use crate::{
Expand Down Expand Up @@ -413,17 +414,42 @@ impl BalancerContext {

/// When loading a room, call this to select the best monolith to load it on.
///
/// Selection happens in two steps:
/// 1. Among the monoliths registered under the balancer's own region
///    (`BalancerConfig::get().region`), pick the one with the fewest rooms.
/// 2. If that region has no entry, or none of its ids resolve to a known monolith,
///    pick the monolith with the fewest rooms across all regions.
///
/// # Errors
///
/// Returns an error if no monoliths are available at all.
pub fn select_monolith(&self) -> anyhow::Result<&BalancerMonolith> {
    /// Orders monoliths by how many rooms they currently hold (fewest first).
    fn cmp(x: &BalancerMonolith, y: &BalancerMonolith) -> std::cmp::Ordering {
        x.rooms().len().cmp(&y.rooms().len())
    }

    // Prefer a monolith in the same region as this balancer.
    let in_region = self
        .monoliths_by_region
        .get(BalancerConfig::get().region.as_str());
    if let Some(in_region) = in_region {
        let selected = in_region
            .iter()
            // Ids listed for the region that have no entry in `monoliths` are skipped.
            .filter_map(|id| self.monoliths.get(id))
            .min_by(|x, y| cmp(x, y));
        if let Some(s) = selected {
            return Ok(s);
        }
    }

    // No usable monolith in this region: fall back to the least loaded one anywhere.
    self.monoliths
        .values()
        .min_by(|x, y| cmp(x, y))
        .ok_or_else(|| anyhow::anyhow!("no monoliths available"))
}

pub fn random_monolith(&self) -> anyhow::Result<&BalancerMonolith> {
let in_region = self
.monoliths_by_region
.get(BalancerConfig::get().region.as_str());
if let Some(in_region) = in_region {
let selected = in_region.iter().choose(&mut rand::thread_rng());
if let Some(s) = selected {
if let Some(m) = self.monoliths.get(s) {
return Ok(m);
}
}
}

let selected = self
.monoliths
.values()
Expand Down Expand Up @@ -468,7 +494,11 @@ pub async fn join_client(
None => {
// the room is not loaded, randomly select a monolith
let selected = ctx_write.select_monolith()?;
debug!("room is not loaded, selected monolith: {:?}", selected.id());
debug!(
"room is not loaded, selected monolith: {:?} (region: {:?})",
selected.id(),
selected.region()
);
(selected.id(), true)
}
};
Expand Down
1 change: 1 addition & 0 deletions crates/ott-balancer-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn main() -> anyhow::Result<()> {
.with(filter_layer)
.with(fmt_layer)
.init();
info!("Loaded config: {:?}", config);

let (discovery_tx, discovery_rx) = tokio::sync::mpsc::channel(2);

Expand Down

0 comments on commit 7431ab3

Please sign in to comment.