Skip to content

Commit 066a0d9

Browse files
committed
collector: fix becoming unresponsive when scaling down balancers
1 parent f783f6f commit 066a0d9

File tree

1 file changed

+31
-17
lines changed

1 file changed

+31
-17
lines changed

crates/ott-collector/src/collector.rs

+31 -17
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, sync::Arc};
1+
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use once_cell::sync::Lazy;
44
use ott_balancer_protocol::collector::BalancerState;
@@ -74,11 +74,17 @@ impl Collector {
7474
}
7575

7676
pub fn handle_discovery(&mut self, msg: ServiceDiscoveryMsg) {
77+
debug!(
78+
"Balancer discovery: {} added, {} removed",
79+
msg.added.len(),
80+
msg.removed.len()
81+
);
7782
self.balancers.retain(|conf| !msg.removed.contains(conf));
7883
self.balancers.extend(msg.added);
7984
}
8085

8186
pub async fn collect(&mut self) -> anyhow::Result<SystemState> {
87+
info!("Collecting system state");
8288
let client = reqwest::Client::new();
8389
let mut states = vec![];
8490
for conf in &self.balancers {
@@ -89,6 +95,7 @@ impl Collector {
8995
let resp = client
9096
.get(url)
9197
.header("Authorization", format!("Bearer {}", self.balancer_api_key))
98+
.timeout(Duration::from_secs(3))
9299
.send()
93100
.await?;
94101
if !resp.status().is_success() {
@@ -105,6 +112,7 @@ impl Collector {
105112
if self.stream_tasks.contains_key(conf) {
106113
continue;
107114
}
115+
debug!("Starting stream from balancer: {:?}", &conf);
108116
let _conf = conf.clone();
109117
let events_tx = self.events_tx.clone();
110118
let _balancer_api_key = self.balancer_api_key.clone();
@@ -121,6 +129,9 @@ impl Collector {
121129
self.stream_tasks.insert(conf.clone(), task);
122130
}
123131

132+
// cleanup stream tasks that have finished
133+
self.stream_tasks.retain(|_conf, task| !task.is_finished());
134+
124135
Ok(SystemState(states))
125136
}
126137

@@ -149,25 +160,28 @@ impl Collector {
149160
loop {
150161
tokio::select! {
151162
msg = ws.next() => {
152-
if let Some(Ok(msg)) = msg {
153-
if msg.is_close() {
154-
break;
155-
}
156-
let msg = msg.to_string();
157-
if !should_send(&msg) {
158-
continue;
159-
}
160-
if let Err(err) = events_tx.try_send(msg) {
161-
match err {
162-
tokio::sync::mpsc::error::TrySendError::Full(_) => {
163-
warn!("Event bus is full, dropping event");
164-
}
165-
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
166-
warn!("Event bus is closed, stopping stream");
167-
break;
163+
match msg {
164+
Some(Ok(msg)) => {
165+
if msg.is_close() {
166+
break;
167+
}
168+
let msg = msg.to_string();
169+
if !should_send(&msg) {
170+
continue;
171+
}
172+
if let Err(err) = events_tx.try_send(msg) {
173+
match err {
174+
tokio::sync::mpsc::error::TrySendError::Full(_) => {
175+
warn!("Event bus is full, dropping event");
176+
}
177+
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
178+
warn!("Event bus is closed, stopping stream");
179+
break;
180+
}
168181
}
169182
}
170183
}
184+
_ => break,
171185
}
172186
}
173187
else => {

0 commit comments

Comments
 (0)