From 268a2d88524553e98401c639bb23b4eced96f1b8 Mon Sep 17 00:00:00 2001 From: Matheus Cardoso Date: Mon, 15 Dec 2025 16:24:19 -0300 Subject: [PATCH] fix(presence): update connections on heartbeat and remove them when stale (#37551) --- .changeset/spicy-nails-design.md | 8 + .../externals/meteor/ddp-common.d.ts | 64 ++++++++ .../definition/externals/meteor/meteor.d.ts | 9 +- apps/meteor/ee/server/startup/presence.ts | 10 ++ ee/apps/ddp-streamer/src/Client.ts | 19 +++ ee/packages/presence/src/Presence.ts | 54 +++++- .../presence/src/lib/PresenceReaper.spec.ts | 154 ++++++++++++++++++ .../presence/src/lib/PresenceReaper.ts | 132 +++++++++++++++ packages/core-services/src/types/IPresence.ts | 1 + 9 files changed, 447 insertions(+), 4 deletions(-) create mode 100644 .changeset/spicy-nails-design.md create mode 100644 ee/packages/presence/src/lib/PresenceReaper.spec.ts create mode 100644 ee/packages/presence/src/lib/PresenceReaper.ts diff --git a/.changeset/spicy-nails-design.md b/.changeset/spicy-nails-design.md new file mode 100644 index 0000000000000..23a5f82e7bb5e --- /dev/null +++ b/.changeset/spicy-nails-design.md @@ -0,0 +1,8 @@ +--- +"@rocket.chat/meteor": patch +"@rocket.chat/core-services": patch +"@rocket.chat/ddp-streamer": patch +"@rocket.chat/presence": patch +--- + +Ensures presence stays accurate by refreshing connections on heartbeats and removing stale sessions. \ No newline at end of file diff --git a/apps/meteor/definition/externals/meteor/ddp-common.d.ts b/apps/meteor/definition/externals/meteor/ddp-common.d.ts index 3a7f1538ab407..cbe96a9722968 100644 --- a/apps/meteor/definition/externals/meteor/ddp-common.d.ts +++ b/apps/meteor/definition/externals/meteor/ddp-common.d.ts @@ -14,5 +14,69 @@ declare module 'meteor/ddp-common' { userId?: string; }); } + + /** + * Heartbeat options + */ + type HeartbeatOptions = { + /** + * interval to send pings, in milliseconds + */ + heartbeatInterval: number; + /** + * timeout to close the connection if a reply isn't received, in milliseconds. + */ + heartbeatTimeout: number; + /** + * function to call to send a ping on the connection. + */ + sendPing: () => void; + /** + * function to call to close the connection + */ + onTimeout: () => void; + }; + + class Heartbeat { + heartbeatInterval: number; + + heartbeatTimeout: number; + + _sendPing: () => void; + + _onTimeout: () => void; + + _seenPacket: boolean; + + _heartbeatIntervalHandle: ReturnType | null; + + _heartbeatTimeoutHandle: ReturnType | null; + + constructor(options: HeartbeatOptions); + + stop(): void; + + start(): void; + + _startHeartbeatIntervalTimer(): void; + + _startHeartbeatTimeoutTimer(): void; + + _clearHeartbeatIntervalTimer(): void; + + _clearHeartbeatTimeoutTimer(): void; + + /** + * The heartbeat interval timer is fired when we should send a ping. + */ + _heartbeatIntervalFired(): void; + + /** + * The heartbeat timeout timer is fired when we sent a ping, but we timed out waiting for the pong. + */ + _heartbeatTimeoutFired(): void; + + messageReceived(): void; + } } } diff --git a/apps/meteor/definition/externals/meteor/meteor.d.ts b/apps/meteor/definition/externals/meteor/meteor.d.ts index cab110392c12c..e93445a891a36 100644 --- a/apps/meteor/definition/externals/meteor/meteor.d.ts +++ b/apps/meteor/definition/externals/meteor/meteor.d.ts @@ -1,6 +1,6 @@ import 'meteor/meteor'; import type { ServerMethods } from '@rocket.chat/ddp-client'; -import type { IStreamerConstructor, IStreamer } from 'meteor/rocketchat:streamer'; +import type { DDPCommon, IStreamerConstructor, IStreamer } from 'meteor/ddp-common'; type StringifyBuffers = { [P in keyof T]: T[P] extends Buffer ? string : T[P]; @@ -39,7 +39,12 @@ declare module 'meteor/meteor' { isDesktop: () => boolean; } - const server: any; + const server: { + sessions: Map; + publish_handlers: { + meteor_autoupdate_clientVersions(): void; + }; + }; const runAsUser: (userId: string, scope: () => T) => T; diff --git a/apps/meteor/ee/server/startup/presence.ts b/apps/meteor/ee/server/startup/presence.ts index e0756e4b4c59d..6bf587b500ba6 100644 --- a/apps/meteor/ee/server/startup/presence.ts +++ b/apps/meteor/ee/server/startup/presence.ts @@ -40,6 +40,16 @@ Meteor.startup(() => { return; } + const _messageReceived = session.heartbeat.messageReceived.bind(session.heartbeat); + session.heartbeat.messageReceived = function messageReceived() { + if (this._seenPacket === false) { + void Presence.updateConnection(login.user._id, login.connection.id).catch((err) => { + console.error('Error updating connection presence on heartbeat:', err); + }); + } + return _messageReceived(); + }; + void (async function () { await Presence.newConnection(login.user._id, login.connection.id, nodeId); updateConns(); diff --git a/ee/apps/ddp-streamer/src/Client.ts b/ee/apps/ddp-streamer/src/Client.ts index fb75af64b3da0..0ce8762948726 100644 --- a/ee/apps/ddp-streamer/src/Client.ts +++ b/ee/apps/ddp-streamer/src/Client.ts @@ -1,6 +1,7 @@ import { EventEmitter } from 'events'; import type { IncomingMessage } from 'http'; +import { Presence } from '@rocket.chat/core-services'; import type { ISocketConnection } from '@rocket.chat/core-typings'; import { v1 as uuidv1 } from 'uuid'; import type WebSocket from 'ws'; @@ -73,6 +74,8 @@ export class Client extends EventEmitter { public userToken?: string; + private _seenPacket = true; + constructor( public ws: WebSocket, public meteorClient = false, @@ -179,6 +182,18 @@ export class Client extends EventEmitter { this.ws.close(WS_ERRORS.TIMEOUT, WS_ERRORS_MESSAGES.TIMEOUT); }; + private messageReceived = (): void => { + if (this._seenPacket || !this.userId) { + this._seenPacket = true; + return; + } + + this._seenPacket = true; + void Presence.updateConnection(this.userId, this.connection.id).catch((err) => { + console.error('Error updating connection presence after heartbeat:', err); + }); + }; + ping(id?: string): void { this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PING, ...(id && { [DDP_EVENTS.ID]: id }) })); } @@ -188,6 +203,9 @@ export class Client extends EventEmitter { } handleIdle = (): void => { + if (this.userId) { + this._seenPacket = false; + } this.ping(); this.timeout = setTimeout(this.closeTimeout, TIMEOUT); }; @@ -200,6 +218,7 @@ export class Client extends EventEmitter { handler = async (payload: WebSocket.Data, isBinary: boolean): Promise => { try { const packet = server.parse(payload, isBinary); + this.messageReceived(); this.emit('message', packet); if (this.wait) { return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet)))); diff --git a/ee/packages/presence/src/Presence.ts b/ee/packages/presence/src/Presence.ts index 5b537857698c9..0fe78b9e47dcd 100755 --- a/ee/packages/presence/src/Presence.ts +++ b/ee/packages/presence/src/Presence.ts @@ -4,6 +4,7 @@ import type { IUser } from '@rocket.chat/core-typings'; import { UserStatus } from '@rocket.chat/core-typings'; import { Settings, Users, UsersSessions } from '@rocket.chat/models'; +import { PresenceReaper } from './lib/PresenceReaper'; import { processPresenceAndStatus } from './lib/processConnectionStatus'; const MAX_CONNECTIONS = 200; @@ -25,9 +26,17 @@ export class Presence extends ServiceClass implements IPresence { private peakConnections = 0; + private reaper: PresenceReaper; + constructor() { super(); + this.reaper = new PresenceReaper({ + batchSize: 500, + staleThresholdMs: 5 * 60 * 1000, // 5 minutes + onUpdate: (userIds) => this.handleReaperUpdates(userIds), + }); + this.onEvent('watch.instanceStatus', async ({ clientAction, id, diff }): Promise => { if (clientAction === 'removed') { this.connsPerInstance.delete(id); @@ -72,7 +81,8 @@ export class Presence extends ServiceClass implements IPresence { return affectedUsers.forEach((uid) => this.updateUserPresence(uid)); } - async started(): Promise { + override async started(): Promise { + this.reaper.start(); this.lostConTimeout = setTimeout(async () => { const affectedUsers = await this.removeLostConnections(); return affectedUsers.forEach((uid) => this.updateUserPresence(uid)); @@ -89,7 +99,25 @@ export class Presence extends ServiceClass implements IPresence { } } - async stopped(): Promise { + private async handleReaperUpdates(userIds: string[]): Promise { + const results = await Promise.allSettled(userIds.map((uid) => this.updateUserPresence(uid))); + const fulfilled = results.filter((result) => result.status === 'fulfilled'); + const rejected = results.filter((result) => result.status === 'rejected'); + + if (fulfilled.length > 0) { + console.debug(`[PresenceReaper] Successfully updated presence for ${fulfilled.length} users.`); + } + + if (rejected.length > 0) { + console.error( + `[PresenceReaper] Failed to update presence for ${rejected.length} users:`, + rejected.map(({ reason }) => reason), + ); + } + } + + override async stopped(): Promise { + this.reaper.stop(); if (!this.lostConTimeout) { return; } @@ -137,6 +165,28 @@ export class Presence extends ServiceClass implements IPresence { }; } + async updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined> { + const query = { + '_id': uid, + 'connections.id': connectionId, + }; + + const update = { + $set: { + 'connections.$._updatedAt': new Date(), + }, + }; + + const result = await UsersSessions.updateOne(query, update); + if (result.modifiedCount === 0) { + return; + } + + await this.updateUserPresence(uid); + + return { uid, connectionId }; + } + async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> { if (!uid || !session) { return; diff --git a/ee/packages/presence/src/lib/PresenceReaper.spec.ts b/ee/packages/presence/src/lib/PresenceReaper.spec.ts new file mode 100644 index 0000000000000..f8e6a83c498b2 --- /dev/null +++ b/ee/packages/presence/src/lib/PresenceReaper.spec.ts @@ -0,0 +1,154 @@ +import { UserStatus, type IUserSession, type IUserSessionConnection } from '@rocket.chat/core-typings'; +import { registerModel } from '@rocket.chat/models'; +import type { FindCursor, WithId } from 'mongodb'; + +import { PresenceReaper } from './PresenceReaper'; + +let sessions = 0; +const createSession = (overrides: Partial = {}): IUserSession => ({ + _id: `user-${sessions++}`, + connections: [], + ...overrides, +}); + +let connections = 0; +const createConnection = (overrides: Partial = {}): IUserSessionConnection => ({ + id: `conn-${connections++}`, + instanceId: `instanceId`, + status: UserStatus.ONLINE, + _createdAt: new Date(), + _updatedAt: new Date(), + ...overrides, +}); + +const createDates = () => { + const now = new Date(); + const stale = new Date(now.getTime() - 10 * 60 * 1000); // 10 mins ago + const active = new Date(now.getTime() - 1 * 60 * 1000); // 1 min ago + const cutoff = new Date(now.getTime() - 5 * 60 * 1000); // 5 mins ago + + return { now, stale, active, cutoff }; +}; + +const createCursor = (documents: WithId[]): FindCursor> => { + let index = 0; + return { + async *[Symbol.asyncIterator]() { + while (index < documents.length) { + yield documents[index++]; + } + }, + } as FindCursor>; +}; + +describe('PresenceReaper', () => { + let reaper: PresenceReaper; + const bulkWriteMock = jest.fn(); + const findMock = jest.fn(); + const onUpdateMock = jest.fn(); + registerModel('IUsersSessionsModel', { + find: findMock, + col: { + bulkWrite: bulkWriteMock, + }, + } as any); + + beforeEach(() => { + bulkWriteMock.mockClear(); + findMock.mockClear(); + onUpdateMock.mockClear(); + + reaper = new PresenceReaper({ + onUpdate: onUpdateMock, + staleThresholdMs: 5 * 60 * 1000, // 5 minutes + batchSize: 2, // small batch size for testing + }); + }); + + it('should not call onUpdate when there no connections', async () => { + findMock.mockReturnValue(createCursor([])); + + await reaper.run(); + + expect(onUpdateMock).not.toHaveBeenCalled(); + }); + + it('should process users with stale connections correctly', async () => { + const { stale } = createDates(); + + findMock.mockReturnValue( + createCursor([ + createSession({ + _id: 'user-789', + connections: [createConnection({ _updatedAt: stale })], + }), + ]), + ); + + await reaper.run(); + + expect(onUpdateMock).toHaveBeenCalledTimes(1); + expect(onUpdateMock).toHaveBeenCalledWith(['user-789']); + }); + + it('should process multiple users and batch updates correctly', async () => { + const { stale } = createDates(); + + findMock.mockReturnValue( + createCursor([ + createSession({ + _id: 'user-1', + connections: [createConnection({ _updatedAt: stale })], + }), + createSession({ + _id: 'user-2', + connections: [createConnection({ _updatedAt: stale })], + }), + createSession({ + _id: 'user-3', + connections: [createConnection({ _updatedAt: stale })], + }), + ]), + ); + + // Execute Run + await reaper.run(); + + // Verify 'users' Update called twice due to batch size of 2 + expect(onUpdateMock).toHaveBeenCalledTimes(2); + expect(onUpdateMock).toHaveBeenNthCalledWith(1, ['user-1', 'user-2']); + expect(onUpdateMock).toHaveBeenNthCalledWith(2, ['user-3']); + }); + + it('should process users with mixed connection states correctly', async () => { + const { stale, active } = createDates(); + + findMock.mockReturnValue( + createCursor([ + createSession({ + _id: 'no-connections', + connections: [], + }), + createSession({ + _id: 'all-active', + connections: [createConnection({ _updatedAt: active })], + }), + createSession({ + _id: 'all-stale', + connections: [createConnection({ _updatedAt: stale })], + }), + createSession({ + _id: 'mixed', + connections: [createConnection({ _updatedAt: stale }), createConnection({ _updatedAt: active })], + }), + ]), + ); + + // Execute Run + await reaper.run(); + + // Verify 'users' Update called for both users + expect(onUpdateMock).toHaveBeenCalledTimes(1); + expect(onUpdateMock).toHaveBeenNthCalledWith(1, ['all-stale', 'mixed']); + }); +}); diff --git a/ee/packages/presence/src/lib/PresenceReaper.ts b/ee/packages/presence/src/lib/PresenceReaper.ts new file mode 100644 index 0000000000000..969465229cad1 --- /dev/null +++ b/ee/packages/presence/src/lib/PresenceReaper.ts @@ -0,0 +1,132 @@ +import { setInterval } from 'node:timers'; + +import type { IUserSession } from '@rocket.chat/core-typings'; +import { UsersSessions } from '@rocket.chat/models'; +import type { AnyBulkWriteOperation } from 'mongodb'; + +type ReaperPlan = { + userId: string; + removeIds: NonEmptyArray; + cutoffDate: Date; +}; + +type NonEmptyArray = Omit<[T, ...T[]], 'map'> & { + map(callbackfn: (value: T, index: number, array: T[]) => U): NonEmptyArray; +}; + +const isNonEmptyArray = (arr: T[]): arr is NonEmptyArray => arr.length > 0; + +type ReaperCallback = (userIds: NonEmptyArray) => Promise; + +type ReaperOptions = { + onUpdate: ReaperCallback; + staleThresholdMs: number; + batchSize: number; +}; + +export class PresenceReaper { + private staleThresholdMs: number; + + private batchSize: number; + + private running: boolean; + + private onUpdate: ReaperCallback; + + private intervalId?: NodeJS.Timeout; + + constructor(options: ReaperOptions) { + this.onUpdate = options.onUpdate; + this.staleThresholdMs = options.staleThresholdMs; + this.batchSize = options.batchSize; + this.running = false; + } + + public start() { + if (this.running) return; + this.running = true; + + // Run every 1 minute + this.intervalId = setInterval(() => { + this.run().catch((err) => console.error('[PresenceReaper] Error:', err)); + }, 60 * 1000); + } + + public stop() { + if (!this.running) return; + this.running = false; + + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = undefined; + } + } + + public async run(): Promise { + const cutoffDate = new Date(Date.now() - this.staleThresholdMs); + + // 1. Find users with potentially stale connections + const cursor = UsersSessions.find( + { 'connections._updatedAt': { $lte: cutoffDate } }, + { + projection: { _id: 1, connections: 1 }, + }, + ); + + const userChangeSet = new Map(); + + for await (const sessionDoc of cursor) { + this.processDocument(sessionDoc, cutoffDate, userChangeSet); + + if (userChangeSet.size >= this.batchSize) { + await this.flushBatch(userChangeSet); + userChangeSet.clear(); + } + } + + if (userChangeSet.size > 0) { + await this.flushBatch(userChangeSet); + } + } + + private processDocument(sessionDoc: IUserSession, cutoffDate: Date, changeMap: Map): void { + const userId = sessionDoc._id; + const allConnections = sessionDoc.connections || []; + + // Filter connections based on the cutoff + const staleConnections = allConnections.filter((c) => c._updatedAt <= cutoffDate); + + if (isNonEmptyArray(staleConnections)) { + changeMap.set(userId, { + userId, + removeIds: staleConnections.map((c) => c.id), + cutoffDate, // Keep reference for race-condition check + }); + } + } + + private async flushBatch(changeMap: Map): Promise { + const operations = []; + + for (const plan of changeMap.values()) { + operations.push({ + updateOne: { + filter: { _id: plan.userId }, + update: { + $pull: { + connections: { + id: { $in: plan.removeIds }, + _updatedAt: { $lte: plan.cutoffDate }, + }, + }, + }, + }, + } satisfies AnyBulkWriteOperation); + } + + if (isNonEmptyArray(operations)) { + await UsersSessions.col.bulkWrite(operations); + await this.onUpdate(operations.map((op) => op.updateOne.filter._id)); + } + } +} diff --git a/packages/core-services/src/types/IPresence.ts b/packages/core-services/src/types/IPresence.ts index 5f7c57d679955..268fed8cd1e66 100644 --- a/packages/core-services/src/types/IPresence.ts +++ b/packages/core-services/src/types/IPresence.ts @@ -13,6 +13,7 @@ export interface IPresence extends IServiceClass { session: string | undefined, nodeId: string, ): Promise<{ uid: string; session: string } | undefined>; + updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined>; removeLostConnections(nodeID: string): Promise; setStatus(uid: string, status: UserStatus, statusText?: string): Promise; setConnectionStatus(uid: string, status: UserStatus, session: string): Promise;