diff --git a/.changeset/thirty-needles-speak.md b/.changeset/thirty-needles-speak.md new file mode 100644 index 0000000000000..70aac86bbad3c --- /dev/null +++ b/.changeset/thirty-needles-speak.md @@ -0,0 +1,5 @@ +--- +"@rocket.chat/meteor": patch +--- + +Fixes an issue that caused the queue worker to stop processing if something failed when checking MAC limits or while fetching the list of queues diff --git a/apps/meteor/server/services/omnichannel/queue.ts b/apps/meteor/server/services/omnichannel/queue.ts index 31a159492cfa9..7e64b90679994 100644 --- a/apps/meteor/server/services/omnichannel/queue.ts +++ b/apps/meteor/server/services/omnichannel/queue.ts @@ -27,6 +27,8 @@ export class OmnichannelQueue implements IOmnichannelQueue { private running = false; + private errorDelay = 10 * 1000; // 10 seconds + private delay() { const timeout = settings.get('Omnichannel_queue_delay_timeout') ?? 5; return timeout < 1 ? DEFAULT_RACE_TIMEOUT : timeout * 1000; @@ -79,28 +81,38 @@ export class OmnichannelQueue implements IOmnichannelQueue { } private async execute() { - if (!this.running) { - queueLogger.debug('Queue stopped. Cannot execute'); - return; - } + try { + if (!this.running) { + queueLogger.debug('Queue stopped. Cannot execute'); + return; + } - if (await License.shouldPreventAction('monthlyActiveContacts', 1)) { - queueLogger.debug('MAC limit reached. Queue wont execute'); - this.running = false; - return; - } + if (await License.shouldPreventAction('monthlyActiveContacts', 1)) { + queueLogger.debug('MAC limit reached. Queue wont execute'); + this.running = false; + return; + } + + // We still go 1 by 1, but we go with every queue every cycle instead of just 1 queue per cycle + // And we get tracing :) + const queues = await this.getActiveQueues(); + for await (const queue of queues) { + await tracerSpan( + 'omnichannel.queue', + { attributes: { workerTime: new Date().toISOString(), queue: queue || 'Public' }, root: true }, + () => this.checkQueue(queue), + ); + } - // We still go 1 by 1, but we go with every queue every cycle instead of just 1 queue per cycle - // And we get tracing :) - const queues = await this.getActiveQueues(); - for await (const queue of queues) { - await tracerSpan( - 'omnichannel.queue', - { attributes: { workerTime: new Date().toISOString(), queue: queue || 'Public' }, root: true }, - () => this.checkQueue(queue), - ); + this.scheduleExecution(); + } catch (e) { + queueLogger.error({ + msg: 'Queue Worker Error. Rescheduling with extra delay', + extraDelay: this.errorDelay, + err: e, + }); + this.scheduleExecution(this.errorDelay); } - this.scheduleExecution(); } private async checkQueue(queue: string | null) { @@ -136,15 +148,18 @@ export class OmnichannelQueue implements IOmnichannelQueue { } } - private scheduleExecution(): void { + private scheduleExecution(extraDelay?: number): void { if (this.timeoutHandler !== null) { return; } - this.timeoutHandler = setTimeout(() => { - this.timeoutHandler = null; - return this.execute(); - }, this.delay()); + this.timeoutHandler = setTimeout( + () => { + this.timeoutHandler = null; + return this.execute(); + }, + this.delay() + (extraDelay || 0), + ); } async shouldStart() {