diff --git a/apps/meteor/app/lib/server/functions/createRoom.ts b/apps/meteor/app/lib/server/functions/createRoom.ts index bc9cc189d4f7a..7102708c34b7b 100644 --- a/apps/meteor/app/lib/server/functions/createRoom.ts +++ b/apps/meteor/app/lib/server/functions/createRoom.ts @@ -7,6 +7,7 @@ import { Rooms, Subscriptions, Users } from '@rocket.chat/models'; import { Meteor } from 'meteor/meteor'; import { createDirectRoom } from './createDirectRoom'; +import { setupTypingEventListenerForRoom } from '../../../../ee/server/hooks/federation'; import { callbacks } from '../../../../lib/callbacks'; import { beforeCreateRoomCallback } from '../../../../lib/callbacks/beforeCreateRoomCallback'; import { calculateRoomRolePriorityFromRoles } from '../../../../lib/roles/calculateRoomRolePriorityFromRoles'; @@ -276,7 +277,9 @@ export const createRoom = async ( } if (shouldBeHandledByFederation && federationVersion === 'native') { + // TODO: move this to the hooks folder await FederationMatrix.createRoom(room, owner, members); + setupTypingEventListenerForRoom(room._id); } void Apps.self?.triggerEvent(AppEvents.IPostRoomCreate, room); diff --git a/apps/meteor/ee/server/hooks/federation/index.ts b/apps/meteor/ee/server/hooks/federation/index.ts index e6b261079206d..a3ea763bb3902 100644 --- a/apps/meteor/ee/server/hooks/federation/index.ts +++ b/apps/meteor/ee/server/hooks/federation/index.ts @@ -1,7 +1,8 @@ -import { FederationMatrix } from '@rocket.chat/core-services'; +import { api, FederationMatrix } from '@rocket.chat/core-services'; import type { IMessage, IRoom, IUser } from '@rocket.chat/core-typings'; -import { Messages } from '@rocket.chat/models'; +import { Messages, Rooms } from '@rocket.chat/models'; +import notifications from '../../../../app/notifications/server/lib/Notifications'; import { callbacks } from '../../../../lib/callbacks'; import { afterLeaveRoomCallback } from '../../../../lib/callbacks/afterLeaveRoomCallback'; import { afterRemoveFromRoomCallback } from '../../../../lib/callbacks/afterRemoveFromRoomCallback'; @@ -85,3 +86,22 @@ afterRemoveFromRoomCallback.add( callbacks.priority.HIGH, 'federation-matrix-after-remove-from-room', ); + +export const setupTypingEventListenerForRoom = (roomId: string): void => { + notifications.streamRoom.on(`${roomId}/user-activity`, (username, activity) => { + if (Array.isArray(activity) && (!activity.length || activity.includes('user-typing'))) { + void api.broadcast('user.typing', { + user: { username }, + isTyping: activity.includes('user-typing'), + roomId, + }); + } + }); +}; + +export const setupInternalEDUEventListeners = async () => { + const federatedRooms = await Rooms.findFederatedRooms({ projection: { _id: 1 } }).toArray(); + for (const room of federatedRooms) { + setupTypingEventListenerForRoom(room._id); + } +}; diff --git a/apps/meteor/ee/server/index.ts b/apps/meteor/ee/server/index.ts index 2ae1821824806..960b3b2f66245 100644 --- a/apps/meteor/ee/server/index.ts +++ b/apps/meteor/ee/server/index.ts @@ -20,5 +20,6 @@ export * from './apps/startup'; export { registerEEBroker } from './startup'; await License.onLicense('federation', async () => { - await import('./hooks/federation'); + const { setupInternalEDUEventListeners } = await import('./hooks/federation'); + await setupInternalEDUEventListeners(); }); diff --git a/apps/meteor/server/modules/listeners/listeners.module.ts b/apps/meteor/server/modules/listeners/listeners.module.ts index d0894913cec15..61c4e05a31277 100644 --- a/apps/meteor/server/modules/listeners/listeners.module.ts +++ b/apps/meteor/server/modules/listeners/listeners.module.ts @@ -151,28 +151,7 @@ export class ListenersModule { notifications.notifyRoom(rid, 'videoconf', callId); }); - service.onEvent('presence.status', ({ user }) => { - const { _id, username, name, status, statusText, roles } = user; - if (!status || !username) { - return; - } - - notifications.notifyUserInThisInstance(_id, 'userData', { - type: 'updated', - id: _id, - diff: { - status, - ...(statusText && { statusText }), - }, - unset: {}, - }); - - notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText, name, roles]); - - if (_id) { - notifications.sendPresence(_id, username, STATUS_MAP[status], statusText); - } - }); + service.onEvent('presence.status', ({ user }) => this.handlePresence({ user }, notifications)); service.onEvent('user.updateCustomStatus', (userStatus) => { notifications.notifyLoggedInThisInstance('updateCustomUserStatus', { @@ -180,6 +159,12 @@ export class ListenersModule { }); }); + service.onEvent('federation-matrix.user.typing', ({ isTyping, roomId, username }) => { + notifications.notifyRoom(roomId, 'user-activity', username, isTyping ? ['user-typing'] : []); + }); + + service.onEvent('federation-matrix.user.presence.status', ({ user }) => this.handlePresence({ user }, notifications)); + service.onEvent('watch.messages', async ({ message }) => { if (!message.rid) { return; @@ -506,4 +491,30 @@ export class ListenersModule { notifications.streamRoomMessage.emit(roomId, acknowledgeMessage); }); } + + private handlePresence( + { user }: { user: Pick }, + notifications: NotificationsModule, + ): void { + const { _id, username, name, status, statusText, roles } = user; + if (!status || !username) { + return; + } + + notifications.notifyUserInThisInstance(_id, 'userData', { + type: 'updated', + id: _id, + diff: { + status, + ...(statusText && { statusText }), + }, + unset: {}, + }); + + notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText, name, roles]); + + if (_id) { + notifications.sendPresence(_id, username, STATUS_MAP[status], statusText); + } + } } diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index 74581ebcc661a..0fe3a873972cc 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -1,9 +1,10 @@ import 'reflect-metadata'; +import type { PresenceState } from '@hs/core'; import { ConfigService, createFederationContainer, getAllServices } from '@hs/federation-sdk'; import type { HomeserverEventSignatures, HomeserverServices, FederationContainerOptions } from '@hs/federation-sdk'; import { type IFederationMatrixService, Room, ServiceClass, Settings } from '@rocket.chat/core-services'; -import { isDeletedMessage, isMessageFromMatrixFederation, type IMessage, type IRoom, type IUser } from '@rocket.chat/core-typings'; +import { isDeletedMessage, isMessageFromMatrixFederation, UserStatus, type IMessage, type IRoom, type IUser } from '@rocket.chat/core-typings'; import { Emitter } from '@rocket.chat/emitter'; import { Router } from '@rocket.chat/http-router'; import { Logger } from '@rocket.chat/logger'; @@ -63,6 +64,58 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS createFederationContainer(containerOptions, config); instance.homeserverServices = getAllServices(); instance.buildMatrixHTTPRoutes(); + instance.onEvent('user.typing', async ({ isTyping, roomId, user: { username } }): Promise => { + if (!roomId || !username) { + return; + } + const externalRoomId = await MatrixBridgedRoom.getExternalRoomId(roomId); + if (!externalRoomId) { + return; + } + const localUser = await Users.findOneByUsername(username, { projection: { _id: 1 } }); + if (!localUser) { + return; + } + const externalUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(localUser._id); + if (!externalUserId) { + return; + } + void instance.homeserverServices.edu.sendTypingNotification(externalRoomId, externalUserId, isTyping); + }); + instance.onEvent( + 'presence.status', + async ({ user }: { user: Pick }): Promise => { + if (!user.username || !user.status) { + return; + } + const localUser = await Users.findOneByUsername(user.username, { projection: { _id: 1 } }); + if (!localUser) { + return; + } + const externalUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(localUser._id); + if (!externalUserId) { + return; + } + + const roomsUserIsMemberOf = await Subscriptions.findUserFederatedRoomIds(localUser._id).toArray(); + const statusMap: Record = { + [UserStatus.ONLINE]: 'online', + [UserStatus.OFFLINE]: 'offline', + [UserStatus.AWAY]: 'unavailable', + [UserStatus.BUSY]: 'unavailable', + [UserStatus.DISABLED]: 'offline', + }; + void instance.homeserverServices.edu.sendPresenceUpdateToRooms( + [ + { + user_id: externalUserId, + presence: statusMap[user.status] || 'offline', + }, + ], + roomsUserIsMemberOf.map(({ externalRoomId }) => externalRoomId), + ); + }, + ); return instance; } diff --git a/ee/packages/federation-matrix/src/api/_matrix/transactions.ts b/ee/packages/federation-matrix/src/api/_matrix/transactions.ts index 9fd59bd2312b4..f6b76acd4a0d5 100644 --- a/ee/packages/federation-matrix/src/api/_matrix/transactions.ts +++ b/ee/packages/federation-matrix/src/api/_matrix/transactions.ts @@ -172,19 +172,15 @@ export const getMatrixTransactionsRoutes = (services: HomeserverServices) => { async (c) => { const body = await c.req.json(); - const { pdus = [] } = body; - - if (pdus.length === 0) { - return { - body: { - pdus: {}, - edus: {}, - }, - statusCode: 200, - }; + const { pdus = [], edus = [] } = body; + + if (pdus.length > 0) { + await event.processIncomingPDUs(pdus); } - await event.processIncomingPDUs(pdus); + if (edus.length > 0) { + await event.processIncomingEDUs(edus); + } return { body: { diff --git a/ee/packages/federation-matrix/src/events/edu.ts b/ee/packages/federation-matrix/src/events/edu.ts new file mode 100644 index 0000000000000..425dcb2de8daf --- /dev/null +++ b/ee/packages/federation-matrix/src/events/edu.ts @@ -0,0 +1,84 @@ +import type { HomeserverEventSignatures } from '@hs/federation-sdk'; +import { api } from '@rocket.chat/core-services'; +import { UserStatus } from '@rocket.chat/core-typings'; +import type { Emitter } from '@rocket.chat/emitter'; +import { Logger } from '@rocket.chat/logger'; +import { MatrixBridgedUser, MatrixBridgedRoom, Users } from '@rocket.chat/models'; + +import { convertExternalUserIdToInternalUsername } from '../helpers/identifiers'; + +const logger = new Logger('federation-matrix:edu'); + +export const edus = async (emitter: Emitter) => { + emitter.on('homeserver.matrix.typing', async (data) => { + try { + const matrixRoom = await MatrixBridgedRoom.getLocalRoomId(data.room_id); + if (!matrixRoom) { + logger.debug(`No bridged room found for Matrix room_id: ${data.room_id}`); + return; + } + + const matrixUser = await MatrixBridgedUser.findOne({ mui: convertExternalUserIdToInternalUsername(data.user_id) }); + if (!matrixUser) { + logger.debug(`No bridged user found for Matrix user_id: ${data.user_id}`); + return; + } + + const user = await Users.findOneById(matrixUser.uid, { projection: { _id: 1, username: 1 } }); + if (!user || !user.username) { + logger.debug(`User not found for uid: ${matrixUser.uid}`); + return; + } + + void api.broadcast('federation-matrix.user.typing', { + username: user.username, + isTyping: data.typing, + roomId: matrixRoom, + }); + } catch (error) { + logger.error('Error handling Matrix typing event:', error); + } + }); + + emitter.on('homeserver.matrix.presence', async (data) => { + try { + const matrixUser = await MatrixBridgedUser.findOne({ mui: convertExternalUserIdToInternalUsername(data.user_id) }); + if (!matrixUser) { + logger.debug(`No bridged user found for Matrix user_id: ${data.user_id}`); + return; + } + const user = await Users.findOneById(matrixUser.uid, { + projection: { _id: 1, username: 1, statusText: 1, roles: 1, name: 1, status: 1 }, + }); + if (!user) { + logger.debug(`User not found for uid: ${matrixUser.uid}`); + return; + } + + const statusMap = { + online: UserStatus.ONLINE, + offline: UserStatus.OFFLINE, + unavailable: UserStatus.AWAY, + }; + + const status = statusMap[data.presence] || UserStatus.OFFLINE; + await Users.updateOne( + { _id: user._id }, + { + $set: { + status, + statusDefault: status, + }, + }, + ); + + const { _id, username, statusText, roles, name } = user; + void api.broadcast('federation-matrix.user.presence.status', { + user: { status, _id, username, statusText, roles, name }, + }); + logger.debug(`Updated presence for user ${matrixUser.uid} to ${status} from Matrix federation`); + } catch (error) { + logger.error('Error handling Matrix presence event:', error); + } + }); +}; diff --git a/ee/packages/federation-matrix/src/events/index.ts b/ee/packages/federation-matrix/src/events/index.ts index 3c5910aff0767..9d91e2d8ec8a3 100644 --- a/ee/packages/federation-matrix/src/events/index.ts +++ b/ee/packages/federation-matrix/src/events/index.ts @@ -1,6 +1,7 @@ import type { HomeserverEventSignatures } from '@hs/federation-sdk'; import type { Emitter } from '@rocket.chat/emitter'; +import { edus } from './edu'; import { invite } from './invite'; import { member } from './member'; import { message } from './message'; @@ -13,4 +14,5 @@ export function registerEvents(emitter: Emitter) { invite(emitter); reaction(emitter); member(emitter); + edus(emitter); } diff --git a/packages/core-services/src/events/Events.ts b/packages/core-services/src/events/Events.ts index f149aec9f9ac3..637ae6fc76d23 100644 --- a/packages/core-services/src/events/Events.ts +++ b/packages/core-services/src/events/Events.ts @@ -141,6 +141,11 @@ export type EventSignatures = { }): void; 'user.updateCustomStatus'(userStatus: Omit): void; 'user.typing'(data: { user: Partial; isTyping: boolean; roomId: string }): void; + 'federation-matrix.user.typing'(data: { username: string; isTyping: boolean; roomId: string }): void; + 'federation-matrix.user.presence.status'(data: { + user: Pick; + previousStatus?: UserStatus; + }): void; 'user.video-conference'(data: { userId: IUser['_id']; action: string; diff --git a/packages/model-typings/src/models/ISubscriptionsModel.ts b/packages/model-typings/src/models/ISubscriptionsModel.ts index 6ca771ef2d84c..47e931b2bed2a 100644 --- a/packages/model-typings/src/models/ISubscriptionsModel.ts +++ b/packages/model-typings/src/models/ISubscriptionsModel.ts @@ -327,4 +327,5 @@ export interface ISubscriptionsModel extends IBaseModel { countByRoomIdWhenUsernameExists(rid: string): Promise; setE2EKeyByUserIdAndRoomId(userId: string, rid: string, key: string): Promise>; countUsersInRoles(roles: IRole['_id'][], rid: IRoom['_id'] | undefined): Promise; + findUserFederatedRoomIds(userId: IUser['_id']): AggregationCursor<{ _id: IRoom['_id']; externalRoomId: string }>; } diff --git a/packages/models/src/models/Rooms.ts b/packages/models/src/models/Rooms.ts index 9a9091f05e96a..e368d5e0d60a1 100644 --- a/packages/models/src/models/Rooms.ts +++ b/packages/models/src/models/Rooms.ts @@ -96,6 +96,10 @@ export class RoomsRaw extends BaseRaw implements IRoomsModel { sparse: true, }, { key: { t: 1, ts: 1 } }, + { + key: { federated: 1 }, + sparse: true, + }, { key: { 'usersWaitingForE2EKeys.userId': 1, diff --git a/packages/models/src/models/Subscriptions.ts b/packages/models/src/models/Subscriptions.ts index d8b2f1a90b3dd..16ebf113dea42 100644 --- a/packages/models/src/models/Subscriptions.ts +++ b/packages/models/src/models/Subscriptions.ts @@ -2052,4 +2052,41 @@ export class SubscriptionsRaw extends BaseRaw implements ISubscri return this.updateOne(query, update); } + + findUserFederatedRoomIds(userId: IUser['_id']): AggregationCursor<{ _id: IRoom['_id']; externalRoomId: string }> { + return this.col.aggregate<{ _id: IRoom['_id']; externalRoomId: string }>([ + { + $match: { + 'u._id': userId, + }, + }, + { + $lookup: { + from: 'rocketchat_room', + localField: 'rid', + foreignField: '_id', + as: 'room', + }, + }, + { + $match: { + 'room.federated': true, + }, + }, + { + $lookup: { + from: 'rocketchat_matrix_bridged_rooms', + localField: 'rid', + foreignField: 'rid', + as: 'matrixRoom', + }, + }, + { + $project: { + _id: '$rid', + externalRoomId: { $arrayElemAt: ['$matrixRoom.mri', 0] }, + }, + }, + ]); + } }