diff --git a/.changeset/bump-patch-1766148726545.md b/.changeset/bump-patch-1766148726545.md new file mode 100644 index 0000000000000..e1eaa7980afb1 --- /dev/null +++ b/.changeset/bump-patch-1766148726545.md @@ -0,0 +1,5 @@ +--- +'@rocket.chat/meteor': patch +--- + +Bump @rocket.chat/meteor version. diff --git a/.changeset/fast-ligers-unite.md b/.changeset/fast-ligers-unite.md new file mode 100644 index 0000000000000..eacb88108a0f7 --- /dev/null +++ b/.changeset/fast-ligers-unite.md @@ -0,0 +1,5 @@ +--- +'@rocket.chat/meteor': patch +--- + +Security Hotfix (https://docs.rocket.chat/docs/security-fixes-and-updates) diff --git a/.changeset/spicy-nails-design.md b/.changeset/spicy-nails-design.md new file mode 100644 index 0000000000000..23a5f82e7bb5e --- /dev/null +++ b/.changeset/spicy-nails-design.md @@ -0,0 +1,8 @@ +--- +"@rocket.chat/meteor": patch +"@rocket.chat/core-services": patch +"@rocket.chat/ddp-streamer": patch +"@rocket.chat/presence": patch +--- + +Ensures presence stays accurate by refreshing connections on heartbeats and removing stale sessions. \ No newline at end of file diff --git a/.github/actions/build-docker/action.yml b/.github/actions/build-docker/action.yml index b00709834c389..5100528ac7ae0 100644 --- a/.github/actions/build-docker/action.yml +++ b/.github/actions/build-docker/action.yml @@ -75,7 +75,7 @@ runs: set -o xtrace export DENO_VERSION="${{ inputs.deno-version }}" - # Removes unnecessary swc cores to reduce image sized + # Removes unnecessary swc cores and sharp binaries to reduce image size swc_arch='x64' if [[ "${{ inputs.service }}" == 'rocketchat' ]]; then if [[ "${{ inputs.arch }}" == 'arm64' ]]; then @@ -83,6 +83,14 @@ runs: fi find /tmp/build/bundle/programs/server/npm/node_modules/meteor/babel-compiler/node_modules/@meteorjs/swc-core/.swc/node_modules/@swc -type d -name 'core-*' -not -name "*linux-${swc_arch}-gnu*" -exec rm -rf {} + + + find /tmp/build/bundle/programs/server/npm/node_modules/@img -type d -name 'sharp-*' -not -name "*-linuxmusl-${swc_arch}" -exec rm -rf {} + + + find /tmp/build/bundle/programs/server/npm/node_modules/@napi-rs -type d -name 'pinyin-linux-*' -not -name "*-linux-${swc_arch}-*" -exec rm -rf {} + + + find /tmp/build/bundle/programs/server/npm/node_modules/@esbuild -type d -name 'linux-*' -not -name "*-${swc_arch}" -exec rm -rf {} + + + find /tmp/build/bundle/programs/server/npm/node_modules/@rocket.chat/apps-engine/node_modules/@esbuild -type d -name 'linux-*' -not -name "*-${swc_arch}" -exec rm -rf {} + fi if [[ "${{ inputs.publish-image }}" == 'true' ]]; then diff --git a/.github/actions/setup-node/action.yml b/.github/actions/setup-node/action.yml index 6865b342b2dc9..faa3f6d7ed17d 100644 --- a/.github/actions/setup-node/action.yml +++ b/.github/actions/setup-node/action.yml @@ -51,8 +51,8 @@ runs: apps/meteor/ee/server/services/node_modules packages/apps-engine/node_modules packages/apps-engine/.deno-cache - key: node-modules-${{ inputs.type }}-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('package.json') }}-${{ hashFiles('yarn.lock') }}-deno-v${{ inputs.deno-version }}-${{ hashFiles('packages/apps-engine/deno-runtime/deno.lock') }}-v3 - # key: node-modules-${{ inputs.type }}-${{ runner.os }}-${{ runner.arch }}-${{ hashFiles('package.json') }}-${{ hashFiles('yarn.lock') }}-deno-v${{ inputs.deno-version }}-${{ hashFiles('packages/apps-engine/deno-runtime/deno.lock') }}-v${{ github.run_id }} + key: node-modules-${{ inputs.type }}-${{ runner.arch }}-${{ runner.os }}-${{ hashFiles('package.json') }}-${{ hashFiles('yarn.lock') }}-${{ hashFiles('.yarnrc.yml') }}-deno-v${{ inputs.deno-version }}-${{ hashFiles('packages/apps-engine/deno-runtime/deno.lock') }}-v5 + # key: node-modules-${{ inputs.type }}-${{ runner.arch }}-${{ runner.os }}-${{ hashFiles('package.json') }}-${{ hashFiles('yarn.lock') }}-${{ hashFiles('.yarnrc.yml') }}-deno-v${{ inputs.deno-version }}-${{ hashFiles('packages/apps-engine/deno-runtime/deno.lock') }}-v${{ github.run_id }} # # Could use this command to list all paths to save: # find . -name 'node_modules' -prune | grep -v "/\.meteor/" | grep -v "/meteor/packages/" @@ -75,6 +75,12 @@ runs: run: | echo "//registry.npmjs.org/:_authToken=${{ inputs.NPM_TOKEN }}" > ~/.npmrc + - name: yarn config + if: inputs.install + shell: bash + run: | + yarn config set supportedArchitectures --json '{"os": ["linux"], "cpu": ["arm64", "x64"], "libc": ["glibc", "musl"]}' + - name: yarn install if: inputs.install && inputs.type == 'development' shell: bash diff --git a/apps/meteor/.docker/Dockerfile.alpine b/apps/meteor/.docker/Dockerfile.alpine index dbdeb5abc6391..3761e501dcb00 100644 --- a/apps/meteor/.docker/Dockerfile.alpine +++ b/apps/meteor/.docker/Dockerfile.alpine @@ -9,14 +9,7 @@ COPY . /app ENV NODE_ENV=production RUN cd /app/bundle/programs/server \ - && npm install --omit=dev \ - # Re install sharp dependencies to ensure proper binary for architecture - # We only need the @img folder from sharp dependencies - && cd /app/bundle/programs/server/npm/node_modules/sharp \ - && npm install --omit=dev \ - && rm -rf ../@img \ - && mv node_modules/@img ../@img \ - && rm -rf node_modules + && npm install --omit=dev FROM node:22.16.0-alpine3.20 diff --git a/apps/meteor/app/api/server/lib/isValidQuery.ts b/apps/meteor/app/api/server/lib/isValidQuery.ts index b4dd70c2ae0d6..97620cdb1b9ef 100644 --- a/apps/meteor/app/api/server/lib/isValidQuery.ts +++ b/apps/meteor/app/api/server/lib/isValidQuery.ts @@ -23,16 +23,22 @@ export const isValidQuery: { const verifyQuery = (query: Query, allowedAttributes: string[], allowedOperations: string[], parent = ''): boolean => { return Object.entries(removeDangerousProps(query)).every(([key, value]) => { const path = parent ? `${parent}.${key}` : key; - if (parent === '' && path.startsWith('$')) { - if (!allowedOperations.includes(path)) { - isValidQuery.errors.push(`Invalid operation: ${path}`); + if (key.startsWith('$')) { + if (!allowedOperations.includes(key)) { + isValidQuery.errors.push(`Invalid operation: ${key}`); return false; } - if (!Array.isArray(value)) { - isValidQuery.errors.push(`Invalid parameter for operation: ${path} : ${value}`); - return false; + + if (Array.isArray(value)) { + return value.every((v) => verifyQuery(v, allowedAttributes, allowedOperations)); } - return value.every((v) => verifyQuery(v, allowedAttributes, allowedOperations)); + + if (value instanceof Object) { + return verifyQuery(value, allowedAttributes, allowedOperations, path); + } + + // handles primitive values (strings, numbers, booleans, etc.) + return true; } if ( diff --git a/apps/meteor/app/api/server/v1/users.ts b/apps/meteor/app/api/server/v1/users.ts index d4f0012cd5784..f6058085deeec 100644 --- a/apps/meteor/app/api/server/v1/users.ts +++ b/apps/meteor/app/api/server/v1/users.ts @@ -50,7 +50,7 @@ import { } from '../../../lib/server/functions/checkUsernameAvailability'; import { deleteUser } from '../../../lib/server/functions/deleteUser'; import { getAvatarSuggestionForUser } from '../../../lib/server/functions/getAvatarSuggestionForUser'; -import { getFullUserDataByIdOrUsernameOrImportId } from '../../../lib/server/functions/getFullUserData'; +import { getFullUserDataByIdOrUsernameOrImportId, defaultFields, fullFields } from '../../../lib/server/functions/getFullUserData'; import { generateUsernameSuggestion } from '../../../lib/server/functions/getUsernameSuggestion'; import { saveCustomFields } from '../../../lib/server/functions/saveCustomFields'; import { saveCustomFieldsWithoutValidation } from '../../../lib/server/functions/saveCustomFieldsWithoutValidation'; @@ -503,13 +503,18 @@ API.v1.addRoute( const { offset, count } = await getPaginationItems(this.queryParams); const { sort, fields, query } = await this.parseJsonQuery(); - const nonEmptyQuery = getNonEmptyQuery(query, await hasPermissionAsync(this.userId, 'view-full-other-user-info')); const nonEmptyFields = getNonEmptyFields(fields); const inclusiveFields = getInclusiveFields(nonEmptyFields); const inclusiveFieldsKeys = Object.keys(inclusiveFields); + const hasUserQuery = query && Object.keys(query).length > 0; + + const nonEmptyQuery = getNonEmptyQuery(query, await hasPermissionAsync(this.userId, 'view-full-other-user-info')); + + // if user provided a query, validate it with their allowed operators + // otherwise we use the default query (with $regex and $options) if ( !isValidQuery( nonEmptyQuery, @@ -521,7 +526,7 @@ API.v1.addRoute( inclusiveFieldsKeys.includes('type') && 'type.*', inclusiveFieldsKeys.includes('customFields') && 'customFields.*', ].filter(Boolean) as string[], - this.queryOperations, + hasUserQuery ? this.queryOperations : [...this.queryOperations, '$regex', '$options'], ) ) { throw new Meteor.Error('error-invalid-query', isValidQuery.errors.join('\n')); @@ -1144,8 +1149,13 @@ API.v1.addRoute( const selector: { exceptions: Required['username'][]; conditions: Filter; term: string } = JSON.parse(selectorRaw); try { - if (selector?.conditions && !isValidQuery(selector.conditions, ['*'], ['$or', '$and'])) { - throw new Error('error-invalid-query'); + if (selector?.conditions) { + const canViewFullInfo = await hasPermissionAsync(this.userId, 'view-full-other-user-info'); + const allowedFields = canViewFullInfo ? [...Object.keys(defaultFields), ...Object.keys(fullFields)] : Object.keys(defaultFields); + + if (!isValidQuery(selector.conditions, allowedFields, ['$and', '$ne', '$exists'])) { + throw new Error('error-invalid-query'); + } } } catch (e) { return API.v1.failure(e); diff --git a/apps/meteor/app/lib/server/functions/getFullUserData.ts b/apps/meteor/app/lib/server/functions/getFullUserData.ts index f66f8ecb49c66..c55a5464a2a8b 100644 --- a/apps/meteor/app/lib/server/functions/getFullUserData.ts +++ b/apps/meteor/app/lib/server/functions/getFullUserData.ts @@ -7,7 +7,7 @@ import { settings } from '../../../settings/server'; const logger = new Logger('getFullUserData'); -const defaultFields = { +export const defaultFields = { name: 1, username: 1, nickname: 1, @@ -24,7 +24,7 @@ const defaultFields = { statusLivechat: 1, } as const; -const fullFields = { +export const fullFields = { emails: 1, phone: 1, statusConnection: 1, diff --git a/apps/meteor/definition/externals/meteor/ddp-common.d.ts b/apps/meteor/definition/externals/meteor/ddp-common.d.ts index 3a7f1538ab407..cbe96a9722968 100644 --- a/apps/meteor/definition/externals/meteor/ddp-common.d.ts +++ b/apps/meteor/definition/externals/meteor/ddp-common.d.ts @@ -14,5 +14,69 @@ declare module 'meteor/ddp-common' { userId?: string; }); } + + /** + * Heartbeat options + */ + type HeartbeatOptions = { + /** + * interval to send pings, in milliseconds + */ + heartbeatInterval: number; + /** + * timeout to close the connection if a reply isn't received, in milliseconds. + */ + heartbeatTimeout: number; + /** + * function to call to send a ping on the connection. + */ + sendPing: () => void; + /** + * function to call to close the connection + */ + onTimeout: () => void; + }; + + class Heartbeat { + heartbeatInterval: number; + + heartbeatTimeout: number; + + _sendPing: () => void; + + _onTimeout: () => void; + + _seenPacket: boolean; + + _heartbeatIntervalHandle: ReturnType | null; + + _heartbeatTimeoutHandle: ReturnType | null; + + constructor(options: HeartbeatOptions); + + stop(): void; + + start(): void; + + _startHeartbeatIntervalTimer(): void; + + _startHeartbeatTimeoutTimer(): void; + + _clearHeartbeatIntervalTimer(): void; + + _clearHeartbeatTimeoutTimer(): void; + + /** + * The heartbeat interval timer is fired when we should send a ping. + */ + _heartbeatIntervalFired(): void; + + /** + * The heartbeat timeout timer is fired when we sent a ping, but we timed out waiting for the pong. + */ + _heartbeatTimeoutFired(): void; + + messageReceived(): void; + } } } diff --git a/apps/meteor/definition/externals/meteor/meteor.d.ts b/apps/meteor/definition/externals/meteor/meteor.d.ts index cab110392c12c..e93445a891a36 100644 --- a/apps/meteor/definition/externals/meteor/meteor.d.ts +++ b/apps/meteor/definition/externals/meteor/meteor.d.ts @@ -1,6 +1,6 @@ import 'meteor/meteor'; import type { ServerMethods } from '@rocket.chat/ddp-client'; -import type { IStreamerConstructor, IStreamer } from 'meteor/rocketchat:streamer'; +import type { DDPCommon, IStreamerConstructor, IStreamer } from 'meteor/ddp-common'; type StringifyBuffers = { [P in keyof T]: T[P] extends Buffer ? string : T[P]; @@ -39,7 +39,12 @@ declare module 'meteor/meteor' { isDesktop: () => boolean; } - const server: any; + const server: { + sessions: Map; + publish_handlers: { + meteor_autoupdate_clientVersions(): void; + }; + }; const runAsUser: (userId: string, scope: () => T) => T; diff --git a/apps/meteor/ee/server/startup/presence.ts b/apps/meteor/ee/server/startup/presence.ts index e0756e4b4c59d..6bf587b500ba6 100644 --- a/apps/meteor/ee/server/startup/presence.ts +++ b/apps/meteor/ee/server/startup/presence.ts @@ -40,6 +40,16 @@ Meteor.startup(() => { return; } + const _messageReceived = session.heartbeat.messageReceived.bind(session.heartbeat); + session.heartbeat.messageReceived = function messageReceived() { + if (this._seenPacket === false) { + void Presence.updateConnection(login.user._id, login.connection.id).catch((err) => { + console.error('Error updating connection presence on heartbeat:', err); + }); + } + return _messageReceived(); + }; + void (async function () { await Presence.newConnection(login.user._id, login.connection.id, nodeId); updateConns(); diff --git a/apps/meteor/tests/unit/app/api/server/v1/lib/isValidQuery.spec.ts b/apps/meteor/tests/unit/app/api/server/v1/lib/isValidQuery.spec.ts index e67ad244066f2..5f421578ed480 100644 --- a/apps/meteor/tests/unit/app/api/server/v1/lib/isValidQuery.spec.ts +++ b/apps/meteor/tests/unit/app/api/server/v1/lib/isValidQuery.spec.ts @@ -174,7 +174,7 @@ describe('isValidQuery', () => { }, }, }; - expect(isValidQuery(query, props, ['$or'])).to.be.true; + expect(isValidQuery(query, props, ['$or', '$regex'])).to.be.true; expect(isValidQuery.errors.length).to.be.equals(0); }); @@ -212,5 +212,15 @@ describe('isValidQuery', () => { ), ).to.be.false; }); + + it('should return false if the query contains nested conditions with disallowed operators', () => { + const props = ['username']; + const allowedOps = ['$and', '$ne']; + const query = { + $and: [{ username: { $exists: true } }, { username: { $ne: '1000' } }], + }; + expect(isValidQuery(query, props, allowedOps)).to.be.false; + expect(isValidQuery.errors.length).to.be.equals(1); + }); }); }); diff --git a/ee/apps/ddp-streamer/src/Client.ts b/ee/apps/ddp-streamer/src/Client.ts index fb75af64b3da0..4807157212efd 100644 --- a/ee/apps/ddp-streamer/src/Client.ts +++ b/ee/apps/ddp-streamer/src/Client.ts @@ -1,7 +1,9 @@ import { EventEmitter } from 'events'; import type { IncomingMessage } from 'http'; +import { Presence } from '@rocket.chat/core-services'; import type { ISocketConnection } from '@rocket.chat/core-typings'; +import { throttle } from 'underscore'; import { v1 as uuidv1 } from 'uuid'; import type WebSocket from 'ws'; @@ -73,6 +75,18 @@ export class Client extends EventEmitter { public userToken?: string; + private updatePresence = throttle( + () => { + if (this.userId) { + void Presence.updateConnection(this.userId, this.connection.id).catch((err) => { + console.error('Error updating connection presence:', err); + }); + } + }, + TIMEOUT, + { leading: true, trailing: false }, + ); + constructor( public ws: WebSocket, public meteorClient = false, @@ -200,6 +214,7 @@ export class Client extends EventEmitter { handler = async (payload: WebSocket.Data, isBinary: boolean): Promise => { try { const packet = server.parse(payload, isBinary); + this.updatePresence(); this.emit('message', packet); if (this.wait) { return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet)))); diff --git a/ee/packages/presence/src/Presence.ts b/ee/packages/presence/src/Presence.ts index 5b537857698c9..0fe78b9e47dcd 100755 --- a/ee/packages/presence/src/Presence.ts +++ b/ee/packages/presence/src/Presence.ts @@ -4,6 +4,7 @@ import type { IUser } from '@rocket.chat/core-typings'; import { UserStatus } from '@rocket.chat/core-typings'; import { Settings, Users, UsersSessions } from '@rocket.chat/models'; +import { PresenceReaper } from './lib/PresenceReaper'; import { processPresenceAndStatus } from './lib/processConnectionStatus'; const MAX_CONNECTIONS = 200; @@ -25,9 +26,17 @@ export class Presence extends ServiceClass implements IPresence { private peakConnections = 0; + private reaper: PresenceReaper; + constructor() { super(); + this.reaper = new PresenceReaper({ + batchSize: 500, + staleThresholdMs: 5 * 60 * 1000, // 5 minutes + onUpdate: (userIds) => this.handleReaperUpdates(userIds), + }); + this.onEvent('watch.instanceStatus', async ({ clientAction, id, diff }): Promise => { if (clientAction === 'removed') { this.connsPerInstance.delete(id); @@ -72,7 +81,8 @@ export class Presence extends ServiceClass implements IPresence { return affectedUsers.forEach((uid) => this.updateUserPresence(uid)); } - async started(): Promise { + override async started(): Promise { + this.reaper.start(); this.lostConTimeout = setTimeout(async () => { const affectedUsers = await this.removeLostConnections(); return affectedUsers.forEach((uid) => this.updateUserPresence(uid)); @@ -89,7 +99,25 @@ export class Presence extends ServiceClass implements IPresence { } } - async stopped(): Promise { + private async handleReaperUpdates(userIds: string[]): Promise { + const results = await Promise.allSettled(userIds.map((uid) => this.updateUserPresence(uid))); + const fulfilled = results.filter((result) => result.status === 'fulfilled'); + const rejected = results.filter((result) => result.status === 'rejected'); + + if (fulfilled.length > 0) { + console.debug(`[PresenceReaper] Successfully updated presence for ${fulfilled.length} users.`); + } + + if (rejected.length > 0) { + console.error( + `[PresenceReaper] Failed to update presence for ${rejected.length} users:`, + rejected.map(({ reason }) => reason), + ); + } + } + + override async stopped(): Promise { + this.reaper.stop(); if (!this.lostConTimeout) { return; } @@ -137,6 +165,28 @@ export class Presence extends ServiceClass implements IPresence { }; } + async updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined> { + const query = { + '_id': uid, + 'connections.id': connectionId, + }; + + const update = { + $set: { + 'connections.$._updatedAt': new Date(), + }, + }; + + const result = await UsersSessions.updateOne(query, update); + if (result.modifiedCount === 0) { + return; + } + + await this.updateUserPresence(uid); + + return { uid, connectionId }; + } + async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> { if (!uid || !session) { return; diff --git a/ee/packages/presence/src/lib/PresenceReaper.spec.ts b/ee/packages/presence/src/lib/PresenceReaper.spec.ts new file mode 100644 index 0000000000000..f8e6a83c498b2 --- /dev/null +++ b/ee/packages/presence/src/lib/PresenceReaper.spec.ts @@ -0,0 +1,154 @@ +import { UserStatus, type IUserSession, type IUserSessionConnection } from '@rocket.chat/core-typings'; +import { registerModel } from '@rocket.chat/models'; +import type { FindCursor, WithId } from 'mongodb'; + +import { PresenceReaper } from './PresenceReaper'; + +let sessions = 0; +const createSession = (overrides: Partial = {}): IUserSession => ({ + _id: `user-${sessions++}`, + connections: [], + ...overrides, +}); + +let connections = 0; +const createConnection = (overrides: Partial = {}): IUserSessionConnection => ({ + id: `conn-${connections++}`, + instanceId: `instanceId`, + status: UserStatus.ONLINE, + _createdAt: new Date(), + _updatedAt: new Date(), + ...overrides, +}); + +const createDates = () => { + const now = new Date(); + const stale = new Date(now.getTime() - 10 * 60 * 1000); // 10 mins ago + const active = new Date(now.getTime() - 1 * 60 * 1000); // 1 min ago + const cutoff = new Date(now.getTime() - 5 * 60 * 1000); // 5 mins ago + + return { now, stale, active, cutoff }; +}; + +const createCursor = (documents: WithId[]): FindCursor> => { + let index = 0; + return { + async *[Symbol.asyncIterator]() { + while (index < documents.length) { + yield documents[index++]; + } + }, + } as FindCursor>; +}; + +describe('PresenceReaper', () => { + let reaper: PresenceReaper; + const bulkWriteMock = jest.fn(); + const findMock = jest.fn(); + const onUpdateMock = jest.fn(); + registerModel('IUsersSessionsModel', { + find: findMock, + col: { + bulkWrite: bulkWriteMock, + }, + } as any); + + beforeEach(() => { + bulkWriteMock.mockClear(); + findMock.mockClear(); + onUpdateMock.mockClear(); + + reaper = new PresenceReaper({ + onUpdate: onUpdateMock, + staleThresholdMs: 5 * 60 * 1000, // 5 minutes + batchSize: 2, // small batch size for testing + }); + }); + + it('should not call onUpdate when there no connections', async () => { + findMock.mockReturnValue(createCursor([])); + + await reaper.run(); + + expect(onUpdateMock).not.toHaveBeenCalled(); + }); + + it('should process users with stale connections correctly', async () => { + const { stale } = createDates(); + + findMock.mockReturnValue( + createCursor([ + createSession({ + _id: 'user-789', + connections: [createConnection({ _updatedAt: stale })], + }), + ]), + ); + + await reaper.run(); + + expect(onUpdateMock).toHaveBeenCalledTimes(1); + expect(onUpdateMock).toHaveBeenCalledWith(['user-789']); + }); + + it('should process multiple users and batch updates correctly', async () => { + const { stale } = createDates(); + + findMock.mockReturnValue( + createCursor([ + createSession({ + _id: 'user-1', + connections: [createConnection({ _updatedAt: stale })], + }), + createSession({ + _id: 'user-2', + connections: [createConnection({ _updatedAt: stale })], + }), + createSession({ + _id: 'user-3', + connections: [createConnection({ _updatedAt: stale })], + }), + ]), + ); + + // Execute Run + await reaper.run(); + + // Verify 'users' Update called twice due to batch size of 2 + expect(onUpdateMock).toHaveBeenCalledTimes(2); + expect(onUpdateMock).toHaveBeenNthCalledWith(1, ['user-1', 'user-2']); + expect(onUpdateMock).toHaveBeenNthCalledWith(2, ['user-3']); + }); + + it('should process users with mixed connection states correctly', async () => { + const { stale, active } = createDates(); + + findMock.mockReturnValue( + createCursor([ + createSession({ + _id: 'no-connections', + connections: [], + }), + createSession({ + _id: 'all-active', + connections: [createConnection({ _updatedAt: active })], + }), + createSession({ + _id: 'all-stale', + connections: [createConnection({ _updatedAt: stale })], + }), + createSession({ + _id: 'mixed', + connections: [createConnection({ _updatedAt: stale }), createConnection({ _updatedAt: active })], + }), + ]), + ); + + // Execute Run + await reaper.run(); + + // Verify 'users' Update called for both users + expect(onUpdateMock).toHaveBeenCalledTimes(1); + expect(onUpdateMock).toHaveBeenNthCalledWith(1, ['all-stale', 'mixed']); + }); +}); diff --git a/ee/packages/presence/src/lib/PresenceReaper.ts b/ee/packages/presence/src/lib/PresenceReaper.ts new file mode 100644 index 0000000000000..969465229cad1 --- /dev/null +++ b/ee/packages/presence/src/lib/PresenceReaper.ts @@ -0,0 +1,132 @@ +import { setInterval } from 'node:timers'; + +import type { IUserSession } from '@rocket.chat/core-typings'; +import { UsersSessions } from '@rocket.chat/models'; +import type { AnyBulkWriteOperation } from 'mongodb'; + +type ReaperPlan = { + userId: string; + removeIds: NonEmptyArray; + cutoffDate: Date; +}; + +type NonEmptyArray = Omit<[T, ...T[]], 'map'> & { + map(callbackfn: (value: T, index: number, array: T[]) => U): NonEmptyArray; +}; + +const isNonEmptyArray = (arr: T[]): arr is NonEmptyArray => arr.length > 0; + +type ReaperCallback = (userIds: NonEmptyArray) => Promise; + +type ReaperOptions = { + onUpdate: ReaperCallback; + staleThresholdMs: number; + batchSize: number; +}; + +export class PresenceReaper { + private staleThresholdMs: number; + + private batchSize: number; + + private running: boolean; + + private onUpdate: ReaperCallback; + + private intervalId?: NodeJS.Timeout; + + constructor(options: ReaperOptions) { + this.onUpdate = options.onUpdate; + this.staleThresholdMs = options.staleThresholdMs; + this.batchSize = options.batchSize; + this.running = false; + } + + public start() { + if (this.running) return; + this.running = true; + + // Run every 1 minute + this.intervalId = setInterval(() => { + this.run().catch((err) => console.error('[PresenceReaper] Error:', err)); + }, 60 * 1000); + } + + public stop() { + if (!this.running) return; + this.running = false; + + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = undefined; + } + } + + public async run(): Promise { + const cutoffDate = new Date(Date.now() - this.staleThresholdMs); + + // 1. Find users with potentially stale connections + const cursor = UsersSessions.find( + { 'connections._updatedAt': { $lte: cutoffDate } }, + { + projection: { _id: 1, connections: 1 }, + }, + ); + + const userChangeSet = new Map(); + + for await (const sessionDoc of cursor) { + this.processDocument(sessionDoc, cutoffDate, userChangeSet); + + if (userChangeSet.size >= this.batchSize) { + await this.flushBatch(userChangeSet); + userChangeSet.clear(); + } + } + + if (userChangeSet.size > 0) { + await this.flushBatch(userChangeSet); + } + } + + private processDocument(sessionDoc: IUserSession, cutoffDate: Date, changeMap: Map): void { + const userId = sessionDoc._id; + const allConnections = sessionDoc.connections || []; + + // Filter connections based on the cutoff + const staleConnections = allConnections.filter((c) => c._updatedAt <= cutoffDate); + + if (isNonEmptyArray(staleConnections)) { + changeMap.set(userId, { + userId, + removeIds: staleConnections.map((c) => c.id), + cutoffDate, // Keep reference for race-condition check + }); + } + } + + private async flushBatch(changeMap: Map): Promise { + const operations = []; + + for (const plan of changeMap.values()) { + operations.push({ + updateOne: { + filter: { _id: plan.userId }, + update: { + $pull: { + connections: { + id: { $in: plan.removeIds }, + _updatedAt: { $lte: plan.cutoffDate }, + }, + }, + }, + }, + } satisfies AnyBulkWriteOperation); + } + + if (isNonEmptyArray(operations)) { + await UsersSessions.col.bulkWrite(operations); + await this.onUpdate(operations.map((op) => op.updateOne.filter._id)); + } + } +} diff --git a/packages/core-services/src/types/IPresence.ts b/packages/core-services/src/types/IPresence.ts index 5f7c57d679955..268fed8cd1e66 100644 --- a/packages/core-services/src/types/IPresence.ts +++ b/packages/core-services/src/types/IPresence.ts @@ -13,6 +13,7 @@ export interface IPresence extends IServiceClass { session: string | undefined, nodeId: string, ): Promise<{ uid: string; session: string } | undefined>; + updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined>; removeLostConnections(nodeID: string): Promise; setStatus(uid: string, status: UserStatus, statusText?: string): Promise; setConnectionStatus(uid: string, status: UserStatus, session: string): Promise; diff --git a/yarn.lock b/yarn.lock index c58e0363815ea..60688953181c6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10748,7 +10748,7 @@ __metadata: peerDependencies: "@rocket.chat/layout": "*" "@rocket.chat/tools": 0.2.3 - "@rocket.chat/ui-contexts": 25.0.0 + "@rocket.chat/ui-contexts": 25.0.1 "@tanstack/react-query": "*" react: "*" react-hook-form: "*"