Skip to content

Commit 2f5016c

Browse files
committed
balancer: check and handle closed channels more aggressively
1 parent 23e6ee5 commit 2f5016c

File tree

3 files changed: +42 -13 lines changed

crates/ott-balancer/src/balancer.rs

+15 -2
Original file line number | Diff line number | Diff line change
@@ -528,18 +528,31 @@ pub async fn join_monolith(
528 528       let handle = tokio::task::Builder::new()
529 529           .name(format!("monolith {}", monolith_id).as_ref())
530 530           .spawn(async move {
531     -             while let Some(msg) = client_inbound_rx.recv().await {
    531 +             loop {
    532 +                 let Some(msg) = (tokio::select! {
    533 +                     biased;
    534 +                     _ = monolith_outbound_tx.closed() => {
    535 +                         None
    536 +                     }
    537 +                     msg = client_inbound_rx.recv() => {
    538 +                         msg
    539 +                     }
    540 +                 }) else {
    541 +                     info!("monolith disconnected, stopping client inbound handler");
    542 +                     break;
    543 +                 };
    544 +
532 545                   if let Err(e) =
533 546                       handle_client_inbound(ctx.clone(), msg, monolith_outbound_tx.clone()).await
534 547                   {
535 548                       error!("failed to handle client inbound: {:?}", e);
536 549                       if monolith_outbound_tx.is_closed() {
537     -                         // the monolith has disconnected
538 550                           info!("monolith disconnected, stopping client inbound handler");
539 551                           break;
540 552                       }
541 553                   }
542 554               }
    555 +             client_inbound_rx.close();
543 556           })?;
544 557       Ok(handle)
545 558   }

crates/ott-balancer/src/client.rs

+23 -11
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,9 @@ impl ClientLink {
 65  65       /// Receive the next message from the Balancer that needs to be sent to this client.
 66  66       pub async fn outbound_recv(&mut self) -> Result<SocketMessage, RecvError> {
 67  67           let msg = tokio::select! {
     68 +             _ = self.room_tx.closed() => {
     69 +                 return Err(RecvError::Closed);
     70 +             }
 68  71               msg = self.unicast_rx.recv() => {
 69  72                   match msg {
 70  73                       Some(msg) => Ok(msg),
@@ -197,8 +200,8 @@ pub async fn client_entry<'r>(
197 200
198 201       loop {
199 202           tokio::select! {
200     -             Ok(msg) = client_link.outbound_recv() => {
201     -                 if let SocketMessage::Message(msg) = msg {
    203 +             msg = client_link.outbound_recv() => {
    204 +                 if let Ok(SocketMessage::Message(msg)) = msg {
202 205                       debug!(event = "ws", node_id = %client_id, room = %room_name, direction = "tx");
203 206                       if let Err(err) = stream.send(msg).await {
204 207                           error!("Error sending ws message to client: {:?}", err);
@@ -239,15 +242,24 @@ pub async fn client_entry<'r>(
239 242       }
240 243
241 244       info!("ending client connection");
242     -     client_link
243     -         .room_tx
244     -         .send(Context::new(
245     -             client_id,
246     -             SocketMessage::Message(Message::Close(Some(CloseFrame {
247     -                 code: CloseCode::Normal,
248     -                 reason: "client connection ended".into(),
249     -             }))),
250     -         ))
    245 +     if !client_link.room_tx.is_closed() {
    246 +         client_link
    247 +             .room_tx
    248 +             .send(Context::new(
    249 +                 client_id,
    250 +                 SocketMessage::Message(Message::Close(Some(CloseFrame {
    251 +                     code: CloseCode::Normal,
    252 +                     reason: "client connection ended".into(),
    253 +                 }))),
    254 +             ))
    255 +             .await?;
    256 +     }
    257 +
    258 +     stream
    259 +         .close(Some(CloseFrame {
    260 +             code: CloseCode::Normal,
    261 +             reason: "client connection ended".into(),
    262 +         }))
251 263           .await?;
252 264
253 265       Ok(())

crates/ott-balancer/src/connection.rs

+4
Original file line number | Diff line number | Diff line change
@@ -263,6 +263,10 @@ async fn connect_and_maintain(
263 263                   }
264 264               }
265 265           }
    266 +         // drain the outbound queue
    267 +         outbound_rx.close();
    268 +         while let Some(_) = outbound_rx.recv().await {}
    269 +
266 270           if cancel.is_cancelled() {
267 271               break;
268 272           }

0 commit comments

Comments
 (0)