Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/breezy-dolphins-sing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@rocket.chat/core-services": patch
"@rocket.chat/meteor": patch
---

Fix: Implement batching for presence status updates to prevent broadcast storms during mass user reconnections.
33 changes: 30 additions & 3 deletions apps/meteor/server/modules/listeners/listeners.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ export class ListenersModule {
if (!isMessageParserDisabled && message.msg) {
const customDomains = settings.get<string>('Message_CustomDomain_AutoLink')
? settings
.get<string>('Message_CustomDomain_AutoLink')
.split(',')
.map((domain) => domain.trim())
.get<string>('Message_CustomDomain_AutoLink')
.split(',')
.map((domain) => domain.trim())
: [];

message.md = parse(message.msg, {
Expand Down Expand Up @@ -181,6 +181,33 @@ export class ListenersModule {
}
});

service.onEvent('presence.status.batch', (batch) => {
batch.forEach(({ user }) => {
const { _id, username, name, status, statusText, roles } = user;
if (!status || !username) {
return;
}

notifications.notifyUserInThisInstance(_id, 'userData', {
type: 'updated',
id: _id,
diff: {
status,
...(statusText && { statusText }),
},
unset: {
...(!statusText && { statusText: 1 }),
},
});

notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText, name, roles]);

if (_id) {
notifications.sendPresence(_id, username, STATUS_MAP[status], statusText);
}
});
});

service.onEvent('user.updateCustomStatus', (userStatus) => {
notifications.notifyLoggedInThisInstance('updateCustomUserStatus', {
userStatusData: userStatus,
Expand Down
14 changes: 14 additions & 0 deletions apps/meteor/server/services/apps-engine/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ export class AppsEngineService extends ServiceClassInternal implements IAppsEngi
});
});

this.onEvent('presence.status.batch', async (batch): Promise<void> => {
for (const { user, previousStatus } of batch) {
try {
await Apps.self?.triggerEvent(AppEvents.IPostUserStatusChanged, {
user,
currentStatus: user.status,
previousStatus,
});
} catch (error) {
SystemLogger.error({ msg: 'Error triggering IPostUserStatusChanged event', error });
}
}
});

