Skip to content

Commit

Permalink
collector: make it actually collect state from discovered balancers (#…
Browse files Browse the repository at this point in the history
…1418)

* collector: make it actually collect state from discovered balancers

* finish implementing collecting state

* fix lints
  • Loading branch information
dyc3 authored Feb 29, 2024
1 parent e8dbb2e commit e2c84ca
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 142 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/ott-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ edition = "2021"
[dependencies]
anyhow.workspace = true
ott-common.workspace = true
ott-balancer-protocol.workspace = true
once_cell.workspace = true
reqwest.workspace = true
rocket.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
uuid.workspace = true
89 changes: 89 additions & 0 deletions crates/ott-collector/src/collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::sync::Arc;

use once_cell::sync::Lazy;
use ott_balancer_protocol::collector::BalancerState;
use ott_common::discovery::{MonolithConnectionConfig, MonolithDiscoveryMsg};
use tokio::sync::Mutex;
use tracing::{error, warn};

use crate::SystemState;

pub static CURRENT_STATE: Lazy<Arc<Mutex<SystemState>>> =
Lazy::new(|| Arc::new(Mutex::new(SystemState(vec![]))));

pub struct Collector {
discovery_rx: tokio::sync::mpsc::Receiver<MonolithDiscoveryMsg>,
interval: tokio::time::Duration,
balancers: Vec<MonolithConnectionConfig>,
}

impl Collector {
pub fn new(
discovery_rx: tokio::sync::mpsc::Receiver<MonolithDiscoveryMsg>,
interval: tokio::time::Duration,
) -> Self {
Self {
discovery_rx,
interval,
balancers: Default::default(),
}
}

#[must_use]
pub fn spawn(mut self) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
self.run().await;
warn!("Collector task ended");
})
}

pub async fn run(&mut self) {
loop {
tokio::select! {
_ = tokio::time::sleep(self.interval) => {
let new_state = match self.collect().await {
Ok(new_state) => new_state,
Err(err) => {
error!("Unexpected error collecting system state: {}", err);
continue;
}
};
let mut current = CURRENT_STATE.lock().await;
*current = new_state;
}
Some(msg) = self.discovery_rx.recv() => {
self.handle_discovery(msg);
}
else => {
break;
}
}
}
}

pub fn handle_discovery(&mut self, msg: MonolithDiscoveryMsg) {
self.balancers.retain(|conf| !msg.removed.contains(conf));
self.balancers.extend(msg.added);
}

pub async fn collect(&self) -> anyhow::Result<SystemState> {
let client = reqwest::Client::new();
let mut states = vec![];
for conf in &self.balancers {
let mut url = conf.uri();
url.set_path("/api/state");
url.set_scheme("http").expect("scheme should be valid");

let resp = client.get(url).send().await?;
if !resp.status().is_success() {
error!("Failed to fetch state from {:?}", &conf);
continue;
}
let state = resp.json::<BalancerState>().await?;
states.push(state);
}
info!("Collected state from {} balancers", states.len());

Ok(SystemState(states))
}
}
161 changes: 19 additions & 142 deletions crates/ott-collector/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,168 +1,45 @@
use std::sync::Arc;

use collector::{Collector, CURRENT_STATE};
use ott_balancer_protocol::collector::BalancerState;
use ott_common::discovery::{start_discovery_task, DnsDiscoveryConfig, DnsMonolithDiscoverer};
use rocket::serde::json::Json;
use rocket::{serde::json::Json, State};
use serde::Serialize;
use tokio::sync::Mutex;

#[macro_use]
extern crate rocket;

mod collector;
mod cors;

#[derive(Debug, Clone, Serialize)]
struct SystemState(Vec<Balancer>);

#[derive(Debug, Clone, Serialize)]
struct Balancer {
id: String,
region: String,
monoliths: Vec<Monolith>,
}

#[derive(Debug, Clone, Serialize)]
struct Monolith {
id: String,
region: String,
rooms: Vec<Room>,
}

#[derive(Debug, Clone, Serialize)]
struct Room {
name: String,
clients: i32,
}

