Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add region to emulated monolith #1149

Merged
merged 7 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/harness-tests/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::time::Duration;

use harness::{Client, Monolith, TestRunner};
use harness::{Client, Monolith, MonolithBuilder, TestRunner};
use ott_balancer_protocol::monolith::{B2MUnload, MsgB2M};
use test_context::test_context;

Expand Down
42 changes: 40 additions & 2 deletions crates/harness/src/monolith.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct MonolithState {
rooms: HashMap<RoomName, RoomMetadata>,
room_load_epoch: Arc<AtomicU32>,
clients: HashSet<ClientId>,
region: String,
}

impl Monolith {
Expand All @@ -77,7 +78,6 @@ impl Monolith {
let notif_connect = Arc::new(Notify::new());
let notif_disconnect = Arc::new(Notify::new());
let notif_recv = Arc::new(Notify::new());

let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(50);

let state = Arc::new(Mutex::new(MonolithState {
Expand All @@ -100,7 +100,7 @@ impl Monolith {
let mut ws = tokio_tungstenite::accept_async(stream).await.unwrap();
let init = M2BInit {
port: http_port,
region: "unknown".into(),
region: state.lock().unwrap().region.clone(),
};
let msg = serde_json::to_string(&MsgM2B::Init(init)).unwrap();
ws.send(Message::Text(msg)).await.unwrap();
Expand Down Expand Up @@ -209,6 +209,10 @@ impl Monolith {
self.state.lock().unwrap().clients.clone()
}

pub fn region(&self) -> String {
self.state.lock().unwrap().region.clone()
}

/// Tell the provider to add this monolith to the list of available monoliths.
pub async fn show(&mut self) {
println!("showing monolith");
Expand Down Expand Up @@ -265,6 +269,10 @@ impl Monolith {
self.state.lock().unwrap().response_mocks = mocks;
}

pub fn set_region(&mut self, region: &str) {
self.state.lock().unwrap().region = region.to_string();
}

pub fn collect_mock_http(&self) -> Vec<MockRequest> {
self.state.lock().unwrap().received_http.clone()
}
Expand Down Expand Up @@ -416,11 +424,13 @@ pub struct MockRequest {
pub struct MonolithBuilder {
response_mocks: HashMap<String, (MockRespParts, Bytes)>,
behavior: Option<Box<dyn Behavior + Send + 'static>>,
region: String,
cjrkoa marked this conversation as resolved.
Show resolved Hide resolved
}

impl MonolithBuilder {
pub fn new() -> Self {
Self {
region: "unknown".to_string(),
..Default::default()
}
}
Expand All @@ -431,6 +441,7 @@ impl MonolithBuilder {
.await
.unwrap();
monolith.set_all_mock_http(self.response_mocks);
monolith.set_region("unknown");
cjrkoa marked this conversation as resolved.
Show resolved Hide resolved
monolith
}

Expand Down Expand Up @@ -460,4 +471,31 @@ impl MonolithBuilder {
self.behavior = Some(Box::new(behavior));
self
}

pub fn region(mut self, region: impl AsRef<str>) -> Self {
let region_str: String = region.as_ref().to_string();
self.region = region_str;
self
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::{Monolith, TestRunner};
use test_context::test_context;

#[test_context(TestRunner)]
#[tokio::test]
async fn default_region(ctx: &TestRunner) {
let mb: Monolith = MonolithBuilder::new().build(ctx).await.into();
assert_eq!(mb.region(), "unknown");
}

#[test_context(TestRunner)]
#[tokio::test]
async fn change_region(ctx: &TestRunner) {
let mb: Monolith = MonolithBuilder::new().region("foo").build(ctx).await.into();
assert_eq!(mb.region(), "foo");
}
}