this.onEvent('apps.added', async (appId: string): Promise<void> => {
Apps.self?.getRocketChatLogger().debug({
msg: '"apps.added" event received for app',
Expand Down
26 changes: 18 additions & 8 deletions apps/meteor/server/services/omnichannel/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ServiceClassInternal } from '@rocket.chat/core-services';
import type { IOmnichannelService } from '@rocket.chat/core-services';
import type { AtLeast, IOmnichannelQueue, IOmnichannelRoom } from '@rocket.chat/core-typings';
import type { AtLeast, IOmnichannelQueue, IOmnichannelRoom, IUser, UserStatus } from '@rocket.chat/core-typings';
import { License } from '@rocket.chat/license';
import moment from 'moment';

Expand All @@ -21,17 +21,27 @@ export class OmnichannelService extends ServiceClassInternal implements IOmnicha

override async created() {
this.onEvent('presence.status', async ({ user }): Promise<void> => {
if (!user?._id) {
return;
}
const hasRole = user.roles.some((role) => ['livechat-manager', 'livechat-monitor', 'livechat-agent'].includes(role));
if (hasRole) {
// TODO change `Livechat.notifyAgentStatusChanged` to a service call
await notifyAgentStatusChanged(user._id, user.status);
await this.handlePresenceUpdate(user);
});

this.onEvent('presence.status.batch', async (batch): Promise<void> => {
for (const { user } of batch) {
await this.handlePresenceUpdate(user);
}
});
}

private async handlePresenceUpdate(user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'>): Promise<void> {
if (!user?._id) {
return;
}
const hasRole = user.roles.some((role) => ['livechat-manager', 'livechat-monitor', 'livechat-agent'].includes(role));
if (hasRole) {
// TODO change `Livechat.notifyAgentStatusChanged` to a service call
await notifyAgentStatusChanged(user._id, user.status as UserStatus);
}
}

override async started() {
settings.watchMultiple(['Livechat_enabled', 'Livechat_Routing_Method'], () => {
this.queueWorker.shouldStart();
Expand Down
112 changes: 112 additions & 0 deletions ee/packages/presence/src/Presence.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { Presence } from './Presence';

jest.mock('@rocket.chat/core-services', () => ({
ServiceClass: class {
api = {
broadcast: jest.fn(),
};
onEvent() { }
},
License: {
hasModule: jest.fn().mockResolvedValue(true),
},
}));

jest.mock('@rocket.chat/models', () => ({
Settings: { updateValueById: jest.fn() },
Users: {},
UsersSessions: {},
registerServiceModels: jest.fn(),
}));

jest.mock('./lib/PresenceReaper', () => ({
PresenceReaper: class {
start() { }
stop() { }
},
}));

describe('Presence Batching', () => {
let presence: Presence;

beforeEach(() => {
jest.useFakeTimers();
presence = new Presence();
(presence as any).broadcastEnabled = true;
(presence as any).hasLicense = true;
});

afterEach(() => {
jest.useRealTimers();
});

it('should buffer broadcast events', () => {
const user = { _id: 'u1', username: 'user1', status: 'online' } as any;
(presence as any).broadcast(user, 'offline');

// Check buffer
expect((presence as any).presenceBatch.size).toBe(1);

// Assert API not called yet
expect((presence as any).api.broadcast).not.toHaveBeenCalled();

// Advance timers
jest.advanceTimersByTime(500);

// Assert API called
expect((presence as any).api.broadcast).toHaveBeenCalledWith('presence.status.batch', expect.any(Array));
expect((presence as any).api.broadcast).toHaveBeenCalledTimes(1);

const batch = (presence as any).api.broadcast.mock.calls[0][1];
expect(batch).toHaveLength(1);
expect(batch[0].user._id).toBe('u1');
});

it('should batch multiple updates', () => {
const u1 = { _id: 'u1', username: 'user1', status: 'online' } as any;
const u2 = { _id: 'u2', username: 'user2', status: 'busy' } as any;

(presence as any).broadcast(u1, 'offline');
(presence as any).broadcast(u2, 'online');

expect((presence as any).presenceBatch.size).toBe(2);

jest.advanceTimersByTime(500);

expect((presence as any).api.broadcast).toHaveBeenCalledWith('presence.status.batch', expect.any(Array));
const batch = (presence as any).api.broadcast.mock.calls[0][1];
expect(batch).toHaveLength(2);
const ids = batch.map((item: any) => item.user._id).sort();
expect(ids).toEqual(['u1', 'u2']);
});

it('should debounce updates for same user', () => {
const u1_v1 = { _id: 'u1', username: 'user1', status: 'online' } as any;
const u1_v2 = { _id: 'u1', username: 'user1', status: 'offline' } as any;

(presence as any).broadcast(u1_v1, 'busy');
(presence as any).broadcast(u1_v2, 'online');

// Should update the existing entry in map
expect((presence as any).presenceBatch.size).toBe(1);
const stored = (presence as any).presenceBatch.get('u1');
expect(stored.user.status).toBe('offline');

jest.advanceTimersByTime(500);
expect((presence as any).api.broadcast).toHaveBeenCalledTimes(1);
const batch = (presence as any).api.broadcast.mock.calls[0][1];
expect(batch).toHaveLength(1);
expect(batch[0].user.status).toBe('offline');
});

it('should not broadcast if broadcastEnabled is false', () => {
(presence as any).broadcastEnabled = false;
const user = { _id: 'u1', username: 'user1', status: 'online' } as any;

(presence as any).broadcast(user, 'offline');

expect((presence as any).presenceBatch.size).toBe(0);
jest.advanceTimersByTime(500);
expect((presence as any).api.broadcast).not.toHaveBeenCalled();
});
});
64 changes: 51 additions & 13 deletions ee/packages/presence/src/Presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ export class Presence extends ServiceClass implements IPresence {

private lostConTimeout?: NodeJS.Timeout;

private presenceBatch = new Map<
string,
{
user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'>;
previousStatus: UserStatus | undefined;
}
>();

private batchTimeout?: NodeJS.Timeout;

private connsPerInstance = new Map<string, number>();

private peakConnections = 0;
Expand Down Expand Up @@ -118,10 +128,13 @@ export class Presence extends ServiceClass implements IPresence {

override async stopped(): Promise<void> {
this.reaper.stop();
if (!this.lostConTimeout) {
return;
if (this.lostConTimeout) {
clearTimeout(this.lostConTimeout);
}
if (this.batchTimeout) {
clearTimeout(this.batchTimeout);
this.batchTimeout = undefined;
}
clearTimeout(this.lostConTimeout);
}

async toggleBroadcast(enabled: boolean): Promise<void> {
Expand Down Expand Up @@ -233,8 +246,8 @@ export class Presence extends ServiceClass implements IPresence {
async setStatus(uid: string, statusDefault: UserStatus, statusText?: string): Promise<boolean> {
const userSessions = (await UsersSessions.findOneById(uid)) || { connections: [] };

const user = await Users.findOneById<Pick<IUser, 'username' | 'roles' | 'status'>>(uid, {
projection: { username: 1, roles: 1, status: 1 },
const user = await Users.findOneById<Pick<IUser, 'username' | 'name' | 'roles' | 'status'>>(uid, {
projection: { username: 1, name: 1, roles: 1, status: 1 },
});

const { status, statusConnection } = processPresenceAndStatus(userSessions.connections, statusDefault);
Expand All @@ -247,7 +260,10 @@ export class Presence extends ServiceClass implements IPresence {
});

if (result.modifiedCount > 0) {
this.broadcast({ _id: uid, username: user?.username, status, statusText, roles: user?.roles || [] }, user?.status);
this.broadcast(
{ _id: uid, username: user?.username, status, statusText, name: user?.name, roles: user?.roles || [] },
user?.status,
);
}

return !!result.modifiedCount;
Expand All @@ -262,9 +278,10 @@ export class Presence extends ServiceClass implements IPresence {
}

async updateUserPresence(uid: string): Promise<void> {
const user = await Users.findOneById<Pick<IUser, 'username' | 'statusDefault' | 'statusText' | 'roles' | 'status'>>(uid, {
const user = await Users.findOneById<Pick<IUser, 'username' | 'name' | 'statusDefault' | 'statusText' | 'roles' | 'status'>>(uid, {
projection: {
username: 1,
name: 1,
statusDefault: 1,
statusText: 1,
roles: 1,
Expand All @@ -287,21 +304,42 @@ export class Presence extends ServiceClass implements IPresence {
});

if (result.modifiedCount > 0) {
this.broadcast({ _id: uid, username: user.username, status, statusText: user.statusText, roles: user.roles }, user.status);
this.broadcast(
{ _id: uid, username: user.username, status, statusText: user.statusText, name: user.name, roles: user.roles },
user.status,
);
}
}

private broadcast(
user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'roles'>,
user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'>,
previousStatus: UserStatus | undefined,
): void {
if (!this.broadcastEnabled) {
return;
}
this.api?.broadcast('presence.status', {
user,
previousStatus,
});

this.presenceBatch.set(user._id, { user, previousStatus });

if (this.batchTimeout) {
return;
}

this.batchTimeout = setTimeout(() => {
this.batchTimeout = undefined;
if (this.presenceBatch.size === 0) {
return;
}

const batch = Array.from(this.presenceBatch.values());
this.presenceBatch.clear();

if (!this.broadcastEnabled) {
return;
}

this.api?.broadcast('presence.status.batch', batch);
}, 500);
}

private async validateAvailability(): Promise<void> {
Expand Down
6 changes: 6 additions & 0 deletions packages/core-services/src/events/Events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ export type EventSignatures = {
user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'>;
previousStatus: UserStatus | undefined;
}): void;
'presence.status.batch'(
data: {
user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'>;
previousStatus: UserStatus | undefined;
}[],
): void;
'watch.messages'(data: { message: IMessage }): void;
'watch.roles'(
data:
Expand Down