Skip to content

Commit deafef0

Browse files
committed
balancer: unload duplicates tests
1 parent 2038690 commit deafef0

File tree

2 files changed

+285
-1
lines changed

2 files changed

+285
-1
lines changed

crates/ott-balancer/src/balancer.rs

+268
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ pub async fn dispatch_monolith_message(
723723
mod test {
724724
use ott_common::discovery::{ConnectionConfig, HostOrIp};
725725
use std::net::Ipv4Addr;
726+
use tokio::sync::mpsc::error::TryRecvError;
726727

727728
use super::*;
728729

@@ -858,4 +859,271 @@ mod test {
858859
.contains(&client_id));
859860
assert_eq!(ctx.monoliths.get(&monolith_id).unwrap().rooms().len(), 1);
860861
}
862+
863+
#[tokio::test]
864+
async fn should_unload_duplicate_rooms_where_2nd_load_invalid() {
865+
// a bunch of setup
866+
let mut ctx = BalancerContext::new();
867+
let (monolith_outbound_tx_1, _monolith_outbound_rx_1) = tokio::sync::mpsc::channel(100);
868+
let monolith_outbound_tx_1 = Arc::new(monolith_outbound_tx_1);
869+
let (client_inbound_tx_1, _client_inbound_rx_1) = tokio::sync::mpsc::channel(100);
870+
let m1_id = uuid::Uuid::new_v4().into();
871+
let m1 = BalancerMonolith::new(
872+
NewMonolith {
873+
id: m1_id,
874+
region: "unknown".into(),
875+
config: ConnectionConfig {
876+
host: HostOrIp::Ip(Ipv4Addr::LOCALHOST.into()),
877+
port: 3002,
878+
},
879+
proxy_port: 3000,
880+
},
881+
monolith_outbound_tx_1,
882+
client_inbound_tx_1,
883+
);
884+
let (monolith_outbound_tx_2, mut monolith_outbound_rx_2) = tokio::sync::mpsc::channel(100);
885+
let monolith_outbound_tx_2 = Arc::new(monolith_outbound_tx_2);
886+
let (client_inbound_tx_2, _client_inbound_rx_2) = tokio::sync::mpsc::channel(100);
887+
let m2_id = uuid::Uuid::new_v4().into();
888+
let m2 = BalancerMonolith::new(
889+
NewMonolith {
890+
id: m2_id,
891+
region: "unknown".into(),
892+
config: ConnectionConfig {
893+
host: HostOrIp::Ip(Ipv4Addr::LOCALHOST.into()),
894+
port: 3004,
895+
},
896+
proxy_port: 3000,
897+
},
898+
monolith_outbound_tx_2,
899+
client_inbound_tx_2,
900+
);
901+
ctx.add_monolith(m1);
902+
ctx.add_monolith(m2);
903+
904+
let room_name = RoomName::from("foo");
905+
ctx.add_or_sync_room(RoomMetadata::default_with_name(room_name.clone()), m1_id, 8)
906+
.await
907+
.expect("failed to add room to m1");
908+
909+
// drain the channel
910+
while match monolith_outbound_rx_2.try_recv() {
911+
Ok(_) => true,
912+
Err(TryRecvError::Empty) => false,
913+
Err(TryRecvError::Disconnected) => false,
914+
} {}
915+
916+
ctx.add_or_sync_room(
917+
RoomMetadata::default_with_name(room_name.clone()),
918+
m2_id,
919+
10,
920+
)
921+
.await
922+
.expect_err("should not be able to add room to m2 because it's already loaded on m1");
923+
924+
assert_eq!(
925+
ctx.rooms_to_monoliths.get(&room_name),
926+
Some(&RoomLocator::new(m1_id, 8))
927+
);
928+
let m1 = ctx.monoliths.get(&m1_id).unwrap();
929+
assert!(m1.has_room(&room_name));
930+
let m2 = ctx.monoliths.get(&m2_id).unwrap();
931+
assert!(!m2.has_room(&room_name));
932+
933+
let msg = monolith_outbound_rx_2.try_recv();
934+
assert!(matches!(msg, Ok(SocketMessage::Message(Message::Text(_)))));
935+
match msg {
936+
Ok(SocketMessage::Message(Message::Text(text))) => {
937+
let msg: MsgB2M =
938+
serde_json::from_str(&text).expect("failed to deserialize message");
939+
assert!(matches!(msg, MsgB2M::Unload(_)));
940+
}
941+
_ => unreachable!(),
942+
}
943+
}
944+
945+
#[tokio::test]
946+
async fn should_unload_duplicate_rooms_where_2nd_load_overrides() {
947+
// a bunch of setup
948+
let mut ctx = BalancerContext::new();
949+
let (monolith_outbound_tx_1, mut monolith_outbound_rx_1) = tokio::sync::mpsc::channel(100);
950+
let monolith_outbound_tx_1 = Arc::new(monolith_outbound_tx_1);
951+
let (client_inbound_tx_1, _client_inbound_rx_1) = tokio::sync::mpsc::channel(100);
952+
let m1_id = uuid::Uuid::new_v4().into();
953+
let m1 = BalancerMonolith::new(
954+
NewMonolith {
955+
id: m1_id,
956+
region: "unknown".into(),
957+
config: ConnectionConfig {
958+
host: HostOrIp::Ip(Ipv4Addr::LOCALHOST.into()),
959+
port: 3002,
960+
},
961+
proxy_port: 3000,
962+
},
963+
monolith_outbound_tx_1,
964+
client_inbound_tx_1,
965+
);
966+
let (monolith_outbound_tx_2, _monolith_outbound_rx_2) = tokio::sync::mpsc::channel(100);
967+
let monolith_outbound_tx_2 = Arc::new(monolith_outbound_tx_2);
968+
let (client_inbound_tx_2, _client_inbound_rx_2) = tokio::sync::mpsc::channel(100);
969+
let m2_id = uuid::Uuid::new_v4().into();
970+
let m2 = BalancerMonolith::new(
971+
NewMonolith {
972+
id: m2_id,
973+
region: "unknown".into(),
974+
config: ConnectionConfig {
975+
host: HostOrIp::Ip(Ipv4Addr::LOCALHOST.into()),
976+
port: 3004,
977+
},
978+
proxy_port: 3000,
979+
},
980+
monolith_outbound_tx_2,
981+
client_inbound_tx_2,
982+
);
983+
ctx.add_monolith(m1);
984+
ctx.add_monolith(m2);
985+
986+
let room_name = RoomName::from("foo");
987+
ctx.add_or_sync_room(RoomMetadata::default_with_name(room_name.clone()), m1_id, 8)
988+
.await
989+
.expect("failed to add room to m1");
990+
991+
// drain the channel
992+
while match monolith_outbound_rx_1.try_recv() {
993+
Ok(_) => true,
994+
Err(TryRecvError::Empty) => false,
995+
Err(TryRecvError::Disconnected) => false,
996+
} {}
997+
998+
ctx.add_or_sync_room(RoomMetadata::default_with_name(room_name.clone()), m2_id, 6)
999+
.await
1000+
.expect("should be able to add room to m2 because it's an older version");
1001+
1002+
assert_eq!(
1003+
ctx.rooms_to_monoliths.get(&room_name),
1004+
Some(&RoomLocator::new(m2_id, 6))
1005+
);
1006+
let m1 = ctx.monoliths.get(&m1_id).unwrap();
1007+
assert!(!m1.has_room(&room_name));
1008+
let m2 = ctx.monoliths.get(&m2_id).unwrap();
1009+
assert!(m2.has_room(&room_name));
1010+
1011+
let msg = monolith_outbound_rx_1.try_recv();
1012+
assert!(matches!(msg, Ok(SocketMessage::Message(Message::Text(_)))));
1013+
match msg {
1014+
Ok(SocketMessage::Message(Message::Text(text))) => {
1015+
let msg: MsgB2M =
1016+
serde_json::from_str(&text).expect("failed to deserialize message");
1017+
assert!(matches!(msg, MsgB2M::Unload(_)));
1018+
}
1019+
_ => unreachable!(),
1020+
}
1021+
}
1022+
1023+
#[tokio::test]
1024+
async fn should_unload_duplicate_rooms_where_2nd_load_overrides_should_disconnect_clients() {
1025+
// a bunch of setup
1026+
let ctx = Arc::new(RwLock::new(BalancerContext::new()));
1027+
let (monolith_outbound_tx_1, _monolith_outbound_rx_1) = tokio::sync::mpsc::channel(100);
1028+
let monolith_outbound_tx_1 = Arc::new(monolith_outbound_tx_1);
1029+
let (client_inbound_tx_1, _client_inbound_rx_1) = tokio::sync::mpsc::channel(100);
1030+
let m1_id = uuid::Uuid::new_v4().into();
1031+
let m1 = BalancerMonolith::new(
1032+
NewMonolith {
1033+
id: m1_id,
1034+
region: "unknown".into(),
1035+
config: ConnectionConfig {
1036+
host: HostOrIp::Ip(Ipv4Addr::LOCALHOST.into()),
1037+
port: 3002,
1038+
},
1039+
proxy_port: 3000,
1040+
},
1041+
monolith_outbound_tx_1,
1042+
client_inbound_tx_1,
1043+
);
1044+
let (monolith_outbound_tx_2, _monolith_outbound_rx_2) = tokio::sync::mpsc::channel(100);
1045+
let monolith_outbound_tx_2 = Arc::new(monolith_outbound_tx_2);
1046+
let (client_inbound_tx_2, _client_inbound_rx_2) = tokio::sync::mpsc::channel(100);
1047+
let m2_id = uuid::Uuid::new_v4().into();
1048+
let m2 = BalancerMonolith::new(
1049+
NewMonolith {
1050+
id: m2_id,
1051+
region: "unknown".into(),
1052+
config: ConnectionConfig {
1053+
host: HostOrIp::Ip(Ipv4Addr::LOCALHOST.into()),
1054+
port: 3004,
1055+
},
1056+
proxy_port: 3000,
1057+
},
1058+
monolith_outbound_tx_2,
1059+
client_inbound_tx_2,
1060+
);
1061+
ctx.write().await.add_monolith(m1);
1062+
ctx.write().await.add_monolith(m2);
1063+
1064+
let room_name = RoomName::from("foo");
1065+
ctx.write()
1066+
.await
1067+
.add_or_sync_room(RoomMetadata::default_with_name(room_name.clone()), m1_id, 8)
1068+
.await
1069+
.expect("failed to add room to m1");
1070+
1071+
// add client to room
1072+
let client_id = uuid::Uuid::new_v4().into();
1073+
let (client_link_tx, client_link_rx) = tokio::sync::oneshot::channel();
1074+
join_client(
1075+
&ctx,
1076+
NewClient {
1077+
id: client_id,
1078+
room: room_name.clone(),
1079+
token: "test".into(),
1080+
},
1081+
client_link_tx,
1082+
)
1083+
.await
1084+
.expect("failed to add client");
1085+
let mut client_link = client_link_rx.await.expect("failed to get client link");
1086+
let _client = ctx
1087+
.read()
1088+
.await
1089+
.clients
1090+
.get(&client_id)
1091+
.expect("client not found");
1092+
1093+
ctx.write()
1094+
.await
1095+
.add_or_sync_room(RoomMetadata::default_with_name(room_name.clone()), m2_id, 6)
1096+
.await
1097+
.expect("should be able to add room to m2 because it's an older version");
1098+
1099+
while client_link.outbound_try_recv().is_ok() {}
1100+
let msg = client_link
1101+
.outbound_try_recv()
1102+
.expect_err("channel should be closed");
1103+
assert!(matches!(
1104+
msg,
1105+
tokio::sync::broadcast::error::TryRecvError::Closed
1106+
));
1107+
1108+
// make sure new clients get routed to the new room
1109+
let (client_link_tx, client_link_rx) = tokio::sync::oneshot::channel();
1110+
join_client(
1111+
&ctx,
1112+
NewClient {
1113+
id: client_id,
1114+
room: room_name.clone(),
1115+
token: "test".into(),
1116+
},
1117+
client_link_tx,
1118+
)
1119+
.await
1120+
.expect("failed to add client");
1121+
let _client_link = client_link_rx.await.expect("failed to get client link");
1122+
let ctx_read = ctx.read().await;
1123+
let _client = ctx_read.clients.get(&client_id).expect("client not found");
1124+
1125+
let m2 = ctx_read.monoliths.get(&m2_id).expect("monolith not found");
1126+
let room = m2.rooms().get(&room_name).expect("room not found on m2");
1127+
assert!(room.clients().contains(&client_id));
1128+
}
8611129
}

crates/ott-balancer/src/client.rs

+17-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::time::Duration;
22

33
use futures_util::{SinkExt, StreamExt};
4-
use tokio::sync::broadcast::error::RecvError;
4+
use tokio::sync::broadcast::error::{RecvError, TryRecvError};
55
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
66
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
77
use tokio_tungstenite::tungstenite::Message;
@@ -82,6 +82,22 @@ impl ClientLink {
8282
Ok(msg)
8383
}
8484

85+
pub fn outbound_try_recv(&mut self) -> Result<SocketMessage, TryRecvError> {
86+
if self.room_tx.is_closed() {
87+
return Err(TryRecvError::Closed);
88+
}
89+
90+
match self.unicast_rx.try_recv() {
91+
Ok(msg) => return Ok(msg),
92+
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
93+
return Err(TryRecvError::Closed)
94+
}
95+
Err(_) => {}
96+
}
97+
98+
self.broadcast_rx.try_recv()
99+
}
100+
85101
/// Send a message to the Room this client is in via the Balancer
86102
pub async fn inbound_send(&mut self, msg: impl Into<SocketMessage>) -> anyhow::Result<()> {
87103
self.room_tx.send(Context::new(self.id, msg.into())).await?;

0 commit comments

Comments
 (0)