Skip to content

Commit e8dbb2e

Browse files
moreno-michaeldyc3
andauthored
allow ott-collector to discover balancers (#1419)
* allow ott-collector to discover balancers * updated imports to reflect changes --------- Co-authored-by: Carson McManus <[email protected]>
1 parent c6ea6ea commit e8dbb2e

File tree

11 files changed

+115
-109
lines changed

11 files changed

+115
-109
lines changed

crates/ott-balancer/benches/joins.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use ott_balancer::{
88
balancer::{start_dispatcher, Balancer, BalancerContext, BalancerLink},
99
client::NewClient,
1010
config::BalancerConfig,
11-
discovery::{HostOrIp, MonolithConnectionConfig},
1211
monolith::NewMonolith,
1312
};
13+
use ott_common::discovery::{HostOrIp, MonolithConnectionConfig};
1414

1515
fn set_up_balancer() -> (BalancerLink, JoinHandle<()>) {
1616
let ctx = Arc::new(RwLock::new(BalancerContext::new()));

crates/ott-balancer/benches/latency.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ use ott_balancer::{
1515
balancer::{start_dispatcher, Balancer, BalancerContext, BalancerLink},
1616
client::{ClientLink, NewClient},
1717
config::BalancerConfig,
18-
discovery::{HostOrIp, MonolithConnectionConfig},
1918
messages::SocketMessage,
2019
monolith::NewMonolith,
2120
};
21+
use ott_common::discovery::{HostOrIp, MonolithConnectionConfig};
2222

2323
async fn send_msg_monolith(
2424
link: &BalancerLink,

crates/ott-balancer/src/balancer.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -701,10 +701,9 @@ pub async fn dispatch_monolith_message(
701701

702702
#[cfg(test)]
703703
mod test {
704+
use ott_common::discovery::{HostOrIp, MonolithConnectionConfig};
704705
use std::net::Ipv4Addr;
705706

706-
use crate::discovery::{HostOrIp, MonolithConnectionConfig};
707-
708707
use super::*;
709708

710709
#[tokio::test]

crates/ott-balancer/src/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use clap::{Parser, ValueEnum};
44
use figment::providers::Format;
55
use serde::Deserialize;
66

7-
use crate::discovery::{
7+
use ott_common::discovery::{
88
DnsDiscoveryConfig, FlyDiscoveryConfig, HarnessDiscoveryConfig, ManualDiscoveryConfig,
99
};
1010

crates/ott-balancer/src/connection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::time::Duration;
55

66
use futures_util::{SinkExt, StreamExt};
77
use ott_balancer_protocol::monolith::MsgM2B;
8+
use ott_common::discovery::{MonolithConnectionConfig, MonolithDiscoveryMsg};
89
use tokio_tungstenite::{connect_async, WebSocketStream};
910
use tokio_util::sync::CancellationToken;
1011
use tracing::{debug, error, info, warn};
@@ -13,7 +14,6 @@ use tungstenite::protocol::CloseFrame;
1314
use tungstenite::Message;
1415

1516
use crate::balancer::BalancerLink;
16-
use crate::discovery::{MonolithConnectionConfig, MonolithDiscoveryMsg};
1717
use crate::messages::SocketMessage;
1818
use crate::monolith::NewMonolith;
1919

crates/ott-balancer/src/discovery.rs

-89
This file was deleted.

crates/ott-balancer/src/lib.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ use tracing_subscriber::util::SubscriberInitExt;
1313
use tracing_subscriber::EnvFilter;
1414

1515
use crate::config::{BalancerConfig, DiscoveryConfig};
16-
use crate::discovery::start_discovery_task;
1716
use crate::service::BalancerService;
18-
17+
use ott_common::discovery::{
18+
start_discovery_task, DnsMonolithDiscoverer, FlyMonolithDiscoverer, HarnessMonolithDiscoverer,
19+
ManualMonolithDiscoverer,
20+
};
1921
pub mod balancer;
2022
pub mod client;
2123
pub mod config;
2224
pub mod connection;
23-
pub mod discovery;
2425
pub mod messages;
2526
pub mod monolith;
2627
pub mod room;
@@ -81,19 +82,19 @@ pub async fn run() -> anyhow::Result<()> {
8182
info!("Starting monolith discovery");
8283
let _discovery_handle = match &config.discovery {
8384
DiscoveryConfig::Dns(config) => {
84-
let discovery = discovery::DnsMonolithDiscoverer::new(config.clone());
85+
let discovery = DnsMonolithDiscoverer::new(config.clone());
8586
start_discovery_task(discovery, discovery_tx)
8687
}
8788
DiscoveryConfig::Fly(config) => {
88-
let discovery = discovery::FlyMonolithDiscoverer::new(config.clone());
89+
let discovery = FlyMonolithDiscoverer::new(config.clone());
8990
start_discovery_task(discovery, discovery_tx)
9091
}
9192
DiscoveryConfig::Manual(config) => {
92-
let discovery = discovery::ManualMonolithDiscoverer::new(config.clone());
93+
let discovery = ManualMonolithDiscoverer::new(config.clone());
9394
start_discovery_task(discovery, discovery_tx)
9495
}
9596
DiscoveryConfig::Harness(config) => {
96-
let discovery = discovery::HarnessMonolithDiscoverer::new(config.clone());
97+
let discovery = HarnessMonolithDiscoverer::new(config.clone());
9798
start_discovery_task(discovery, discovery_tx)
9899
}
99100
};

crates/ott-balancer/src/monolith.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use std::sync::Arc;
44
use anyhow::bail;
55
use ott_balancer_protocol::monolith::*;
66
use ott_balancer_protocol::*;
7+
use ott_common::discovery::MonolithConnectionConfig;
78
use tokio_tungstenite::tungstenite::Message;
89
use tracing::error;
910

10-
use crate::discovery::MonolithConnectionConfig;
1111
use crate::messages::*;
1212

1313
/// A Monolith refers to the NodeJS server that manages rooms and performs all business logic.

crates/ott-balancer/src/selection.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ mod test {
4949
use std::net::Ipv4Addr;
5050
use std::sync::Arc;
5151

52-
use crate::discovery::{HostOrIp, MonolithConnectionConfig};
5352
use crate::monolith::{BalancerMonolith, NewMonolith};
5453
use ott_balancer_protocol::*;
54+
use ott_common::discovery::{HostOrIp, MonolithConnectionConfig};
5555

5656
use super::{MinRoomsSelector, MonolithSelection};
5757

crates/ott-collector/src/main.rs

+15-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use ott_common::discovery::{start_discovery_task, DnsDiscoveryConfig, DnsMonolithDiscoverer};
12
use rocket::serde::json::Json;
23
use serde::Serialize;
34

@@ -150,13 +151,22 @@ fn serve_state() -> Json<SystemState> {
150151
Json(return_sample_state())
151152
}
152153

153-
#[launch]
154-
fn rocket() -> _ {
155-
// TODO: spawn discovery tokio task here
156-
157-
rocket::build()
154+
#[rocket::main]
155+
async fn main() -> Result<(), rocket::Error> {
156+
let (discovery_tx, _discovery_rx) = tokio::sync::mpsc::channel(2);
157+
let discovery = DnsMonolithDiscoverer::new(DnsDiscoveryConfig {
158+
monolith_port: 8081,
159+
dns_server: None,
160+
query: "balancer".to_string(),
161+
});
162+
start_discovery_task(discovery, discovery_tx);
163+
let _rocket = rocket::build()
158164
.attach(cors::Cors)
159165
.mount("/", routes![status, cors::handle_preflight, serve_state])
166+
.launch()
167+
.await?;
168+
169+
Ok(())
160170
}
161171

162172
#[get("/status")]

crates/ott-common/src/discovery.rs

+85
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
//! Handles discovery of Monoliths.
22
3+
use std::collections::HashSet;
34
use std::net::IpAddr;
45
use std::net::SocketAddr;
56
use std::time::Duration;
7+
use tracing::{debug, error, warn};
68

79
mod dns;
810
mod fly;
@@ -82,3 +84,86 @@ pub enum DiscoveryMode {
8284
Polling(Duration),
8385
Continuous,
8486
}
87+
pub struct DiscoveryTask {
88+
discovery: Box<dyn MonolithDiscoverer + Send + Sync>,
89+
90+
monoliths: HashSet<MonolithConnectionConfig>,
91+
discovery_tx: tokio::sync::mpsc::Sender<MonolithDiscoveryMsg>,
92+
}
93+
94+
impl DiscoveryTask {
95+
pub fn new(
96+
discovery: impl MonolithDiscoverer + Send + Sync + 'static,
97+
discovery_tx: tokio::sync::mpsc::Sender<MonolithDiscoveryMsg>,
98+
) -> Self {
99+
Self {
100+
discovery: Box::new(discovery),
101+
monoliths: Default::default(),
102+
discovery_tx,
103+
}
104+
}
105+
106+
pub async fn do_continuous_discovery(&mut self) {
107+
loop {
108+
if let Err(e) = self.do_discovery().await {
109+
error!("Monolith Discovery failed: {:?}", e);
110+
tokio::time::sleep(Duration::from_secs(5)).await;
111+
}
112+
}
113+
}
114+
115+
async fn do_discovery(&mut self) -> anyhow::Result<()> {
116+
let monoliths = self.discovery.discover().await?;
117+
debug!("Discovered monoliths: {:?}", monoliths);
118+
let monoliths_new: HashSet<_> = monoliths.into_iter().collect();
119+
let msg = build_discovery_msg(&self.monoliths, &monoliths_new);
120+
// apply the changes to our state
121+
for m in &msg.removed {
122+
self.monoliths.remove(m);
123+
}
124+
self.monoliths.extend(monoliths_new);
125+
// send the message
126+
self.discovery_tx.send(msg).await?;
127+
128+
if self.monoliths.is_empty() {
129+
warn!("No monoliths discovered");
130+
}
131+
132+
if let DiscoveryMode::Polling(d) = self.discovery.mode() {
133+
tokio::time::sleep(d).await;
134+
}
135+
136+
Ok(())
137+
}
138+
}
139+
140+
fn build_discovery_msg(
141+
current: &HashSet<MonolithConnectionConfig>,
142+
new: &HashSet<MonolithConnectionConfig>,
143+
) -> MonolithDiscoveryMsg {
144+
let monoliths_added = new.difference(current).cloned().collect::<Vec<_>>();
145+
let monoliths_removed = current.difference(new).cloned().collect::<Vec<_>>();
146+
MonolithDiscoveryMsg {
147+
added: monoliths_added,
148+
removed: monoliths_removed,
149+
}
150+
}
151+
152+
pub fn start_discovery_task(
153+
discovery: impl MonolithDiscoverer + Send + Sync + 'static,
154+
discovery_tx: tokio::sync::mpsc::Sender<MonolithDiscoveryMsg>,
155+
) -> JoinHandle<()> {
156+
tokio::task::Builder::new()
157+
.name("discovery")
158+
.spawn(async move {
159+
let mut task = DiscoveryTask::new(discovery, discovery_tx);
160+
task.do_continuous_discovery().await;
161+
})
162+
.expect("failed to spawn discovery task")
163+
}
164+
165+
#[derive(Debug, Clone)]
166+
pub struct MonolithDiscoveryMsg {
167+
pub added: Vec<MonolithConnectionConfig>,
168+
pub removed: Vec<MonolithConnectionConfig>,
169+
}

0 commit comments

Comments
 (0)