Skip to content

Commit a599036

Browse files
authored
clientmanager: add some unit tests (#1165)
1 parent 967d3c7 commit a599036

File tree

4 files changed

+167
-25
lines changed

4 files changed

+167
-25
lines changed

server/balancer.ts

+29-20
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export function initBalancerConnections() {
3535
});
3636
wss.on("connection", ws => {
3737
log.debug("New balancer connection");
38-
const conn = new BalancerConnection(ws);
38+
const conn = new BalancerConnectionReal(ws);
3939
balancerManager.addBalancerConnection(conn);
4040
});
4141
wss.on("error", error => {
@@ -115,8 +115,8 @@ class BalancerManager {
115115

116116
export const balancerManager = new BalancerManager();
117117

118-
type BalancerManagerEvemts = BalancerConnectionEvents;
119-
type BalancerManagerEventHandlers<E> = E extends "connect"
118+
export type BalancerManagerEvemts = BalancerConnectionEvents;
119+
export type BalancerManagerEventHandlers<E> = E extends "connect"
120120
? (conn: BalancerConnection) => void
121121
: E extends "disconnect"
122122
? (conn: BalancerConnection) => void
@@ -126,8 +126,8 @@ type BalancerManagerEventHandlers<E> = E extends "connect"
126126
? (conn: BalancerConnection, error: WebSocket.ErrorEvent) => void
127127
: never;
128128

129-
type BalancerConnectionEvents = "connect" | "disconnect" | "message" | "error";
130-
type BalancerConnectionEventHandlers<E> = E extends "connect"
129+
export type BalancerConnectionEvents = "connect" | "disconnect" | "message" | "error";
130+
export type BalancerConnectionEventHandlers<E> = E extends "connect"
131131
? () => void
132132
: E extends "disconnect"
133133
? (code: number, reason: string) => void
@@ -137,15 +137,35 @@ type BalancerConnectionEventHandlers<E> = E extends "connect"
137137
? (error: WebSocket.ErrorEvent) => void
138138
: never;
139139

140-
/** Manages the websocket connection to a Balancer. */
141-
export class BalancerConnection {
140+
export abstract class BalancerConnection {
142141
/** A local identifier for the balancer. Other monoliths will have different IDs for the same balancer. */
143142
id: string;
143+
protected bus: EventEmitter = new EventEmitter();
144+
145+
constructor() {
146+
this.id = uuidv4();
147+
}
148+
149+
protected emit<E extends BalancerConnectionEvents>(
150+
event: E,
151+
...args: Parameters<BalancerConnectionEventHandlers<E>>
152+
) {
153+
this.bus.emit(event, ...args);
154+
}
155+
156+
on<E extends BalancerConnectionEvents>(event: E, handler: BalancerConnectionEventHandlers<E>) {
157+
this.bus.on(event, handler);
158+
}
159+
160+
abstract send(message: MsgM2B): Result<void, Error>;
161+
}
162+
163+
/** Manages the websocket connection to a Balancer. */
164+
export class BalancerConnectionReal extends BalancerConnection {
144165
private socket: WebSocket;
145-
private bus: EventEmitter = new EventEmitter();
146166

147167
constructor(socket: WebSocket) {
148-
this.id = uuidv4();
168+
super();
149169
this.socket = socket;
150170

151171
this.socket.on("open", this.onSocketConnect.bind(this));
@@ -201,17 +221,6 @@ export class BalancerConnection {
201221
this.emit("error", event);
202222
}
203223

204-
private emit<E extends BalancerConnectionEvents>(
205-
event: E,
206-
...args: Parameters<BalancerConnectionEventHandlers<E>>
207-
) {
208-
this.bus.emit(event, ...args);
209-
}
210-
211-
on<E extends BalancerConnectionEvents>(event: E, handler: BalancerConnectionEventHandlers<E>) {
212-
this.bus.on(event, handler);
213-
}
214-
215224
send(message: MsgM2B): Result<void, Error> {
216225
if (this.socket === null) {
217226
return err(new Error("Not connected"));

server/client.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { v4 as uuidv4 } from "uuid";
66
import EventEmitter from "events";
77
import { getLogger } from "./logger";
88
import { getSessionInfo } from "./auth/tokens";
9-
import { BalancerConnection } from "./balancer";
9+
import { BalancerConnection, BalancerConnectionReal } from "./balancer";
1010
import { replacer } from "../common/serialize";
1111

1212
const log = getLogger("client");

server/clientmanager.ts

+17-4
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,12 @@ export async function setup(): Promise<void> {
5555
balancerManager.on("error", onBalancerError);
5656
initBalancerConnections();
5757

58-
log.silly("creating redis subscriber");
59-
const redisSubscriber = await createSubscriber();
60-
log.silly("subscribing to announcement channel");
61-
await redisSubscriber.subscribe(ANNOUNCEMENT_CHANNEL, onAnnouncement);
58+
if (conf.get("env") !== "test") {
59+
log.silly("creating redis subscriber");
60+
const redisSubscriber = await createSubscriber();
61+
log.silly("subscribing to announcement channel");
62+
await redisSubscriber.subscribe(ANNOUNCEMENT_CHANNEL, onAnnouncement);
63+
}
6264
}
6365

6466
/**
@@ -69,6 +71,10 @@ async function onDirectConnect(socket: WebSocket, req: express.Request) {
6971
const roomName = req.url.split("/").slice(-1)[0];
7072
log.debug(`connection received: ${roomName}, waiting for auth token...`);
7173
const client = new DirectClient(roomName, socket);
74+
addClient(client);
75+
}
76+
77+
export function addClient(client: Client) {
7278
connections.push(client);
7379
client.on("auth", onClientAuth);
7480
client.on("message", onClientMessage);
@@ -427,6 +433,10 @@ function getClient(id: ClientId): Client | undefined {
427433
return undefined;
428434
}
429435

436+
function getClientsInRoom(roomName: string): Client[] {
437+
return roomJoins.get(roomName) ?? [];
438+
}
439+
430440
setInterval(() => {
431441
for (const client of connections) {
432442
if (client instanceof DirectClient) {
@@ -472,4 +482,7 @@ export default {
472482
onUserModified,
473483
getClientByToken,
474484
makeRoomRequest,
485+
addClient,
486+
getClient,
487+
getClientsInRoom,
475488
};
+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import clientmanager from "../../clientmanager";
2+
import {
3+
BalancerConnection,
4+
BalancerConnectionEventHandlers,
5+
BalancerConnectionEvents,
6+
BalancerConnectionReal,
7+
MsgM2B,
8+
balancerManager,
9+
} from "../../balancer";
10+
import { BalancerClient, Client } from "../../client";
11+
import { OttWebsocketError } from "common/models/types";
12+
import { buildClients } from "../../redisclient";
13+
import { Result, ok } from "../../../common/result";
14+
import roommanager from "../../roommanager";
15+
import { loadModels } from "../../models";
16+
17+
class TestClient extends Client {
18+
sendRawMock = jest.fn();
19+
kickMock = jest.fn();
20+
21+
sendRaw(msg: string): void {
22+
this.sendRawMock(msg);
23+
}
24+
25+
kick(code: OttWebsocketError): void {
26+
this.kickMock(code);
27+
this.emit("disconnect", this);
28+
}
29+
}
30+
31+
class BalancerConnectionMock extends BalancerConnection {
32+
sendMock = jest.fn();
33+
34+
constructor() {
35+
super();
36+
}
37+
38+
send(msg: MsgM2B): Result<void, Error> {
39+
this.sendMock(msg);
40+
return ok(undefined);
41+
}
42+
43+
public emit<E extends BalancerConnectionEvents>(
44+
event: E,
45+
...args: Parameters<BalancerConnectionEventHandlers<E>>
46+
) {
47+
super.emit(event, ...args);
48+
}
49+
}
50+
51+
describe("ClientManager", () => {
52+
beforeAll(async () => {
53+
loadModels();
54+
await buildClients();
55+
await clientmanager.setup();
56+
});
57+
58+
beforeEach(async () => {
59+
await roommanager.createRoom({
60+
name: "foo",
61+
isTemporary: true,
62+
});
63+
});
64+
65+
afterEach(async () => {
66+
await roommanager.unloadRoom("foo");
67+
});
68+
69+
it("should add clients", () => {
70+
const client = new TestClient("foo");
71+
clientmanager.addClient(client);
72+
expect(clientmanager.getClient(client.id)).toBe(client);
73+
});
74+
75+
it("should add clients to roomJoins when they auth", async () => {
76+
const client = new TestClient("foo");
77+
clientmanager.addClient(client);
78+
client.emit("auth", client, "token", { isLoggedIn: false, username: "foo" });
79+
await new Promise(resolve => setTimeout(resolve, 100));
80+
const joins = clientmanager.getClientsInRoom("foo");
81+
expect(joins).toHaveLength(1);
82+
});
83+
84+
it("should remove clients when they disconnect", () => {
85+
const client = new TestClient("foo");
86+
clientmanager.addClient(client);
87+
client.emit("disconnect", client);
88+
expect(clientmanager.getClient(client.id)).toBeUndefined();
89+
});
90+
91+
it("should remove clients from roomJoins when they disconnect", async () => {
92+
const client = new TestClient("foo");
93+
clientmanager.addClient(client);
94+
client.emit("auth", client, "token", { isLoggedIn: false, username: "foo" });
95+
await new Promise(resolve => setTimeout(resolve, 100));
96+
const joins = clientmanager.getClientsInRoom("foo");
97+
expect(joins).toHaveLength(1);
98+
99+
client.emit("disconnect", client);
100+
const joins2 = clientmanager.getClientsInRoom("foo");
101+
expect(joins2).toHaveLength(0);
102+
});
103+
104+
it("should disconnect all clients when a balancer disconnects", async () => {
105+
const mockBalancerCon = new BalancerConnectionMock();
106+
balancerManager.addBalancerConnection(mockBalancerCon);
107+
const client = new BalancerClient("foo", "foo", mockBalancerCon);
108+
clientmanager.addClient(client);
109+
client.emit("auth", client, "token", { isLoggedIn: false, username: "foo" });
110+
await new Promise(resolve => setTimeout(resolve, 100));
111+
const joins = clientmanager.getClientsInRoom("foo");
112+
expect(joins).toHaveLength(1);
113+
114+
mockBalancerCon.emit("disconnect", 1000, "reason");
115+
116+
expect(clientmanager.getClient(client.id)).toBeUndefined();
117+
const joins2 = clientmanager.getClientsInRoom("foo");
118+
expect(joins2).toHaveLength(0);
119+
});
120+
});

0 commit comments

Comments
 (0)