Skip to content

Commit

Permalink
server: don't consider balancer fully connected until init message is…
Browse files Browse the repository at this point in the history
… received
  • Loading branch information
dyc3 committed Mar 23, 2024
1 parent 72babac commit 6e630e4
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 10 deletions.
56 changes: 48 additions & 8 deletions server/balancer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,52 @@ class BalancerManager {
balancerConnections: BalancerConnection[] = [];
bus = new EventEmitter();

addBalancerConnection(conn: BalancerConnection) {
this.balancerConnections.push(conn);
this.onBalancerConnect(conn);
async addBalancerConnection(conn: BalancerConnection) {
this.onBalancerPreconnect(conn);
conn.on("connect", () => this.onBalancerConnect(conn));
conn.on("disconnect", (code, reason) => this.onBalancerDisconnect(conn, code, reason));
conn.on("message", msg => this.onBalancerMessage(conn, msg));
conn.on("error", error => this.onBalancerError(conn, error));

const waitForInit = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
conn.disconnect();
reject(new Error("Balancer did not send init message"));

Check warning on line 67 in server/balancer.ts

View check run for this annotation

Codecov / codecov/patch

server/balancer.ts#L66-L67

Added lines #L66 - L67 were not covered by tests
}, 1000 * 10);
const handler = (msg: MsgB2M) => {
if (msg.type === "init") {
conn.id = msg.payload.id;
conn.off("message", handler);
conn.off("disconnect", disconnectHandler);
clearTimeout(timeout);
resolve();
}
};
const disconnectHandler = () => {
clearTimeout(timeout);
reject(new Error("Balancer disconnected before sending init message"));
};

Check warning on line 81 in server/balancer.ts

View check run for this annotation

Codecov / codecov/patch

server/balancer.ts#L79-L81

Added lines #L79 - L81 were not covered by tests
conn.on("message", handler);
conn.on("disconnect", disconnectHandler);
});

try {
log.debug("Waiting for balancer init message");
await waitForInit;
} catch (e) {
log.error(`Balancer did not send init message in time: ${e}`);
return;

Check warning on line 91 in server/balancer.ts

View check run for this annotation

Codecov / codecov/patch

server/balancer.ts#L90-L91

Added lines #L90 - L91 were not covered by tests
}

conn.on("message", msg => this.onBalancerMessage(conn, msg));
this.onBalancerConnect(conn);
this.balancerConnections.push(conn);
}

getConnection(id: string): BalancerConnection | undefined {
return this.balancerConnections.find(conn => conn.id === id);
}

private onBalancerConnect(conn: BalancerConnection) {
log.info(`Connected to balancer ${conn.id}`);
this.emit("connect", conn);

private onBalancerPreconnect(conn: BalancerConnection) {
const init: MsgM2B = {
type: "init",
payload: {
Expand All @@ -83,6 +112,11 @@ class BalancerManager {
conn.send(init);
}

private onBalancerConnect(conn: BalancerConnection) {
log.info(`Connected to balancer ${conn.id}`);
this.emit("connect", conn);
}

private onBalancerDisconnect(conn: BalancerConnection, code: number, reason: string) {
log.debug(`Disconnected from balancer ${conn.id}: ${code} ${reason}`);
this.emit("disconnect", conn);
Expand Down Expand Up @@ -159,7 +193,13 @@ export abstract class BalancerConnection {
this.bus.on(event, handler);
}

off<E extends BalancerConnectionEvents>(event: E, handler: BalancerConnectionEventHandlers<E>) {
this.bus.off(event, handler);
}

abstract send(message: MsgM2B): Result<void, Error>;

abstract disconnect(): Result<void, Error>;
}

/** Manages the websocket connection to a Balancer. */
Expand Down
36 changes: 34 additions & 2 deletions server/tests/unit/clientmanager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import roommanager from "../../roommanager";
import { loadModels } from "../../models";
import type { Request } from "express";
import { loadConfigFile, conf } from "../../ott-config";
import { M2BInit } from "server/generated";
import { M2BInit, type MsgB2M } from "server/generated";
import { v4 as uuidv4 } from "uuid";

class TestClient extends Client {
sendRawMock = vi.fn();
Expand All @@ -34,6 +35,7 @@ class TestClient extends Client {

class BalancerConnectionMock extends BalancerConnection {
sendMock = vi.fn<[MsgM2B], void>();
disconnectMock = vi.fn<[], void>();

constructor() {
super();
Expand All @@ -44,6 +46,21 @@ class BalancerConnectionMock extends BalancerConnection {
return ok(undefined);
}

disconnect(): Result<void, Error> {
this.disconnectMock();
return ok(undefined);
}

async emitInit() {
const init: MsgB2M = {
type: "init",
payload: {
id: this.id,
},
};
this.emit("message", init);
}

public emit<E extends BalancerConnectionEvents>(
event: E,
...args: Parameters<BalancerConnectionEventHandlers<E>>
Expand Down Expand Up @@ -135,6 +152,7 @@ describe("ClientManager", () => {
it("should disconnect all clients when a balancer disconnects", async () => {
const mockBalancerCon = new BalancerConnectionMock();
balancerManager.addBalancerConnection(mockBalancerCon);
await mockBalancerCon.emitInit();
const client = new BalancerClient("foo", "foo", mockBalancerCon);
clientmanager.addClient(client);
client.emit("auth", client, "token", { isLoggedIn: false, username: "foo" });
Expand All @@ -153,7 +171,9 @@ describe("ClientManager", () => {
const mockBalancerCon = new BalancerConnectionMock();
const mockBalancerCon2 = new BalancerConnectionMock();
balancerManager.addBalancerConnection(mockBalancerCon);
await mockBalancerCon.emitInit();
balancerManager.addBalancerConnection(mockBalancerCon2);
await mockBalancerCon2.emitInit();
const client1 = new BalancerClient("foo", "foo1", mockBalancerCon);
const client2 = new BalancerClient("foo", "foo2", mockBalancerCon);
const client3 = new TestClient("foo");
Expand Down Expand Up @@ -186,14 +206,17 @@ describe("BalancerManager", () => {
balancerManager.balancerConnections.splice(0, balancerManager.balancerConnections.length);
});

it("should remove the correct balancer from the list when it disconnects", () => {
it("should remove the correct balancer from the list when it disconnects", async () => {
const con1 = new BalancerConnectionMock();
const con2 = new BalancerConnectionMock();
const con3 = new BalancerConnectionMock();

balancerManager.addBalancerConnection(con1);
await con1.emitInit();
balancerManager.addBalancerConnection(con2);
await con2.emitInit();
balancerManager.addBalancerConnection(con3);
await con3.emitInit();

expect(balancerManager.balancerConnections).toHaveLength(3);

Expand All @@ -202,6 +225,15 @@ describe("BalancerManager", () => {
expect(balancerManager.balancerConnections).toHaveLength(2);
expect(balancerManager.balancerConnections).not.toContain(con2);
});

it("should not add balancers before they send the init message", async () => {
const con1 = new BalancerConnectionMock();

balancerManager.addBalancerConnection(con1);
expect(balancerManager.balancerConnections).toHaveLength(0);
await con1.emitInit();
expect(balancerManager.balancerConnections).toHaveLength(1);
});
});

describe("MonolithId", () => {
Expand Down

0 comments on commit 6e630e4

Please sign in to comment.