Skip to content

Commit 2cec601

Browse files
abstract balancing logic (#1243)
* add new trait & struct * minor changes to struct & trait * more changes to trait * attempt to add type that implements trait to BalancerContext * add traits to MonolithSelection * manually implement default for BalancerContext * fixed errors and changed function logic * moved trait to a new module * moved impl to new mod and fixed imports * created filter_monoliths * Move filtering logic into dedicated function * implemented new functions * implemented new methods properly * added snippet for unit test * Made some changes to unit test * changed unit test
1 parent abd5d09 commit 2cec601

File tree

3 files changed

+149
-36
lines changed

3 files changed

+149
-36
lines changed

crates/ott-balancer/src/balancer.rs

+28-36
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use ott_balancer_protocol::monolith::{
44
B2MClientMsg, B2MJoin, B2MLeave, B2MUnload, MsgB2M, MsgM2B, RoomMetadata,
55
};
66
use ott_balancer_protocol::*;
7-
use rand::seq::IteratorRandom;
87
use serde_json::value::RawValue;
98
use tokio::sync::RwLock;
109
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
@@ -16,6 +15,7 @@ use crate::client::ClientLink;
1615
use crate::config::BalancerConfig;
1716
use crate::monolith::Room;
1817
use crate::room::RoomLocator;
18+
use crate::selection::{MinRoomsSelector, MonolithSelection};
1919
use crate::{
2020
client::{BalancerClient, NewClient},
2121
messages::*,
@@ -171,12 +171,25 @@ impl BalancerLink {
171171
}
172172
}
173173

174-
#[derive(Debug, Default)]
174+
#[derive(Debug)]
175175
pub struct BalancerContext {
176176
pub clients: HashMap<ClientId, BalancerClient>,
177177
pub monoliths: HashMap<MonolithId, BalancerMonolith>,
178178
pub rooms_to_monoliths: HashMap<RoomName, RoomLocator>,
179179
pub monoliths_by_region: HashMap<String, Vec<MonolithId>>,
180+
pub monolith_selection: Box<dyn MonolithSelection + Send + Sync + 'static>,
181+
}
182+
183+
impl Default for BalancerContext {
184+
fn default() -> Self {
185+
BalancerContext {
186+
clients: HashMap::default(),
187+
monoliths: HashMap::default(),
188+
rooms_to_monoliths: HashMap::default(),
189+
monoliths_by_region: HashMap::default(),
190+
monolith_selection: Box::<MinRoomsSelector>::default(),
191+
}
192+
}
180193
}
181194

182195
impl BalancerContext {
@@ -356,50 +369,29 @@ impl BalancerContext {
356369
Ok(monolith)
357370
}
358371

359-
/// When loading a room, call this to select the best monolith to load it on.
360-
pub fn select_monolith(&self) -> anyhow::Result<&BalancerMonolith> {
361-
fn cmp(x: &BalancerMonolith, y: &BalancerMonolith) -> std::cmp::Ordering {
362-
x.rooms().len().cmp(&y.rooms().len())
363-
}
364-
372+
/// Prioritizes monoliths in the same region
373+
pub fn filter_monoliths(&self) -> Vec<&BalancerMonolith> {
365374
let in_region = self
366375
.monoliths_by_region
367376
.get(BalancerConfig::get().region.as_str());
368377
if let Some(in_region) = in_region {
369-
let selected = in_region
378+
return in_region
370379
.iter()
371380
.flat_map(|id| self.monoliths.get(id))
372-
.min_by(|x, y| cmp(x, y));
373-
if let Some(s) = selected {
374-
return Ok(s);
375-
}
376-
}
377-
let selected = self.monoliths.values().min_by(|x, y| cmp(x, y));
378-
match selected {
379-
Some(s) => Ok(s),
380-
None => anyhow::bail!("no monoliths available"),
381+
.collect();
381382
}
383+
384+
self.monoliths.values().collect()
382385
}
383386

384-
pub fn random_monolith(&self) -> anyhow::Result<&BalancerMonolith> {
385-
let in_region = self
386-
.monoliths_by_region
387-
.get(BalancerConfig::get().region.as_str());
388-
if let Some(in_region) = in_region {
389-
let selected = in_region.iter().choose(&mut rand::thread_rng());
390-
if let Some(s) = selected {
391-
if let Some(m) = self.monoliths.get(s) {
392-
return Ok(m);
393-
}
394-
}
395-
}
387+
pub fn select_monolith(&self) -> anyhow::Result<&BalancerMonolith> {
388+
let filtered = self.filter_monoliths();
389+
return self.monolith_selection.select_monolith(filtered);
390+
}
396391

397-
let selected = self
398-
.monoliths
399-
.values()
400-
.choose(&mut rand::thread_rng())
401-
.ok_or(anyhow::anyhow!("no monoliths available"))?;
402-
Ok(selected)
392+
pub fn random_monolith(&self) -> anyhow::Result<&BalancerMonolith> {
393+
let filtered = self.filter_monoliths();
394+
return self.monolith_selection.random_monolith(filtered);
403395
}
404396

405397
#[instrument(skip(self, monolith), err, fields(monolith_id = %monolith))]

crates/ott-balancer/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod discovery;
2424
pub mod messages;
2525
pub mod monolith;
2626
pub mod room;
27+
pub mod selection;
2728
pub mod service;
2829

2930
pub async fn run() -> anyhow::Result<()> {

crates/ott-balancer/src/selection.rs

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use rand::seq::IteratorRandom;
2+
3+
use crate::monolith::BalancerMonolith;
4+
5+
#[derive(Debug, Default)]
6+
pub struct MinRoomsSelector;
7+
pub trait MonolithSelection: std::fmt::Debug {
8+
fn select_monolith<'a>(
9+
&'a self,
10+
monolith: Vec<&'a BalancerMonolith>,
11+
) -> anyhow::Result<&BalancerMonolith>;
12+
13+
fn random_monolith<'a>(
14+
&'a self,
15+
monolith: Vec<&'a BalancerMonolith>,
16+
) -> anyhow::Result<&BalancerMonolith>;
17+
}
18+
19+
impl MonolithSelection for MinRoomsSelector {
20+
fn select_monolith<'a>(
21+
&'a self,
22+
monolith: Vec<&'a BalancerMonolith>,
23+
) -> anyhow::Result<&BalancerMonolith> {
24+
fn cmp(x: &BalancerMonolith, y: &BalancerMonolith) -> std::cmp::Ordering {
25+
x.rooms().len().cmp(&y.rooms().len())
26+
}
27+
28+
let selected = monolith.iter().min_by(|x, y| cmp(x, y));
29+
match selected {
30+
Some(s) => Ok(s),
31+
None => anyhow::bail!("no monoliths available"),
32+
}
33+
}
34+
35+
fn random_monolith<'a>(
36+
&'a self,
37+
monolith: Vec<&'a BalancerMonolith>,
38+
) -> anyhow::Result<&BalancerMonolith> {
39+
let selected = monolith
40+
.iter()
41+
.choose(&mut rand::thread_rng())
42+
.ok_or_else(|| anyhow::anyhow!("no monoliths available"))?;
43+
Ok(selected)
44+
}
45+
}
46+
47+
#[cfg(test)]
48+
mod test {
49+
use std::net::Ipv4Addr;
50+
use std::sync::Arc;
51+
52+
use crate::discovery::{HostOrIp, MonolithConnectionConfig};
53+
use crate::monolith::{BalancerMonolith, NewMonolith};
54+
use ott_balancer_protocol::*;
55+
56+
use super::{MinRoomsSelector, MonolithSelection};
57+
58+
#[tokio::test]
59+
async fn test_min_by() {
60+
let room_one = RoomName::from("room one");
61+
let room_two = RoomName::from("room two");
62+
let room_three = RoomName::from("room three");
63+
let (monolith_outbound_tx, _monolith_outbound_rx) = tokio::sync::mpsc::channel(100);
64+
let monolith_outbound_tx_one = Arc::new(monolith_outbound_tx);
65+
let (client_inbound_tx_one, _client_inbound_rx) = tokio::sync::mpsc::channel(100);
66+
let monolith_id_one = uuid::Uuid::new_v4().into();
67+
68+
let mut monolith_one = BalancerMonolith::new(
69+
NewMonolith {
70+
id: monolith_id_one,
71+
region: "unknown".into(),
72+
config: MonolithConnectionConfig {
73+
host: HostOrIp::Ip(Ipv4Addr::LOCALHOST.into()),
74+
port: 3002,
75+
},
76+
proxy_port: 3000,
77+
},
78+
monolith_outbound_tx_one,
79+
client_inbound_tx_one,
80+
);
81+
82+
monolith_one
83+
.add_room(&room_one)
84+
.expect("failed to add room");
85+
monolith_one
86+
.add_room(&room_two)
87+
.expect("failed to add room");
88+
89+
let (monolith_outbound_tx, _monolith_outbound_rx) = tokio::sync::mpsc::channel(100);
90+
let monolith_outbound_tx_two = Arc::new(monolith_outbound_tx);
91+
let (client_inbound_tx_two, _client_inbound_rx) = tokio::sync::mpsc::channel(100);
92+
let monolith_id_two = uuid::Uuid::new_v4().into();
93+
94+
let mut monolith_two = BalancerMonolith::new(
95+
NewMonolith {
96+
id: monolith_id_two,
97+
region: "unknown".into(),
98+
config: MonolithConnectionConfig {
99+
host: HostOrIp::Ip(Ipv4Addr::LOCALHOST.into()),
100+
port: 3002,
101+
},
102+
proxy_port: 3000,
103+
},
104+
monolith_outbound_tx_two,
105+
client_inbound_tx_two,
106+
);
107+
108+
monolith_two
109+
.add_room(&room_three)
110+
.expect("failed to add room");
111+
112+
let monoliths: Vec<&BalancerMonolith> = vec![&monolith_one, &monolith_two];
113+
114+
let selected = MinRoomsSelector
115+
.select_monolith(monoliths)
116+
.expect("failed to select monolith");
117+
118+
assert_eq!(selected.id(), monolith_two.id())
119+
}
120+
}

0 commit comments

Comments
 (0)