Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: don't consider balancer fully connected until init message is received #1559

Merged
merged 1 commit into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions server/balancer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
balancerManager.addBalancerConnection(conn);
});
wss.on("error", error => {
log.error(`Balancer websocket error: ${error}`);

Check warning on line 43 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "Error" of template literal expression

Check warning on line 43 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "Error" of template literal expression
});
wss.on("listening", () => {
log.info(
Expand All @@ -55,23 +55,52 @@
balancerConnections: BalancerConnection[] = [];
bus = new EventEmitter();

addBalancerConnection(conn: BalancerConnection) {
this.balancerConnections.push(conn);
this.onBalancerConnect(conn);
async addBalancerConnection(conn: BalancerConnection) {
this.onBalancerPreconnect(conn);
conn.on("connect", () => this.onBalancerConnect(conn));
conn.on("disconnect", (code, reason) => this.onBalancerDisconnect(conn, code, reason));
conn.on("message", msg => this.onBalancerMessage(conn, msg));
conn.on("error", error => this.onBalancerError(conn, error));

const waitForInit = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
conn.disconnect();
reject(new Error("Balancer did not send init message"));

Check warning on line 67 in server/balancer.ts

View check run for this annotation

Codecov / codecov/patch

server/balancer.ts#L66-L67

Added lines #L66 - L67 were not covered by tests
}, 1000 * 10);
const handler = (msg: MsgB2M) => {
if (msg.type === "init") {
conn.id = msg.payload.id;
conn.off("message", handler);
conn.off("disconnect", disconnectHandler);
clearTimeout(timeout);
resolve();
}
};
const disconnectHandler = () => {
clearTimeout(timeout);
reject(new Error("Balancer disconnected before sending init message"));
};

Check warning on line 81 in server/balancer.ts

View check run for this annotation

Codecov / codecov/patch

server/balancer.ts#L79-L81

Added lines #L79 - L81 were not covered by tests
conn.on("message", handler);
conn.on("disconnect", disconnectHandler);
});

try {
log.debug("Waiting for balancer init message");
await waitForInit;
} catch (e) {
log.error(`Balancer did not send init message in time: ${e}`);
return;

Check warning on line 91 in server/balancer.ts

View check run for this annotation

Codecov / codecov/patch

server/balancer.ts#L90-L91

Added lines #L90 - L91 were not covered by tests
}

conn.on("message", msg => this.onBalancerMessage(conn, msg));
this.onBalancerConnect(conn);
this.balancerConnections.push(conn);
}

getConnection(id: string): BalancerConnection | undefined {
return this.balancerConnections.find(conn => conn.id === id);
}

private onBalancerConnect(conn: BalancerConnection) {
log.info(`Connected to balancer ${conn.id}`);
this.emit("connect", conn);

private onBalancerPreconnect(conn: BalancerConnection) {
const init: MsgM2B = {
type: "init",
payload: {
Expand All @@ -83,6 +112,11 @@
conn.send(init);
}

private onBalancerConnect(conn: BalancerConnection) {
log.info(`Connected to balancer ${conn.id}`);
this.emit("connect", conn);
}

private onBalancerDisconnect(conn: BalancerConnection, code: number, reason: string) {
log.debug(`Disconnected from balancer ${conn.id}: ${code} ${reason}`);
this.emit("disconnect", conn);
Expand All @@ -99,7 +133,7 @@
}

private onBalancerError(conn: BalancerConnection, error: WebSocket.ErrorEvent) {
log.error(`Error from balancer ${conn.id}: ${error}`);

Check warning on line 136 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "ErrorEvent" of template literal expression

Check warning on line 136 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "ErrorEvent" of template literal expression
this.emit("error", conn, error);
}

Expand Down Expand Up @@ -159,7 +193,13 @@
this.bus.on(event, handler);
}

off<E extends BalancerConnectionEvents>(event: E, handler: BalancerConnectionEventHandlers<E>) {
this.bus.off(event, handler);
}

abstract send(message: MsgM2B): Result<void, Error>;

abstract disconnect(): Result<void, Error>;
}

/** Manages the websocket connection to a Balancer. */
Expand Down Expand Up @@ -216,7 +256,7 @@
}
this.emit("message", result.value);
} else {
log.error(`Error parsing incoming balancer message: ${result.value} - ${data}`);

Check warning on line 259 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "Error" of template literal expression

Check warning on line 259 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "Data" of template literal expression

Check warning on line 259 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "Error" of template literal expression

Check warning on line 259 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "Data" of template literal expression
}
}

