diff --git a/.changeset/breezy-dolphins-sing.md b/.changeset/breezy-dolphins-sing.md new file mode 100644 index 0000000000000..ea09c16c0c772 --- /dev/null +++ b/.changeset/breezy-dolphins-sing.md @@ -0,0 +1,6 @@ +--- +"@rocket.chat/core-services": patch +"@rocket.chat/meteor": patch +--- + +Fix: Implement batching for presence status updates to prevent broadcast storms during mass user reconnections. diff --git a/apps/meteor/server/modules/listeners/listeners.module.ts b/apps/meteor/server/modules/listeners/listeners.module.ts index 8b04d04b869ba..4b6269973cf68 100644 --- a/apps/meteor/server/modules/listeners/listeners.module.ts +++ b/apps/meteor/server/modules/listeners/listeners.module.ts @@ -54,9 +54,9 @@ export class ListenersModule { if (!isMessageParserDisabled && message.msg) { const customDomains = settings.get('Message_CustomDomain_AutoLink') ? settings - .get('Message_CustomDomain_AutoLink') - .split(',') - .map((domain) => domain.trim()) + .get('Message_CustomDomain_AutoLink') + .split(',') + .map((domain) => domain.trim()) : []; message.md = parse(message.msg, { @@ -181,6 +181,33 @@ export class ListenersModule { } }); + service.onEvent('presence.status.batch', (batch) => { + batch.forEach(({ user }) => { + const { _id, username, name, status, statusText, roles } = user; + if (!status || !username) { + return; + } + + notifications.notifyUserInThisInstance(_id, 'userData', { + type: 'updated', + id: _id, + diff: { + status, + ...(statusText && { statusText }), + }, + unset: { + ...(!statusText && { statusText: 1 }), + }, + }); + + notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText, name, roles]); + + if (_id) { + notifications.sendPresence(_id, username, STATUS_MAP[status], statusText); + } + }); + }); + service.onEvent('user.updateCustomStatus', (userStatus) => { notifications.notifyLoggedInThisInstance('updateCustomUserStatus', { userStatusData: userStatus, diff --git a/apps/meteor/server/services/apps-engine/service.ts b/apps/meteor/server/services/apps-engine/service.ts index 0be60a579908d..e7d5f7642146c 100644 --- a/apps/meteor/server/services/apps-engine/service.ts +++ b/apps/meteor/server/services/apps-engine/service.ts @@ -30,6 +30,20 @@ export class AppsEngineService extends ServiceClassInternal implements IAppsEngi }); }); + this.onEvent('presence.status.batch', async (batch): Promise => { + for (const { user, previousStatus } of batch) { + try { + await Apps.self?.triggerEvent(AppEvents.IPostUserStatusChanged, { + user, + currentStatus: user.status, + previousStatus, + }); + } catch (error) { + SystemLogger.error({ msg: 'Error triggering IPostUserStatusChanged event', error }); + } + } + }); + this.onEvent('apps.added', async (appId: string): Promise => { Apps.self?.getRocketChatLogger().debug({ msg: '"apps.added" event received for app', diff --git a/apps/meteor/server/services/omnichannel/service.ts b/apps/meteor/server/services/omnichannel/service.ts index 067cd9474ce7d..5a5c39f132fea 100644 --- a/apps/meteor/server/services/omnichannel/service.ts +++ b/apps/meteor/server/services/omnichannel/service.ts @@ -1,6 +1,6 @@ import { ServiceClassInternal } from '@rocket.chat/core-services'; import type { IOmnichannelService } from '@rocket.chat/core-services'; -import type { AtLeast, IOmnichannelQueue, IOmnichannelRoom } from '@rocket.chat/core-typings'; +import type { AtLeast, IOmnichannelQueue, IOmnichannelRoom, IUser, UserStatus } from '@rocket.chat/core-typings'; import { License } from '@rocket.chat/license'; import moment from 'moment'; @@ -21,17 +21,27 @@ export class OmnichannelService extends ServiceClassInternal implements IOmnicha override async created() { this.onEvent('presence.status', async ({ user }): Promise => { - if (!user?._id) { - return; - } - const hasRole = user.roles.some((role) => ['livechat-manager', 'livechat-monitor', 'livechat-agent'].includes(role)); - if (hasRole) { - // TODO change `Livechat.notifyAgentStatusChanged` to a service call - await notifyAgentStatusChanged(user._id, user.status); + await this.handlePresenceUpdate(user); + }); + + this.onEvent('presence.status.batch', async (batch): Promise => { + for (const { user } of batch) { + await this.handlePresenceUpdate(user); } }); } + private async handlePresenceUpdate(user: Pick): Promise { + if (!user?._id) { + return; + } + const hasRole = user.roles.some((role) => ['livechat-manager', 'livechat-monitor', 'livechat-agent'].includes(role)); + if (hasRole) { + // TODO change `Livechat.notifyAgentStatusChanged` to a service call + await notifyAgentStatusChanged(user._id, user.status as UserStatus); + } + } + override async started() { settings.watchMultiple(['Livechat_enabled', 'Livechat_Routing_Method'], () => { this.queueWorker.shouldStart(); diff --git a/ee/packages/presence/src/Presence.spec.ts b/ee/packages/presence/src/Presence.spec.ts new file mode 100644 index 0000000000000..0b97340ae6fa7 --- /dev/null +++ b/ee/packages/presence/src/Presence.spec.ts @@ -0,0 +1,112 @@ +import { Presence } from './Presence'; + +jest.mock('@rocket.chat/core-services', () => ({ + ServiceClass: class { + api = { + broadcast: jest.fn(), + }; + onEvent() { } + }, + License: { + hasModule: jest.fn().mockResolvedValue(true), + }, +})); + +jest.mock('@rocket.chat/models', () => ({ + Settings: { updateValueById: jest.fn() }, + Users: {}, + UsersSessions: {}, + registerServiceModels: jest.fn(), +})); + +jest.mock('./lib/PresenceReaper', () => ({ + PresenceReaper: class { + start() { } + stop() { } + }, +})); + +describe('Presence Batching', () => { + let presence: Presence; + + beforeEach(() => { + jest.useFakeTimers(); + presence = new Presence(); + (presence as any).broadcastEnabled = true; + (presence as any).hasLicense = true; + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('should buffer broadcast events', () => { + const user = { _id: 'u1', username: 'user1', status: 'online' } as any; + (presence as any).broadcast(user, 'offline'); + + // Check buffer + expect((presence as any).presenceBatch.size).toBe(1); + + // Assert API not called yet + expect((presence as any).api.broadcast).not.toHaveBeenCalled(); + + // Advance timers + jest.advanceTimersByTime(500); + + // Assert API called + expect((presence as any).api.broadcast).toHaveBeenCalledWith('presence.status.batch', expect.any(Array)); + expect((presence as any).api.broadcast).toHaveBeenCalledTimes(1); + + const batch = (presence as any).api.broadcast.mock.calls[0][1]; + expect(batch).toHaveLength(1); + expect(batch[0].user._id).toBe('u1'); + }); + + it('should batch multiple updates', () => { + const u1 = { _id: 'u1', username: 'user1', status: 'online' } as any; + const u2 = { _id: 'u2', username: 'user2', status: 'busy' } as any; + + (presence as any).broadcast(u1, 'offline'); + (presence as any).broadcast(u2, 'online'); + + expect((presence as any).presenceBatch.size).toBe(2); + + jest.advanceTimersByTime(500); + + expect((presence as any).api.broadcast).toHaveBeenCalledWith('presence.status.batch', expect.any(Array)); + const batch = (presence as any).api.broadcast.mock.calls[0][1]; + expect(batch).toHaveLength(2); + const ids = batch.map((item: any) => item.user._id).sort(); + expect(ids).toEqual(['u1', 'u2']); + }); + + it('should debounce updates for same user', () => { + const u1_v1 = { _id: 'u1', username: 'user1', status: 'online' } as any; + const u1_v2 = { _id: 'u1', username: 'user1', status: 'offline' } as any; + + (presence as any).broadcast(u1_v1, 'busy'); + (presence as any).broadcast(u1_v2, 'online'); + + // Should update the existing entry in map + expect((presence as any).presenceBatch.size).toBe(1); + const stored = (presence as any).presenceBatch.get('u1'); + expect(stored.user.status).toBe('offline'); + + jest.advanceTimersByTime(500); + expect((presence as any).api.broadcast).toHaveBeenCalledTimes(1); + const batch = (presence as any).api.broadcast.mock.calls[0][1]; + expect(batch).toHaveLength(1); + expect(batch[0].user.status).toBe('offline'); + }); + + it('should not broadcast if broadcastEnabled is false', () => { + (presence as any).broadcastEnabled = false; + const user = { _id: 'u1', username: 'user1', status: 'online' } as any; + + (presence as any).broadcast(user, 'offline'); + + expect((presence as any).presenceBatch.size).toBe(0); + jest.advanceTimersByTime(500); + expect((presence as any).api.broadcast).not.toHaveBeenCalled(); + }); +}); diff --git a/ee/packages/presence/src/Presence.ts b/ee/packages/presence/src/Presence.ts index 0fe78b9e47dcd..31ed3caa0d273 100755 --- a/ee/packages/presence/src/Presence.ts +++ b/ee/packages/presence/src/Presence.ts @@ -22,6 +22,16 @@ export class Presence extends ServiceClass implements IPresence { private lostConTimeout?: NodeJS.Timeout; + private presenceBatch = new Map< + string, + { + user: Pick; + previousStatus: UserStatus | undefined; + } + >(); + + private batchTimeout?: NodeJS.Timeout; + private connsPerInstance = new Map(); private peakConnections = 0; @@ -118,10 +128,13 @@ export class Presence extends ServiceClass implements IPresence { override async stopped(): Promise { this.reaper.stop(); - if (!this.lostConTimeout) { - return; + if (this.lostConTimeout) { + clearTimeout(this.lostConTimeout); + } + if (this.batchTimeout) { + clearTimeout(this.batchTimeout); + this.batchTimeout = undefined; } - clearTimeout(this.lostConTimeout); } async toggleBroadcast(enabled: boolean): Promise { @@ -233,8 +246,8 @@ export class Presence extends ServiceClass implements IPresence { async setStatus(uid: string, statusDefault: UserStatus, statusText?: string): Promise { const userSessions = (await UsersSessions.findOneById(uid)) || { connections: [] }; - const user = await Users.findOneById>(uid, { - projection: { username: 1, roles: 1, status: 1 }, + const user = await Users.findOneById>(uid, { + projection: { username: 1, name: 1, roles: 1, status: 1 }, }); const { status, statusConnection } = processPresenceAndStatus(userSessions.connections, statusDefault); @@ -247,7 +260,10 @@ export class Presence extends ServiceClass implements IPresence { }); if (result.modifiedCount > 0) { - this.broadcast({ _id: uid, username: user?.username, status, statusText, roles: user?.roles || [] }, user?.status); + this.broadcast( + { _id: uid, username: user?.username, status, statusText, name: user?.name, roles: user?.roles || [] }, + user?.status, + ); } return !!result.modifiedCount; @@ -262,9 +278,10 @@ export class Presence extends ServiceClass implements IPresence { } async updateUserPresence(uid: string): Promise { - const user = await Users.findOneById>(uid, { + const user = await Users.findOneById>(uid, { projection: { username: 1, + name: 1, statusDefault: 1, statusText: 1, roles: 1, @@ -287,21 +304,42 @@ export class Presence extends ServiceClass implements IPresence { }); if (result.modifiedCount > 0) { - this.broadcast({ _id: uid, username: user.username, status, statusText: user.statusText, roles: user.roles }, user.status); + this.broadcast( + { _id: uid, username: user.username, status, statusText: user.statusText, name: user.name, roles: user.roles }, + user.status, + ); } } private broadcast( - user: Pick, + user: Pick, previousStatus: UserStatus | undefined, ): void { if (!this.broadcastEnabled) { return; } - this.api?.broadcast('presence.status', { - user, - previousStatus, - }); + + this.presenceBatch.set(user._id, { user, previousStatus }); + + if (this.batchTimeout) { + return; + } + + this.batchTimeout = setTimeout(() => { + this.batchTimeout = undefined; + if (this.presenceBatch.size === 0) { + return; + } + + const batch = Array.from(this.presenceBatch.values()); + this.presenceBatch.clear(); + + if (!this.broadcastEnabled) { + return; + } + + this.api?.broadcast('presence.status.batch', batch); + }, 500); } private async validateAvailability(): Promise { diff --git a/packages/core-services/src/events/Events.ts b/packages/core-services/src/events/Events.ts index b80b6a7d90061..f1856f222bc09 100644 --- a/packages/core-services/src/events/Events.ts +++ b/packages/core-services/src/events/Events.ts @@ -162,6 +162,12 @@ export type EventSignatures = { user: Pick; previousStatus: UserStatus | undefined; }): void; + 'presence.status.batch'( + data: { + user: Pick; + previousStatus: UserStatus | undefined; + }[], + ): void; 'watch.messages'(data: { message: IMessage }): void; 'watch.roles'( data: