fix: Omnichannel queue starting multiple times due to race condition (#34062)

Co-authored-by: Pierre Lehnen <[email protected]>
aleksandernsilva and pierre-lehnen-rc authored Dec 6, 2024
1 parent 18cea50 commit 072a749
Showing 6 changed files with 206 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .changeset/green-shirts-fold.md
@@ -0,0 +1,5 @@
---
"@rocket.chat/meteor": patch
---

Fixes a race condition that caused the Omnichannel queue to start more than once.
43 changes: 40 additions & 3 deletions apps/meteor/server/services/omnichannel/queue.ts
@@ -1,3 +1,4 @@
import { ServiceStarter } from '@rocket.chat/core-services';
import { type InquiryWithAgentInfo, type IOmnichannelQueue } from '@rocket.chat/core-typings';
import { License } from '@rocket.chat/license';
import { LivechatInquiry, LivechatRooms } from '@rocket.chat/models';
@@ -11,6 +12,17 @@ import { settings } from '../../../app/settings/server';
const DEFAULT_RACE_TIMEOUT = 5000;

export class OmnichannelQueue implements IOmnichannelQueue {
private serviceStarter: ServiceStarter;

private timeoutHandler: ReturnType<typeof setTimeout> | null = null;

constructor() {
this.serviceStarter = new ServiceStarter(
() => this._start(),
() => this._stop(),
);
}

private running = false;

private queues: (string | undefined)[] = [];
@@ -24,7 +36,7 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return this.running;
}

async start() {
private async _start() {
if (this.running) {
return;
}
@@ -37,17 +49,31 @@ export class OmnichannelQueue implements IOmnichannelQueue {
return this.execute();
}

async stop() {
private async _stop() {
if (!this.running) {
return;
}

await LivechatInquiry.unlockAll();

this.running = false;

if (this.timeoutHandler !== null) {
clearTimeout(this.timeoutHandler);
this.timeoutHandler = null;
}

queueLogger.info('Service stopped');
}

async start() {
return this.serviceStarter.start();
}

async stop() {
return this.serviceStarter.stop();
}

private async getActiveQueues() {
// undefined = public queue(without department)
return ([undefined] as typeof this.queues).concat(await LivechatInquiry.getDistinctQueuedDepartments({}));
@@ -118,10 +144,21 @@ export class OmnichannelQueue implements IOmnichannelQueue {
err: e,
});
} finally {
setTimeout(this.execute.bind(this), this.delay());
this.scheduleExecution();
}
}

private scheduleExecution(): void {
if (this.timeoutHandler !== null) {
return;
}

this.timeoutHandler = setTimeout(() => {
this.timeoutHandler = null;
return this.execute();
}, this.delay());
}

async shouldStart() {
if (!settings.get('Livechat_enabled')) {
void this.stop();
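The scheduleExecution guard above closes the timer half of the race: the unconditional setTimeout in the finally block is replaced by a nullable handle, so at most one execution can be pending at a time. The standalone sketch below is illustrative only and uses hypothetical names (SingleTimerLoop, schedule, cancel); it is not part of this commit.

class SingleTimerLoop {
	private pending: ReturnType<typeof setTimeout> | null = null;

	// Arms a timer only if none is pending, mirroring the timeoutHandler check above.
	schedule(fn: () => Promise<void>, delayMs: number): void {
		if (this.pending !== null) {
			return;
		}
		this.pending = setTimeout(() => {
			// Clear the handle before running so the next schedule() can arm a new timer.
			this.pending = null;
			void fn();
		}, delayMs);
	}

	// Mirrors the cleanup in _stop(): cancel the pending run, if any.
	cancel(): void {
		if (this.pending !== null) {
			clearTimeout(this.pending);
			this.pending = null;
		}
	}
}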
6 changes: 1 addition & 5 deletions apps/meteor/server/services/omnichannel/service.ts
@@ -33,11 +33,7 @@ export class OmnichannelService extends ServiceClassInternal implements IOmnicha
}

async started() {
settings.watch<boolean>('Livechat_enabled', (enabled) => {
void (enabled && RoutingManager.isMethodSet() ? this.queueWorker.shouldStart() : this.queueWorker.stop());
});

settings.watch<string>('Livechat_Routing_Method', async () => {
settings.watchMultiple(['Livechat_enabled', 'Livechat_Routing_Method'], () => {
this.queueWorker.shouldStart();
});

1 change: 1 addition & 0 deletions packages/core-services/src/index.ts
@@ -78,6 +78,7 @@ export {
} from './types/IOmnichannelAnalyticsService';

export { getConnection, getTrashCollection } from './lib/mongo';
export { ServiceStarter } from './lib/ServiceStarter';

export {
AutoUpdateRecord,
68 changes: 68 additions & 0 deletions packages/core-services/src/lib/ServiceStarter.ts
@@ -0,0 +1,68 @@
// This class manages calls to a service's .start and .stop functions.
// It is meant for cases where the start function has conditions that may prevent the service from actually starting,
// or where the start process can take a while to complete.
// Using this class ensures that calls to .start and .stop are chained, avoiding race conditions.
// At the same time, it prevents those functions from running more often than necessary when they are called
// repeatedly (for example, while setting values are being loaded).
export class ServiceStarter {
private lock = Promise.resolve();

private currentCall?: 'start' | 'stop';

private nextCall?: 'start' | 'stop';

private starterFn: () => Promise<void>;

private stopperFn?: () => Promise<void>;

constructor(starterFn: () => Promise<void>, stopperFn?: () => Promise<void>) {
this.starterFn = starterFn;
this.stopperFn = stopperFn;
}

private async checkStatus(): Promise<void> {
if (this.nextCall === 'start') {
return this.doCall('start');
}

if (this.nextCall === 'stop') {
return this.doCall('stop');
}
}

private async doCall(call: 'start' | 'stop'): Promise<void> {
this.nextCall = undefined;
this.currentCall = call;
try {
if (call === 'start') {
await this.starterFn();
} else if (this.stopperFn) {
await this.stopperFn();
}
} finally {
this.currentCall = undefined;
await this.checkStatus();
}
}

private async call(call: 'start' | 'stop'): Promise<void> {
// If something is already chained to run after the current call, it's okay to replace it with the new call
this.nextCall = call;
if (this.currentCall) {
return this.lock;
}
this.lock = this.checkStatus();
return this.lock;
}

async start(): Promise<void> {
return this.call('start');
}

async stop(): Promise<void> {
return this.call('stop');
}

async wait(): Promise<void> {
return this.lock;
}
}
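For reference, a minimal usage sketch of ServiceStarter (not part of this commit): only the class and its start, stop, and wait methods come from the file above; the callbacks and log messages are placeholders.

import { ServiceStarter } from '@rocket.chat/core-services';

const starter = new ServiceStarter(
	async () => {
		// conditional or slow start-up work would go here
		console.log('service started');
	},
	async () => {
		console.log('service stopped');
	},
);

// Calls are chained rather than racing: while one call runs, at most one follow-up
// stays queued, and a newer call replaces a still-queued one.
void starter.start();
void starter.start(); // queued behind the running start
void starter.stop(); // replaces the queued start

// wait() resolves once the whole chain has drained; here it logs after 'service stopped'.
void starter.wait().then(() => console.log('all calls settled'));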
91 changes: 91 additions & 0 deletions packages/core-services/tests/ServiceStarter.test.ts
@@ -0,0 +1,91 @@
import { ServiceStarter } from '../src/lib/ServiceStarter';

const wait = (time: number) => {
return new Promise((resolve) => {
setTimeout(() => resolve(undefined), time);
});
};

describe('ServiceStarter', () => {
it('should call the starterFn and stopperFn when calling .start and .stop', async () => {
const start = jest.fn();
const stop = jest.fn();

const instance = new ServiceStarter(start, stop);

expect(start).not.toHaveBeenCalled();
expect(stop).not.toHaveBeenCalled();

await instance.start();

expect(start).toHaveBeenCalled();
expect(stop).not.toHaveBeenCalled();

start.mockReset();

await instance.stop();

expect(start).not.toHaveBeenCalled();
expect(stop).toHaveBeenCalled();
});

it('should only call .start for the second time after the initial call has finished running', async () => {
let running = false;
const start = jest.fn(async () => {
expect(running).toBe(false);

running = true;
await wait(100);
running = false;
});
const stop = jest.fn();

const instance = new ServiceStarter(start, stop);

void instance.start();
void instance.start();

await instance.wait();

expect(start).toHaveBeenCalledTimes(2);
expect(stop).not.toHaveBeenCalled();
});

it('should chain up to two calls to .start', async () => {
const start = jest.fn(async () => {
await wait(100);
});
const stop = jest.fn();

const instance = new ServiceStarter(start, stop);

void instance.start();
void instance.start();
void instance.start();
void instance.start();

await instance.wait();

expect(start).toHaveBeenCalledTimes(2);
expect(stop).not.toHaveBeenCalled();
});

it('should skip the chained calls to .start if .stop is called', async () => {
const start = jest.fn(async () => {
await wait(100);
});
const stop = jest.fn();

const instance = new ServiceStarter(start, stop);

void instance.start();
void instance.start();
void instance.start();
void instance.stop();

await instance.wait();

expect(start).toHaveBeenCalledTimes(1);
expect(stop).toHaveBeenCalledTimes(1);
});
});