Expand Down Expand Up @@ -278,7 +318,7 @@
const result = await roommanager.getRoom(roomName, { mustAlreadyBeLoaded: true });
if (!result.ok) {
log.error(
`Failed to grab room that should have been loaded. Can't inform balancers. room=${roomName}: ${result.value}`

Check warning on line 321 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "RoomNotFoundException | RoomAlreadyLoadedException" of template literal expression

Check warning on line 321 in server/balancer.ts

View workflow job for this annotation

GitHub Actions / lint (18.x)

Invalid type "RoomNotFoundException | RoomAlreadyLoadedException" of template literal expression
);
return;
}
Expand Down
36 changes: 34 additions & 2 deletions server/tests/unit/clientmanager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import roommanager from "../../roommanager";
import { loadModels } from "../../models";
import type { Request } from "express";
import { loadConfigFile, conf } from "../../ott-config";
import { M2BInit } from "server/generated";
import { M2BInit, type MsgB2M } from "server/generated";
import { v4 as uuidv4 } from "uuid";

class TestClient extends Client {
sendRawMock = vi.fn();
Expand All @@ -34,6 +35,7 @@ class TestClient extends Client {

class BalancerConnectionMock extends BalancerConnection {
sendMock = vi.fn<[MsgM2B], void>();
disconnectMock = vi.fn<[], void>();

constructor() {
super();
Expand All @@ -44,6 +46,21 @@ class BalancerConnectionMock extends BalancerConnection {
return ok(undefined);
}

disconnect(): Result<void, Error> {
this.disconnectMock();
return ok(undefined);
}

async emitInit() {
const init: MsgB2M = {
type: "init",
payload: {
id: this.id,
},
};
this.emit("message", init);
}

public emit<E extends BalancerConnectionEvents>(
event: E,
...args: Parameters<BalancerConnectionEventHandlers<E>>
Expand Down Expand Up @@ -135,6 +152,7 @@ describe("ClientManager", () => {
it("should disconnect all clients when a balancer disconnects", async () => {
const mockBalancerCon = new BalancerConnectionMock();
balancerManager.addBalancerConnection(mockBalancerCon);
await mockBalancerCon.emitInit();
const client = new BalancerClient("foo", "foo", mockBalancerCon);
clientmanager.addClient(client);
client.emit("auth", client, "token", { isLoggedIn: false, username: "foo" });
Expand All @@ -153,7 +171,9 @@ describe("ClientManager", () => {
const mockBalancerCon = new BalancerConnectionMock();
const mockBalancerCon2 = new BalancerConnectionMock();
balancerManager.addBalancerConnection(mockBalancerCon);
await mockBalancerCon.emitInit();
balancerManager.addBalancerConnection(mockBalancerCon2);
await mockBalancerCon2.emitInit();
const client1 = new BalancerClient("foo", "foo1", mockBalancerCon);
const client2 = new BalancerClient("foo", "foo2", mockBalancerCon);
const client3 = new TestClient("foo");
Expand Down Expand Up @@ -186,14 +206,17 @@ describe("BalancerManager", () => {
balancerManager.balancerConnections.splice(0, balancerManager.balancerConnections.length);
});

it("should remove the correct balancer from the list when it disconnects", () => {
it("should remove the correct balancer from the list when it disconnects", async () => {
const con1 = new BalancerConnectionMock();
const con2 = new BalancerConnectionMock();
const con3 = new BalancerConnectionMock();

balancerManager.addBalancerConnection(con1);
await con1.emitInit();
balancerManager.addBalancerConnection(con2);
await con2.emitInit();
balancerManager.addBalancerConnection(con3);
await con3.emitInit();

expect(balancerManager.balancerConnections).toHaveLength(3);

Expand All @@ -202,6 +225,15 @@ describe("BalancerManager", () => {
expect(balancerManager.balancerConnections).toHaveLength(2);
expect(balancerManager.balancerConnections).not.toContain(con2);
});

it("should not add balancers before they send the init message", async () => {
const con1 = new BalancerConnectionMock();

balancerManager.addBalancerConnection(con1);
expect(balancerManager.balancerConnections).toHaveLength(0);
await con1.emitInit();
expect(balancerManager.balancerConnections).toHaveLength(1);
});
});

describe("MonolithId", () => {
Expand Down
Loading