Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
222943a
fix: d
cardoso Nov 18, 2025
b2818bd
fix: renew with ddp over rest
cardoso Nov 18, 2025
2c79e92
chore: improve types
cardoso Nov 18, 2025
cb85c50
refactor: take userId in renewConnection
cardoso Nov 19, 2025
818db84
fix: remove expired connections on renew
cardoso Nov 19, 2025
bcda88d
refactor: simplify logic & separate tests
cardoso Nov 21, 2025
926642b
chore: add changeset
cardoso Nov 24, 2025
d75b0cf
Merge branch 'develop' into fix/presence-dangling
cardoso Nov 27, 2025
b03787e
Merge branch 'develop' into fix/presence-dangling
cardoso Nov 28, 2025
a7f2be2
Merge branch 'develop' into fix/presence-dangling
cardoso Nov 28, 2025
6bde2cf
refactor: remove UserPresence:ping
cardoso Nov 28, 2025
d04d556
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 1, 2025
d842b11
fix: meteor types
cardoso Dec 1, 2025
992a691
chore: updateConnectionStatus throttle
cardoso Dec 1, 2025
e542ec5
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 1, 2025
7f7a552
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 1, 2025
1e7a950
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 2, 2025
62d2466
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 2, 2025
e1a7a4f
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 2, 2025
d56714a
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 2, 2025
7d3d039
chore: normalize monolith & microservice behavior
cardoso Dec 2, 2025
2e3592d
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 3, 2025
628b508
fix: clean up throttle tracking for connections
cardoso Dec 3, 2025
1688b9c
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 3, 2025
049ef81
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 3, 2025
bcf2d04
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 3, 2025
a65f79b
fix: periodically remove stale connections
cardoso Dec 3, 2025
682230a
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 4, 2025
efff1b3
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 4, 2025
60f6657
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 4, 2025
3f106f3
refactor: move throttling logic to presence service
cardoso Dec 4, 2025
8db5df2
refactor: separate cleaning logic into its own class
cardoso Dec 4, 2025
5112f8a
fix: only update connection if no packet was seen
cardoso Dec 4, 2025
e7dbb88
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 4, 2025
0ab1249
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 5, 2025
ff983de
chore: remove comments & unused properties
cardoso Dec 5, 2025
1675085
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 5, 2025
96b25ce
improve: presence reaper robustness and connection updates
cardoso Dec 5, 2025
3b0c9d0
fix(ddp-streamer): only update connection on heartbeat
cardoso Dec 5, 2025
964f024
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 5, 2025
2926cfe
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 6, 2025
5633225
chore: remove test code
cardoso Dec 6, 2025
d3b27fb
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 8, 2025
181ac93
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 8, 2025
56c55a1
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 9, 2025
a9565a1
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 9, 2025
0fc67cd
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 9, 2025
d1405af
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 9, 2025
5b2b9da
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 9, 2025
0882ac2
fix(presence): reaper error handling and logic
cardoso Dec 9, 2025
94aef54
chore: update changeset
cardoso Dec 9, 2025
44ccab9
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 9, 2025
fbf7e4e
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 10, 2025
7e327d7
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 10, 2025
485ad1d
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 10, 2025
dcb7fee
chore(presence): remove shouldMarkOffline
cardoso Dec 10, 2025
bb920ea
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 11, 2025
de91ab0
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 11, 2025
b6e3ea3
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 11, 2025
93eedcc
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 11, 2025
c230094
Merge branch 'develop' into fix/presence-dangling
kodiakhq[bot] Dec 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/spicy-nails-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@rocket.chat/meteor": patch
"@rocket.chat/core-services": patch
"@rocket.chat/ddp-streamer": patch
"@rocket.chat/presence": patch
---

Ensures presence stays accurate by refreshing connections on heartbeats and removing stale sessions.
64 changes: 64 additions & 0 deletions apps/meteor/definition/externals/meteor/ddp-common.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,69 @@ declare module 'meteor/ddp-common' {
userId?: string;
});
}

/**
* Heartbeat options
*/
type HeartbeatOptions = {
/**
* interval to send pings, in milliseconds
*/
heartbeatInterval: number;
/**
* timeout to close the connection if a reply isn't received, in milliseconds.
*/
heartbeatTimeout: number;
/**
* function to call to send a ping on the connection.
*/
sendPing: () => void;
/**
* function to call to close the connection
*/
onTimeout: () => void;
};

class Heartbeat {
heartbeatInterval: number;

heartbeatTimeout: number;

_sendPing: () => void;

_onTimeout: () => void;

_seenPacket: boolean;

_heartbeatIntervalHandle: ReturnType<typeof setTimeout> | null;

_heartbeatTimeoutHandle: ReturnType<typeof setTimeout> | null;

constructor(options: HeartbeatOptions);

stop(): void;

start(): void;

_startHeartbeatIntervalTimer(): void;

_startHeartbeatTimeoutTimer(): void;

_clearHeartbeatIntervalTimer(): void;

_clearHeartbeatTimeoutTimer(): void;

/**
* The heartbeat interval timer is fired when we should send a ping.
*/
_heartbeatIntervalFired(): void;

/**
* The heartbeat timeout timer is fired when we sent a ping, but we timed out waiting for the pong.
*/
_heartbeatTimeoutFired(): void;

messageReceived(): void;
}
}
}
9 changes: 7 additions & 2 deletions apps/meteor/definition/externals/meteor/meteor.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'meteor/meteor';
import type { ServerMethods } from '@rocket.chat/ddp-client';
import type { IStreamerConstructor, IStreamer } from 'meteor/rocketchat:streamer';
import type { DDPCommon, IStreamerConstructor, IStreamer } from 'meteor/ddp-common';

