Skip to content

Commit 6760a05

Browse files
committed
vis: add more streamed events
1 parent 21bf04e commit 6760a05

File tree

7 files changed

+116
-8
lines changed

7 files changed

+116
-8
lines changed

crates/ott-balancer/src/client.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub async fn client_entry<'r>(
128128
let client_id = Uuid::new_v4().into();
129129
let client = UnauthorizedClient {
130130
id: client_id,
131-
room: room_name,
131+
room: room_name.clone(),
132132
};
133133
tracing::Span::current().record("client_id", client_id.to_string());
134134
info!("client connected");
@@ -199,7 +199,7 @@ pub async fn client_entry<'r>(
199199
tokio::select! {
200200
Ok(msg) = client_link.outbound_recv() => {
201201
if let SocketMessage::Message(msg) = msg {
202-
debug!(event = "ws", node_id = %client_id, direction = "tx");
202+
debug!(event = "ws", node_id = %client_id, room = %room_name, direction = "tx");
203203
if let Err(err) = stream.send(msg).await {
204204
error!("Error sending ws message to client: {:?}", err);
205205
break;
@@ -220,7 +220,7 @@ pub async fn client_entry<'r>(
220220
continue;
221221
}
222222

223-
debug!(event = "ws", node_id = %client_id, direction = "rx");
223+
debug!(event = "ws", node_id = %client_id, room = %room_name, direction = "rx");
224224
if let Err(err) = client_link.inbound_send(msg).await {
225225
error!("Error sending client message to balancer: {:?}", err);
226226
break;

crates/ott-balancer/src/monolith.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use ott_balancer_protocol::monolith::*;
66
use ott_balancer_protocol::*;
77
use ott_common::discovery::ConnectionConfig;
88
use tokio_tungstenite::tungstenite::Message;
9-
use tracing::error;
9+
use tracing::{debug, error};
1010

1111
use crate::messages::*;
1212

@@ -194,6 +194,7 @@ impl Room {
194194

195195
/// Broadcast a message to all clients in this room.
196196
pub fn broadcast(&self, msg: impl Into<SocketMessage>) -> anyhow::Result<()> {
197+
debug!(event = "broadcast", node_id = %self.name, direction = "tx");
197198
self.broadcast_tx.send(msg.into())?;
198199
Ok(())
199200
}

crates/ott-balancer/src/service.rs

+2
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ impl Service<Request<IncomingBody>> for BalancerService {
246246
};
247247
if let Some(monolith) = monolith {
248248
info!("proxying request to monolith {}", monolith.id());
249+
debug!(event = "proxy", direction = "tx", room = %room_name, node_id = %monolith.id());
249250
match proxy_request(req, monolith).await {
250251
Ok(res) => Ok(res),
251252
Err(err) => {
@@ -266,6 +267,7 @@ impl Service<Request<IncomingBody>> for BalancerService {
266267
message = "proxying request to monolith",
267268
monolith = %monolith.id(),
268269
);
270+
debug!(event = "proxy", direction = "tx", node_id = %monolith.id());
269271
match proxy_request(req, monolith).await {
270272
Ok(res) => Ok(res),
271273
Err(err) => {

crates/ott-collector/src/collector.rs

+82-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use once_cell::sync::Lazy;
4-
use ott_balancer_protocol::collector::BalancerState;
4+
use ott_balancer_protocol::{collector::BalancerState, RoomName};
55
use ott_common::discovery::{ConnectionConfig, ServiceDiscoveryMsg};
66
use rocket::futures::StreamExt;
77
use serde::Deserialize;
@@ -205,6 +205,8 @@ fn should_send(event: &str) -> bool {
205205
#[non_exhaustive]
206206
enum Event {
207207
Ws(EventWebsocketMessage),
208+
Proxy(EventProxyRequest),
209+
Broadcast(EventBroadcast),
208210
}
209211

210212
/// Indicates that a message was sent or received on a websocket connection to a balancer.
@@ -213,6 +215,22 @@ enum Event {
213215
struct EventWebsocketMessage {
214216
node_id: Uuid,
215217
direction: MsgDirection,
218+
room: Option<RoomName>,
219+
}
220+
221+
#[derive(Debug, Deserialize)]
222+
#[allow(dead_code)]
223+
struct EventProxyRequest {
224+
node_id: Uuid,
225+
direction: MsgDirection,
226+
room: Option<RoomName>,
227+
}
228+
229+
#[derive(Debug, Deserialize)]
230+
#[allow(dead_code)]
231+
struct EventBroadcast {
232+
node_id: String,
233+
direction: MsgDirection,
216234
}
217235

218236
#[derive(Debug, Deserialize)]
@@ -228,7 +246,7 @@ mod tests {
228246
use uuid::uuid;
229247

230248
#[test]
231-
fn test_deserialize_event() {
249+
fn test_deserialize_event_ws() {
232250
let event =
233251
r#"{"event":"ws","node_id":"f47ac10b-58cc-4372-a567-0e02b2c3d479", "direction": "tx"}"#;
234252
let event: Event = serde_json::from_str(event).unwrap();
@@ -242,4 +260,66 @@ mod tests {
242260
}
243261
}
244262
}
263+
264+
#[test]
265+
fn test_deserialize_event_ws2() {
266+
let event = r#"{"event":"ws","node_id":"f47ac10b-58cc-4372-a567-0e02b2c3d479", "room": "foo", "direction": "tx"}"#;
267+
let event: Event = serde_json::from_str(event).unwrap();
268+
#[allow(unreachable_patterns)]
269+
match event {
270+
Event::Ws(EventWebsocketMessage { node_id, room, .. }) => {
271+
assert_eq!(node_id, uuid!("f47ac10b-58cc-4372-a567-0e02b2c3d479"));
272+
assert_eq!(room, Some("foo".into()));
273+
}
274+
_ => {
275+
panic!("unexpected event type: {:?}", event);
276+
}
277+
}
278+
}
279+
280+
#[test]
281+
fn test_deserialize_event_proxy() {
282+
let event = r#"{"event":"proxy","node_id":"f47ac10b-58cc-4372-a567-0e02b2c3d479", "direction": "tx"}"#;
283+
let event: Event = serde_json::from_str(event).unwrap();
284+
#[allow(unreachable_patterns)]
285+
match event {
286+
Event::Proxy(EventProxyRequest { node_id, .. }) => {
287+
assert_eq!(node_id, uuid!("f47ac10b-58cc-4372-a567-0e02b2c3d479"));
288+
}
289+
_ => {
290+
panic!("unexpected event type: {:?}", event);
291+
}
292+
}
293+
}
294+
295+
#[test]
296+
fn test_deserialize_event_proxy2() {
297+
let event = r#"{"event":"proxy","node_id":"f47ac10b-58cc-4372-a567-0e02b2c3d479", "room": "foo", "direction": "tx"}"#;
298+
let event: Event = serde_json::from_str(event).unwrap();
299+
#[allow(unreachable_patterns)]
300+
match event {
301+
Event::Proxy(EventProxyRequest { node_id, room, .. }) => {
302+
assert_eq!(node_id, uuid!("f47ac10b-58cc-4372-a567-0e02b2c3d479"));
303+
assert_eq!(room, Some("foo".into()));
304+
}
305+
_ => {
306+
panic!("unexpected event type: {:?}", event);
307+
}
308+
}
309+
}
310+
311+
#[test]
312+
fn test_deserialize_event_broadcast() {
313+
let event = r#"{"event":"broadcast","node_id":"foo", "direction": "tx"}"#;
314+
let event: Event = serde_json::from_str(event).unwrap();
315+
#[allow(unreachable_patterns)]
316+
match event {
317+
Event::Broadcast(EventBroadcast { node_id, .. }) => {
318+
assert_eq!(node_id, "foo");
319+
}
320+
_ => {
321+
panic!("unexpected event type: {:?}", event);
322+
}
323+
}
324+
}
245325
}

packages/ott-vis-datasource/src/datasource.ts

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ export class DataSource extends DataSourceApi<MyQuery, MyDataSourceOptions> {
3737
frame.addField({ name: "event", type: FieldType.string });
3838
frame.addField({ name: "node_id", type: FieldType.string });
3939
frame.addField({ name: "direction", type: FieldType.string });
40+
frame.addField({ name: "room", type: FieldType.string });
4041

4142
const base = this.baseUrl.replace(/^http/, "ws");
4243

packages/ott-vis-panel/src/components/views/TopologyView.tsx

+25-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import "./topology-view.css";
1717
import { useColorProvider } from "colors";
1818
import { useD3Zoom } from "chartutils";
1919
import { dedupeItems } from "aggregate";
20-
import { useEventBus } from "eventbus";
20+
import { useEventBus, type BusEvent } from "eventbus";
2121

2222
/**
2323
* The goal of this component is to show a more accurate topology view from the perspective of actual network connections.
@@ -410,6 +410,7 @@ export const TopologyView: React.FC<TopologyViewProps> = ({
410410
useEffect(() => {
411411
const rxColor = "#0f0";
412412
const txColor = "#00f";
413+
const proxyColor = "#f58d05";
413414

414415
function animateNode(
415416
node: d3.Selection<any, d3.HierarchyNode<TreeNode>, any, any>,
@@ -461,8 +462,18 @@ export const TopologyView: React.FC<TopologyViewProps> = ({
461462
.attrTween("stroke-width", () => t => d3.interpolateNumber(4, 1.5)(t).toString());
462463
}
463464

465+
function getColor(event: BusEvent): string {
466+
if (event.event === "ws" || event.event === "broadcast") {
467+
return event.direction === "rx" ? rxColor : txColor;
468+
} else if (event.event === "proxy") {
469+
return proxyColor;
470+
} else {
471+
return "#f00";
472+
}
473+
}
474+
464475
const sub = eventBus.subscribe(event => {
465-
const color = event.direction === "rx" ? rxColor : txColor;
476+
const color = getColor(event);
466477
const node = d3.select<d3.BaseType, d3.HierarchyNode<TreeNode>>(
467478
`[data-nodeid="${event.node_id}"]`
468479
);
@@ -478,6 +489,18 @@ export const TopologyView: React.FC<TopologyViewProps> = ({
478489
`[data-nodeid-target="${event.node_id}"]`
479490
);
480491
animateLinks(links, color);
492+
493+
if (event.room) {
494+
const room = d3.select<d3.BaseType, d3.HierarchyNode<TreeNode>>(
495+
`[data-nodeid="${event.room}"]`
496+
);
497+
animateNode(room, color);
498+
499+
const roomLink = d3.select<d3.BaseType, d3.HierarchyLink<TreeNode>>(
500+
`[data-nodeid-target="${event.room}"]`
501+
);
502+
animateLinks(roomLink, color);
503+
}
481504
});
482505

483506
return () => {

packages/ott-vis-panel/src/eventbus.ts

+1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ export type BusEvent = {
1111
event: string;
1212
node_id: string;
1313
direction: "tx" | "rx";
14+
room?: string;
1415
};

0 commit comments

Comments
 (0)