fn return_sample_state() -> SystemState {
SystemState({
vec![
Balancer {
id: "154d9d41-128c-45ab-83d8-28661882c9e3".to_string(),
region: "ewr".to_string(),
monoliths: vec![
Monolith {
id: "2bd5e4a7-14f6-4da4-bedd-72946864a7bf".to_string(),
region: "ewr".to_string(),
rooms: vec![
Room {
name: "foo".to_string(),
clients: 2,
},
Room {
name: "bar".to_string(),
clients: 0,
},
],
},
Monolith {
id: "419580cb-f576-4314-8162-45340c94bae1".to_string(),
region: "ewr".to_string(),
rooms: vec![Room {
name: "baz".to_string(),
clients: 3,
}],
},
Monolith {
id: "0c85b46e-d343-46a3-ae4f-5f2aa1a8bdac".to_string(),
region: "cdg".to_string(),
rooms: vec![Room {
name: "qux".to_string(),
clients: 0,
}],
},
],
},
Balancer {
id: "c91d183c-980e-4160-b196-43658148f469".to_string(),
region: "ewr".to_string(),
monoliths: vec![
Monolith {
id: "2bd5e4a7-14f6-4da4-bedd-72946864a7bf".to_string(),
region: "ewr".to_string(),
rooms: vec![
Room {
name: "foo".to_string(),
clients: 1,
},
Room {
name: "bar".to_string(),
clients: 2,
},
],
},
Monolith {
id: "419580cb-f576-4314-8162-45340c94bae1".to_string(),
region: "ewr".to_string(),
rooms: vec![Room {
name: "baz".to_string(),
clients: 0,
}],
},
Monolith {
id: "0c85b46e-d343-46a3-ae4f-5f2aa1a8bdac".to_string(),
region: "cdg".to_string(),
rooms: vec![Room {
name: "qux".to_string(),
clients: 0,
}],
},
],
},
Balancer {
id: "5a2e3b2d-f27b-4e3d-9b59-c921442f7ff0".to_string(),
region: "cdg".to_string(),
monoliths: vec![
Monolith {
id: "2bd5e4a7-14f6-4da4-bedd-72946864a7bf".to_string(),
region: "ewr".to_string(),
rooms: vec![
Room {
name: "foo".to_string(),
clients: 0,
},
Room {
name: "bar".to_string(),
clients: 0,
},
],
},
Monolith {
id: "419580cb-f576-4314-8162-45340c94bae1".to_string(),
region: "ewr".to_string(),
rooms: vec![Room {
name: "baz".to_string(),
clients: 0,
}],
},
Monolith {
id: "0c85b46e-d343-46a3-ae4f-5f2aa1a8bdac".to_string(),
region: "cdg".to_string(),
rooms: vec![Room {
name: "qux".to_string(),
clients: 4,
}],
},
],
},
]
})
}
pub struct SystemState(Vec<BalancerState>);

/// Serve the current system state
#[get("/state")]
fn serve_state() -> Json<SystemState> {
Json(return_sample_state())
async fn serve_state(state: &State<Arc<Mutex<SystemState>>>) -> Json<SystemState> {
let s = state.lock().await.clone();
Json(s)
}

#[rocket::main]
async fn main() -> Result<(), rocket::Error> {
let (discovery_tx, _discovery_rx) = tokio::sync::mpsc::channel(2);
async fn main() -> anyhow::Result<()> {
let (discovery_tx, discovery_rx) = tokio::sync::mpsc::channel(2);
let discovery = DnsMonolithDiscoverer::new(DnsDiscoveryConfig {
monolith_port: 8081,
dns_server: None,
query: "balancer".to_string(),
});
start_discovery_task(discovery, discovery_tx);
let _rocket = rocket::build()

let _collector_handle =
Collector::new(discovery_rx, tokio::time::Duration::from_secs(5)).spawn();

rocket::build()
.attach(cors::Cors)
.mount("/", routes![status, cors::handle_preflight, serve_state])
.manage(CURRENT_STATE.clone())
.launch()
.await?;

Expand Down

0 comments on commit e2c84ca

Please sign in to comment.