type StringifyBuffers<T extends unknown[]> = {
[P in keyof T]: T[P] extends Buffer ? string : T[P];
Expand Down Expand Up @@ -39,7 +39,12 @@ declare module 'meteor/meteor' {
isDesktop: () => boolean;
}

const server: any;
const server: {
sessions: Map<string, { userId: string; heartbeat: DDPCommon.Heartbeat }>;
publish_handlers: {
meteor_autoupdate_clientVersions(): void;
};
};

const runAsUser: <T>(userId: string, scope: () => T) => T;

Expand Down
10 changes: 10 additions & 0 deletions apps/meteor/ee/server/startup/presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ Meteor.startup(() => {
return;
}

const _messageReceived = session.heartbeat.messageReceived.bind(session.heartbeat);
session.heartbeat.messageReceived = function messageReceived() {
if (this._seenPacket === false) {
void Presence.updateConnection(login.user._id, login.connection.id).catch((err) => {
console.error('Error updating connection presence on heartbeat:', err);
});
}
return _messageReceived();
};

void (async function () {
await Presence.newConnection(login.user._id, login.connection.id, nodeId);
updateConns();
Expand Down
19 changes: 19 additions & 0 deletions ee/apps/ddp-streamer/src/Client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventEmitter } from 'events';
import type { IncomingMessage } from 'http';

import { Presence } from '@rocket.chat/core-services';
import type { ISocketConnection } from '@rocket.chat/core-typings';
import { v1 as uuidv1 } from 'uuid';
import type WebSocket from 'ws';
Expand Down Expand Up @@ -73,6 +74,8 @@ export class Client extends EventEmitter {

public userToken?: string;

private _seenPacket = true;

constructor(
public ws: WebSocket,
public meteorClient = false,
Expand Down Expand Up @@ -179,6 +182,18 @@ export class Client extends EventEmitter {
this.ws.close(WS_ERRORS.TIMEOUT, WS_ERRORS_MESSAGES.TIMEOUT);
};

private messageReceived = (): void => {
if (this._seenPacket || !this.userId) {
this._seenPacket = true;
return;
}

this._seenPacket = true;
void Presence.updateConnection(this.userId, this.connection.id).catch((err) => {
console.error('Error updating connection presence after heartbeat:', err);
});
};

ping(id?: string): void {
this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PING, ...(id && { [DDP_EVENTS.ID]: id }) }));
}
Expand All @@ -188,6 +203,9 @@ export class Client extends EventEmitter {
}

handleIdle = (): void => {
if (this.userId) {
this._seenPacket = false;
}
this.ping();
this.timeout = setTimeout(this.closeTimeout, TIMEOUT);
};
Expand All @@ -200,6 +218,7 @@ export class Client extends EventEmitter {
handler = async (payload: WebSocket.Data, isBinary: boolean): Promise<void> => {
try {
const packet = server.parse(payload, isBinary);
this.messageReceived();
this.emit('message', packet);
if (this.wait) {
return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet))));
Expand Down
50 changes: 50 additions & 0 deletions ee/packages/presence/src/Presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { IUser } from '@rocket.chat/core-typings';
import { UserStatus } from '@rocket.chat/core-typings';
import { Settings, Users, UsersSessions } from '@rocket.chat/models';

import { PresenceReaper } from './lib/PresenceReaper';
import { processPresenceAndStatus } from './lib/processConnectionStatus';

const MAX_CONNECTIONS = 200;
Expand All @@ -25,9 +26,17 @@ export class Presence extends ServiceClass implements IPresence {

private peakConnections = 0;

private reaper: PresenceReaper;

constructor() {
super();

this.reaper = new PresenceReaper({
batchSize: 500,
staleThresholdMs: 5 * 60 * 1000, // 5 minutes
onUpdate: (userIds) => this.handleReaperUpdates(userIds),
});

this.onEvent('watch.instanceStatus', async ({ clientAction, id, diff }): Promise<void> => {
if (clientAction === 'removed') {
this.connsPerInstance.delete(id);
Expand Down Expand Up @@ -73,6 +82,7 @@ export class Presence extends ServiceClass implements IPresence {
}

override async started(): Promise<void> {
this.reaper.start();
this.lostConTimeout = setTimeout(async () => {
const affectedUsers = await this.removeLostConnections();
return affectedUsers.forEach((uid) => this.updateUserPresence(uid));
Expand All @@ -89,7 +99,25 @@ export class Presence extends ServiceClass implements IPresence {
}
}

private async handleReaperUpdates(userIds: string[]): Promise<void> {
const results = await Promise.allSettled(userIds.map((uid) => this.updateUserPresence(uid)));
const fulfilled = results.filter((result) => result.status === 'fulfilled');
const rejected = results.filter((result) => result.status === 'rejected');

if (fulfilled.length > 0) {
console.debug(`[PresenceReaper] Successfully updated presence for ${fulfilled.length} users.`);
}

if (rejected.length > 0) {
console.error(
`[PresenceReaper] Failed to update presence for ${rejected.length} users:`,
rejected.map(({ reason }) => reason),
);
}
}

override async stopped(): Promise<void> {
this.reaper.stop();
if (!this.lostConTimeout) {
return;
}
Expand Down Expand Up @@ -137,6 +165,28 @@ export class Presence extends ServiceClass implements IPresence {
};
}

async updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined> {
const query = {
'_id': uid,
'connections.id': connectionId,
};

const update = {
$set: {
'connections.$._updatedAt': new Date(),
},
};

const result = await UsersSessions.updateOne(query, update);
if (result.modifiedCount === 0) {
return;
}

await this.updateUserPresence(uid);

return { uid, connectionId };
}

async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> {
if (!uid || !session) {
return;
Expand Down
Loading
Loading