diff --git a/ee/packages/federation-matrix/package.json b/ee/packages/federation-matrix/package.json index 4518af0e943ea..6a4faf7b64f43 100644 --- a/ee/packages/federation-matrix/package.json +++ b/ee/packages/federation-matrix/package.json @@ -33,6 +33,7 @@ "marked": "^16.1.2", "mongodb": "6.16.0", "pino": "^8.21.0", + "prom-client": "^15.1.3", "reflect-metadata": "^0.2.2", "sanitize-html": "~2.17.0", "tsyringe": "^4.10.0", diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index a8445c43af10b..ef811e4c6aeed 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -18,6 +18,7 @@ import emojione from 'emojione'; import { createOrUpdateFederatedUser } from './helpers/createOrUpdateFederatedUser'; import { extractDomainFromMatrixUserId } from './helpers/extractDomainFromMatrixUserId'; import { toExternalMessageFormat, toExternalQuoteMessageFormat } from './helpers/message.parsers'; +import { federationMetrics, determineOutgoingMessageType } from './helpers/metricsHelpers'; import { validateFederatedUsername } from './helpers/validateFederatedUsername'; import { MatrixMediaService } from './services/MatrixMediaService'; @@ -118,6 +119,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS throw new Error('Room is not a public or private room'); } + const endTimer = federationMetrics.federationRoomCreateDuration.startTimer({ room_type: room.t }); + try { const matrixUserId = userIdSchema.parse(`@${owner.username}:${this.serverName}`); const roomName = room.name || room.fname || 'Untitled Room'; @@ -143,12 +146,23 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS // Members are NOT invited here - invites are sent via beforeAddUserToRoom callback. + // Increment success metrics + federationMetrics.federatedRoomsCreated.inc({ room_type: room.t }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'room_create', direction: 'outgoing' }); + this.logger.debug({ msg: 'Room creation completed successfully', roomId: room._id }); return matrixRoomResult; } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'room_create', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to create room', err }); throw err; + } finally { + endTimer(); } } @@ -184,6 +198,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS creatorId, })) async createDirectMessageRoom(room: IRoom, members: IUser[], creatorId: IUser['_id']): Promise { + const endTimer = federationMetrics.federationRoomCreateDuration.startTimer({ room_type: 'd' }); + try { this.logger.debug({ msg: 'Creating direct message room in Matrix', roomId: room._id, memberCount: members.length }); @@ -210,10 +226,21 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS origin: this.serverName, }); + // Increment success metrics + federationMetrics.federatedRoomsCreated.inc({ room_type: 'd' }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'room_create', direction: 'outgoing' }); + this.logger.debug({ roomId: room._id, msg: 'Direct message room creation completed successfully' }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'room_create', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to create direct message room', err }); throw err; + } finally { + endTimer(); } } @@ -356,9 +383,11 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS hasAttachments: Boolean(message?.attachments?.length), })) async sendMessage(message: IMessage, room: IRoomNativeFederated, user: IUser): Promise { + const messageType = determineOutgoingMessageType(message); + const endTimer = federationMetrics.federationOutgoingMessageSendDuration.startTimer({ message_type: messageType }); + try { const userMui = isUserNativeFederated(user) ? user.federation.mui : `@${user.username}:${this.serverName}`; - const messageType = message.files && message.files.length > 0 ? 'file' : 'text'; // Add runtime attributes for computed values addSpanAttributes({ @@ -385,10 +414,21 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS await Messages.setFederationEventIdById(message._id, result.eventId); + // Increment success metrics + federationMetrics.federatedMessagesSent.inc({ room_type: room.t, message_type: messageType }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'message', direction: 'outgoing' }); + this.logger.debug({ msg: 'Message sent to Matrix successfully', eventId: result.eventId }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'message', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to send message to Matrix', err }); throw err; + } finally { + endTimer(); } } @@ -452,8 +492,15 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS // TODO message.u?.username is not the user who removed the message const eventId = await federationSDK.redactMessage(roomIdSchema.parse(matrixRoomId), eventIdSchema.parse(matrixEventId)); + federationMetrics.federationEventsProcessed.inc({ event_type: 'redaction', direction: 'outgoing' }); + this.logger.debug({ msg: 'Message redaction sent to Matrix successfully', eventId }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'redaction', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to send redaction to Matrix', err }); throw err; } @@ -466,6 +513,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS inviterUsername: inviter?.username, })) async inviteUsersToRoom(room: IRoomNativeFederated, matrixUsersUsername: string[], inviter: IUser): Promise { + const endTimer = federationMetrics.federationInviteSendDuration.startTimer({ room_type: room.t }); + try { const inviterUserId = `@${inviter.username}:${this.serverName}`; @@ -493,9 +542,20 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS ); }), ); + + // Increment invite counter for each user invited + federationMetrics.federatedInvitesSent.inc({ room_type: room.t }, matrixUsersUsername.length); + federationMetrics.federationEventsProcessed.inc({ event_type: 'membership', direction: 'outgoing' }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to invite a user to Matrix', err }); throw err; + } finally { + endTimer(); } } @@ -548,8 +608,16 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS await Messages.setFederationReactionEventId(user.username || '', messageId, reaction, eventId); + federationMetrics.federatedReactionsSent.inc({ action: 'add' }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'reaction', direction: 'outgoing' }); + this.logger.debug({ eventId, msg: 'Reaction sent to Matrix successfully' }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'reaction', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to send reaction to Matrix', err }); throw err; } @@ -606,9 +674,17 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS } await Messages.unsetFederationReactionEventId(eventId, messageId, reaction); + + federationMetrics.federatedReactionsSent.inc({ action: 'remove' }); + federationMetrics.federationEventsProcessed.inc({ event_type: 'reaction', direction: 'outgoing' }); break; } } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'reaction', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to remove reaction from Matrix', err }); throw err; } @@ -653,8 +729,15 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS await federationSDK.leaveRoom(roomIdSchema.parse(room.federation.mrid), userIdSchema.parse(actualMatrixUserId)); + federationMetrics.federationEventsProcessed.inc({ event_type: 'membership', direction: 'outgoing' }); + this.logger.info({ msg: 'User left Matrix room successfully', username: user.username, roomId: room.federation.mrid }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to leave room in Matrix', err }); throw err; } @@ -691,6 +774,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS `Kicked by ${userWhoRemoved.username}`, ); + federationMetrics.federationEventsProcessed.inc({ event_type: 'membership', direction: 'outgoing' }); + this.logger.info({ msg: 'User was kicked from Matrix room', kickedUsername: removedUser.username, @@ -698,6 +783,11 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS performedBy: userWhoRemoved.username, }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to kick user from Matrix room', err }); throw err; } @@ -737,8 +827,15 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS eventIdSchema.parse(matrixEventId), ); + federationMetrics.federationEventsProcessed.inc({ event_type: 'message_edit', direction: 'outgoing' }); + this.logger.debug({ msg: 'Message updated in Matrix successfully', eventId }); } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'message_edit', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); this.logger.error({ msg: 'Failed to update message in Matrix', err }); throw err; } @@ -750,19 +847,30 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS username: user?.username, })) async updateRoomName(rid: string, displayName: string, user: IUser): Promise { - const room = await Rooms.findOneById(rid); - if (!room || !isRoomNativeFederated(room)) { - throw new Error(`No Matrix room mapping found for room ${rid}`); - } + try { + const room = await Rooms.findOneById(rid); + if (!room || !isRoomNativeFederated(room)) { + throw new Error(`No Matrix room mapping found for room ${rid}`); + } - if (isUserNativeFederated(user)) { - this.logger.debug('Only local users can change the name of a room, ignoring action'); - return; - } + if (isUserNativeFederated(user)) { + this.logger.debug('Only local users can change the name of a room, ignoring action'); + return; + } - const userMui = `@${user.username}:${this.serverName}`; + const userMui = `@${user.username}:${this.serverName}`; - await federationSDK.updateRoomName(roomIdSchema.parse(room.federation.mrid), displayName, userIdSchema.parse(userMui)); + await federationSDK.updateRoomName(roomIdSchema.parse(room.federation.mrid), displayName, userIdSchema.parse(userMui)); + + federationMetrics.federationEventsProcessed.inc({ event_type: 'room_update', direction: 'outgoing' }); + } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'room_update', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + throw err; + } } @traced((room: IRoomNativeFederated, topic: string, user: Pick) => ({ @@ -776,14 +884,25 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS topic: string, user: Pick, ): Promise { - if (isUserNativeFederated(user)) { - this.logger.debug('Only local users can change the topic of a room, ignoring action'); - return; - } + try { + if (isUserNativeFederated(user)) { + this.logger.debug('Only local users can change the topic of a room, ignoring action'); + return; + } + + const userMui = `@${user.username}:${this.serverName}`; - const userMui = `@${user.username}:${this.serverName}`; + await federationSDK.setRoomTopic(roomIdSchema.parse(room.federation.mrid), userIdSchema.parse(userMui), topic); - await federationSDK.setRoomTopic(roomIdSchema.parse(room.federation.mrid), userIdSchema.parse(userMui), topic); + federationMetrics.federationEventsProcessed.inc({ event_type: 'room_update', direction: 'outgoing' }); + } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'room_update', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + throw err; + } } @traced((room: IRoomNativeFederated, senderId: string, userId: string, role: string) => ({ @@ -868,6 +987,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS const userMui = isUserNativeFederated(localUser) ? localUser.federation.mui : `@${localUser.username}:${this.serverName}`; void federationSDK.sendTypingNotification(room.federation.mrid, userMui, isTyping); + + federationMetrics.federationEventsProcessed.inc({ event_type: 'typing', direction: 'outgoing' }); } @traced((matrixIds: string[]) => ({ @@ -926,50 +1047,61 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS action, })) async handleInvite(roomId: IRoom['_id'], userId: IUser['_id'], action: 'accept' | 'reject'): Promise { - const subscription = await Subscriptions.findInvitedSubscription(roomId, userId); - if (!subscription) { - throw new Error('No subscription found or user does not have permission to accept or reject this invite'); - } + try { + const subscription = await Subscriptions.findInvitedSubscription(roomId, userId); + if (!subscription) { + throw new Error('No subscription found or user does not have permission to accept or reject this invite'); + } - const room = await Rooms.findOneById(roomId); - if (!room || !isRoomNativeFederated(room)) { - throw new Error('Room not found or not federated'); - } + const room = await Rooms.findOneById(roomId); + if (!room || !isRoomNativeFederated(room)) { + throw new Error('Room not found or not federated'); + } - const user = await Users.findOneById(userId); - if (!user) { - throw new Error('User not found'); - } + const user = await Users.findOneById(userId); + if (!user) { + throw new Error('User not found'); + } - if (!user.username) { - throw new Error('User username not found'); - } + if (!user.username) { + throw new Error('User username not found'); + } - // TODO: should use common function to get matrix user ID - const matrixUserId = isUserNativeFederated(user) ? user.federation.mui : `@${user.username}:${this.serverName}`; + // TODO: should use common function to get matrix user ID + const matrixUserId = isUserNativeFederated(user) ? user.federation.mui : `@${user.username}:${this.serverName}`; - // Add runtime attributes after querying room and user - addSpanAttributes({ - matrixRoomId: room.federation.mrid, - matrixUserId, - username: user.username, - isNativeFederatedUser: isUserNativeFederated(user), - }); + // Add runtime attributes after querying room and user + addSpanAttributes({ + matrixRoomId: room.federation.mrid, + matrixUserId, + username: user.username, + isNativeFederatedUser: isUserNativeFederated(user), + }); - if (action === 'accept') { - await federationSDK.acceptInvite(room.federation.mrid, matrixUserId); - } + if (action === 'accept') { + await federationSDK.acceptInvite(room.federation.mrid, matrixUserId); + } - if (action === 'reject') { - try { - await federationSDK.rejectInvite(room.federation.mrid, matrixUserId); - } catch (err) { - if (err instanceof FederationRequestError && err.response.status === 403) { - return Room.performUserRemoval(room, user); + if (action === 'reject') { + try { + await federationSDK.rejectInvite(room.federation.mrid, matrixUserId); + } catch (err) { + if (err instanceof FederationRequestError && err.response.status === 403) { + return Room.performUserRemoval(room, user); + } + this.logger.error({ msg: 'Failed to reject invite in Matrix', err }); + throw err; } - this.logger.error({ msg: 'Failed to reject invite in Matrix', err }); - throw err; } + + federationMetrics.federationEventsProcessed.inc({ event_type: 'membership', direction: 'outgoing' }); + } catch (err) { + federationMetrics.federationEventsFailed.inc({ + event_type: 'membership', + direction: 'outgoing', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + throw err; } } } diff --git a/ee/packages/federation-matrix/src/events/edu.ts b/ee/packages/federation-matrix/src/events/edu.ts index 2235e8479d811..773f90e39479b 100644 --- a/ee/packages/federation-matrix/src/events/edu.ts +++ b/ee/packages/federation-matrix/src/events/edu.ts @@ -1,14 +1,15 @@ import { api } from '@rocket.chat/core-services'; import { UserStatus } from '@rocket.chat/core-typings'; -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Rooms, Users } from '@rocket.chat/models'; const logger = new Logger('federation-matrix:edu'); export const edus = async () => { - federationSDK.eventEmitterService.on('homeserver.matrix.typing', async (data) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.typing', + async (data: HomeserverEventSignatures['homeserver.matrix.typing']) => { const matrixRoom = await Rooms.findOne({ 'federation.mrid': data.room_id }, { projection: { _id: 1 } }); if (!matrixRoom) { logger.debug({ msg: 'No bridged room found for Matrix room_id', roomId: data.room_id }); @@ -20,13 +21,13 @@ export const edus = async () => { isTyping: data.typing, roomId: matrixRoom._id, }); - } catch (err) { - logger.error({ msg: 'Error handling Matrix typing event', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Error handling Matrix typing event', err }), + ); - federationSDK.eventEmitterService.on('homeserver.matrix.presence', async (data) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.presence', + async (data: HomeserverEventSignatures['homeserver.matrix.presence']) => { const matrixUser = await Users.findOneByUsername(data.user_id); if (!matrixUser) { logger.debug({ msg: 'No federated user found for Matrix user_id', userId: data.user_id }); @@ -67,8 +68,7 @@ export const edus = async () => { previousStatus: undefined, }); logger.debug({ msg: 'Updated presence for user from Matrix federation', userId: matrixUser._id, status }); - } catch (err) { - logger.error({ msg: 'Error handling Matrix presence event', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Error handling Matrix presence event', err }), + ); }; diff --git a/ee/packages/federation-matrix/src/events/member.ts b/ee/packages/federation-matrix/src/events/member.ts index a78c23861f0b8..792b5eb6e1ded 100644 --- a/ee/packages/federation-matrix/src/events/member.ts +++ b/ee/packages/federation-matrix/src/events/member.ts @@ -94,7 +94,7 @@ async function getOrCreateFederatedRoom({ function getJoinRuleType(strippedState: PduForType<'m.room.join_rules'>[]): 'p' | 'c' | 'd' { const joinRulesState = strippedState?.find((state: PduForType<'m.room.join_rules'>) => state.type === 'm.room.join_rules'); - // as per the spec, users need to be invited to join a room, unless the room’s join rules state otherwise. + // as per the spec, users need to be invited to join a room, unless the room's join rules state otherwise. if (!joinRulesState) { return 'p'; } @@ -255,8 +255,9 @@ async function handleLeave({ } export function member() { - federationSDK.eventEmitterService.on('homeserver.matrix.membership', async ({ event }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.membership', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.membership']) => { switch (event.content.membership) { case 'invite': await handleInvite(event); @@ -273,8 +274,7 @@ export function member() { default: logger.warn({ msg: 'Unknown membership type', membership: event.content.membership }); } - } catch (err) { - logger.error({ msg: 'Failed to process Matrix membership event', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix membership event', err }), + ); } diff --git a/ee/packages/federation-matrix/src/events/message.ts b/ee/packages/federation-matrix/src/events/message.ts index 2d15fd7e302cf..f1659a5b6cd7c 100644 --- a/ee/packages/federation-matrix/src/events/message.ts +++ b/ee/packages/federation-matrix/src/events/message.ts @@ -1,6 +1,13 @@ import { FederationMatrix, Message, MeteorService } from '@rocket.chat/core-services'; import type { IUser, IRoom, FileAttachmentProps } from '@rocket.chat/core-typings'; -import { type FileMessageType, type MessageType, type FileMessageContent, type EventID, federationSDK } from '@rocket.chat/federation-sdk'; +import { + type FileMessageType, + type MessageType, + type FileMessageContent, + type EventID, + federationSDK, + type HomeserverEventSignatures, +} from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Users, Rooms, Messages } from '@rocket.chat/models'; @@ -111,8 +118,9 @@ async function handleMediaMessage( } export function message() { - federationSDK.eventEmitterService.on('homeserver.matrix.message', async ({ event, event_id: eventId }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.message', + async ({ event, event_id: eventId }: HomeserverEventSignatures['homeserver.matrix.message']) => { const { msgtype, body } = event.content; const messageBody = body.toString(); @@ -136,7 +144,7 @@ export function message() { const relation = event.content['m.relates_to']; - // SPEC: For example, an m.thread relationship type denotes that the event is part of a “thread” of messages and should be rendered as such. + // SPEC: For example, an m.thread relationship type denotes that the event is part of a "thread" of messages and should be rendered as such. const hasRelation = relation && 'rel_type' in relation; const isThreadMessage = hasRelation && relation.rel_type === 'm.thread'; @@ -144,7 +152,7 @@ export function message() { const threadRootEventId = isThreadMessage && relation.event_id; // SPEC: Though rich replies form a relationship to another event, they do not use rel_type to create this relationship. - // Instead, a subkey named m.in_reply_to is used to describe the reply’s relationship, + // Instead, a subkey named m.in_reply_to is used to describe the reply's relationship, const isRichReply = relation && !('rel_type' in relation) && 'm.in_reply_to' in relation; const quoteMessageEventId = isRichReply && relation['m.in_reply_to']?.event_id; @@ -261,13 +269,13 @@ export function message() { ts: new Date(event.origin_server_ts), }); } - } catch (err) { - logger.error({ msg: 'Error processing Matrix message', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Error processing Matrix message', err }), + ); - federationSDK.eventEmitterService.on('homeserver.matrix.encrypted', async ({ event, event_id: eventId }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.encrypted', + async ({ event, event_id: eventId }: HomeserverEventSignatures['homeserver.matrix.encrypted']) => { if (!event.content.ciphertext) { logger.debug('No message content found in event'); return; @@ -286,7 +294,7 @@ export function message() { const relation = event.content['m.relates_to']; - // SPEC: For example, an m.thread relationship type denotes that the event is part of a “thread” of messages and should be rendered as such. + // SPEC: For example, an m.thread relationship type denotes that the event is part of a "thread" of messages and should be rendered as such. const hasRelation = relation && 'rel_type' in relation; const isThreadMessage = hasRelation && relation.rel_type === 'm.thread'; @@ -294,7 +302,7 @@ export function message() { const threadRootEventId = isThreadMessage && relation.event_id; // SPEC: Though rich replies form a relationship to another event, they do not use rel_type to create this relationship. - // Instead, a subkey named m.in_reply_to is used to describe the reply’s relationship, + // Instead, a subkey named m.in_reply_to is used to describe the reply's relationship, const isRichReply = relation && !('rel_type' in relation) && 'm.in_reply_to' in relation; const quoteMessageEventId = isRichReply && relation['m.in_reply_to']?.event_id; @@ -377,13 +385,13 @@ export function message() { thread, ts: new Date(event.origin_server_ts), }); - } catch (err) { - logger.error({ msg: 'Error processing Matrix message', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Error processing Matrix message', err }), + ); - federationSDK.eventEmitterService.on('homeserver.matrix.redaction', async ({ event }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.redaction', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.redaction']) => { const redactedEventId = event.redacts; if (!redactedEventId) { logger.debug('No redacts field in redaction event'); @@ -409,8 +417,7 @@ export function message() { } await Message.deleteMessage(user, rcMessage); - } catch (err) { - logger.error({ msg: 'Failed to process Matrix removal redaction', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix removal redaction', err }), + ); } diff --git a/ee/packages/federation-matrix/src/events/ping.ts b/ee/packages/federation-matrix/src/events/ping.ts index 204a49fdf891d..9142bb2405fa4 100644 --- a/ee/packages/federation-matrix/src/events/ping.ts +++ b/ee/packages/federation-matrix/src/events/ping.ts @@ -1,7 +1,14 @@ -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; +import { Logger } from '@rocket.chat/logger'; + +const logger = new Logger('federation-matrix:ping'); export const ping = async () => { - federationSDK.eventEmitterService.on('homeserver.ping', async (data) => { - console.log('Message received from homeserver', data); - }); + federationSDK.eventEmitterService.on( + 'homeserver.ping', + async (data: HomeserverEventSignatures['homeserver.ping']) => { + logger.debug({ msg: 'Message received from homeserver', data }); + }, + (err: Error) => logger.error({ msg: 'Error handling homeserver ping', err }), + ); }; diff --git a/ee/packages/federation-matrix/src/events/reaction.ts b/ee/packages/federation-matrix/src/events/reaction.ts index 2871f0d9f5409..a1aa80e4714ad 100644 --- a/ee/packages/federation-matrix/src/events/reaction.ts +++ b/ee/packages/federation-matrix/src/events/reaction.ts @@ -1,5 +1,5 @@ import { Message, FederationMatrix } from '@rocket.chat/core-services'; -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; import { Logger } from '@rocket.chat/logger'; import { Users, Messages } from '@rocket.chat/models'; // Rooms import emojione from 'emojione'; @@ -7,8 +7,9 @@ import emojione from 'emojione'; const logger = new Logger('federation-matrix:reaction'); export function reaction() { - federationSDK.eventEmitterService.on('homeserver.matrix.reaction', async ({ event, event_id: eventId }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.reaction', + async ({ event, event_id: eventId }: HomeserverEventSignatures['homeserver.matrix.reaction']) => { const isSetReaction = event.content?.['m.relates_to']; const reactionTargetEventId = isSetReaction?.event_id; @@ -41,13 +42,13 @@ export function reaction() { const reactionEmoji = emojione.toShort(reactionKey); await Message.reactToMessage(user._id, reactionEmoji, rcMessage._id, true); await Messages.setFederationReactionEventId(internalUsername, rcMessage._id, reactionEmoji, eventId); - } catch (err) { - logger.error({ msg: 'Failed to process Matrix reaction', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix reaction', err }), + ); - federationSDK.eventEmitterService.on('homeserver.matrix.redaction', async ({ event }) => { - try { + federationSDK.eventEmitterService.on( + 'homeserver.matrix.redaction', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.redaction']) => { const redactedEventId = event.redacts; if (!redactedEventId) { logger.debug('No redacts field in redaction event'); @@ -85,8 +86,7 @@ export function reaction() { const reactionEmoji = emojione.toShort(reactionKey); await Message.reactToMessage(user._id, reactionEmoji, rcMessage._id, false); await Messages.unsetFederationReactionEventId(redactedEventId, rcMessage._id, reactionEmoji); - } catch (err) { - logger.error({ msg: 'Failed to process Matrix reaction redaction', err }); - } - }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix reaction redaction', err }), + ); } diff --git a/ee/packages/federation-matrix/src/events/room.ts b/ee/packages/federation-matrix/src/events/room.ts index 23b3688fd9373..101cd90e7a73b 100644 --- a/ee/packages/federation-matrix/src/events/room.ts +++ b/ee/packages/federation-matrix/src/events/room.ts @@ -1,87 +1,102 @@ import { Room } from '@rocket.chat/core-services'; -import { federationSDK } from '@rocket.chat/federation-sdk'; +import { federationSDK, type HomeserverEventSignatures } from '@rocket.chat/federation-sdk'; +import { Logger } from '@rocket.chat/logger'; import { Rooms, Users } from '@rocket.chat/models'; import { getUsernameServername } from '../helpers/getUsernameServername'; +const logger = new Logger('federation-matrix:room'); + export function room() { - federationSDK.eventEmitterService.on('homeserver.matrix.room.name', async ({ event }) => { - const { - room_id: roomId, - content: { name }, - sender: userId, - } = event; - - const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); - if (!localRoomId) { - throw new Error(`mapped room not found: ${roomId}`); - } - - const localUserId = await Users.findOneByUsername(userId, { projection: { _id: 1 } }); - if (!localUserId) { - throw new Error(`mapped user not found: ${userId}`); - } - - await Room.saveRoomName(localRoomId._id, localUserId._id, name); - }); - - federationSDK.eventEmitterService.on('homeserver.matrix.room.topic', async ({ event }) => { - const { - room_id: roomId, - content: { topic }, - sender: userId, - } = event; - - const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); - if (!localRoomId) { - throw new Error('mapped room not found'); - } - - const localUser = await Users.findOneByUsername(userId, { projection: { _id: 1, federation: 1, federated: 1 } }); - if (!localUser) { - throw new Error('mapped user not found'); - } - - await Room.saveRoomTopic(localRoomId._id, topic, { - _id: localUser._id, - username: userId, - federation: localUser.federation, - federated: localUser.federated, - }); - }); - - federationSDK.eventEmitterService.on('homeserver.matrix.room.role', async (data) => { - const { room_id: roomId, user_id: userId, sender_id: senderId, role } = data; - - const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); - if (!localRoomId) { - throw new Error('mapped room not found'); - } - - const serverName = federationSDK.getConfig('serverName'); - - const [allegedUsernameLocal, , allegedUserLocalIsLocal] = getUsernameServername(userId, serverName); - const localUserId = allegedUserLocalIsLocal && (await Users.findOneByUsername(allegedUsernameLocal, { projection: { _id: 1 } })); - - if (!allegedUserLocalIsLocal) { - return; - } - - if (!localUserId) { - throw new Error('mapped user not found'); - } - - const [senderUsername, , senderIsLocal] = getUsernameServername(senderId, serverName); - - if (senderIsLocal) { - return; - } - - const localSenderId = await Users.findOneByUsername(senderUsername, { projection: { _id: 1 } }); - if (!localSenderId) { - throw new Error('mapped user not found'); - } - - await Room.addUserRoleRoomScoped(localSenderId._id, localUserId._id, localRoomId._id, role); - }); + federationSDK.eventEmitterService.on( + 'homeserver.matrix.room.name', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.room.name']) => { + const { + room_id: roomId, + content: { name }, + sender: userId, + } = event; + + const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); + if (!localRoomId) { + throw new Error(`mapped room not found: ${roomId}`); + } + + const localUserId = await Users.findOneByUsername(userId, { projection: { _id: 1 } }); + if (!localUserId) { + throw new Error(`mapped user not found: ${userId}`); + } + + await Room.saveRoomName(localRoomId._id, localUserId._id, name); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix room name event', err }), + ); + + federationSDK.eventEmitterService.on( + 'homeserver.matrix.room.topic', + async ({ event }: HomeserverEventSignatures['homeserver.matrix.room.topic']) => { + const { + room_id: roomId, + content: { topic }, + sender: userId, + } = event; + + const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); + if (!localRoomId) { + throw new Error('mapped room not found'); + } + + const localUser = await Users.findOneByUsername(userId, { projection: { _id: 1, federation: 1, federated: 1 } }); + if (!localUser) { + throw new Error('mapped user not found'); + } + + await Room.saveRoomTopic(localRoomId._id, topic, { + _id: localUser._id, + username: userId, + federation: localUser.federation, + federated: localUser.federated, + }); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix room topic event', err }), + ); + + federationSDK.eventEmitterService.on( + 'homeserver.matrix.room.role', + async (data: HomeserverEventSignatures['homeserver.matrix.room.role']) => { + const { room_id: roomId, user_id: userId, sender_id: senderId, role } = data; + + const localRoomId = await Rooms.findOne({ 'federation.mrid': roomId }, { projection: { _id: 1 } }); + if (!localRoomId) { + throw new Error('mapped room not found'); + } + + const serverName = federationSDK.getConfig('serverName'); + + const [allegedUsernameLocal, , allegedUserLocalIsLocal] = getUsernameServername(userId, serverName); + const localUserId = allegedUserLocalIsLocal && (await Users.findOneByUsername(allegedUsernameLocal, { projection: { _id: 1 } })); + + if (!allegedUserLocalIsLocal) { + return; + } + + if (!localUserId) { + throw new Error('mapped user not found'); + } + + const [senderUsername, , senderIsLocal] = getUsernameServername(senderId, serverName); + + if (senderIsLocal) { + return; + } + + const localSenderId = await Users.findOneByUsername(senderUsername, { projection: { _id: 1 } }); + if (!localSenderId) { + throw new Error('mapped user not found'); + } + + await Room.addUserRoleRoomScoped(localSenderId._id, localUserId._id, localRoomId._id, role); + }, + (err: Error) => logger.error({ msg: 'Failed to process Matrix room role event', err }), + ); } diff --git a/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts b/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts new file mode 100644 index 0000000000000..5155b24b041cb --- /dev/null +++ b/ee/packages/federation-matrix/src/helpers/metricsHelpers.ts @@ -0,0 +1,158 @@ +import type { IMessage } from '@rocket.chat/core-typings'; +import client from 'prom-client'; + +const percentiles = [0.01, 0.1, 0.5, 0.9, 0.95, 0.99, 1]; + +/** + * Gets an existing metric from the registry or creates it if it doesn't exist. + * This ensures we don't get duplicate registration errors when the same metric + * is accessed from different parts of the application. + */ +function getOrCreateMetric(name: string, createFn: () => T): T { + const existing = client.register.getSingleMetric(name); + if (existing) { + return existing as T; + } + return createFn(); +} + +/** + * Federation metrics for outgoing operations. + * Incoming metrics are now collected by the SDK's EventEmitterService. + */ +export const federationMetrics = { + // ===================================== + // OUTGOING METRICS + // ===================================== + + /** Counter for federation events processed (both incoming and outgoing) */ + get federationEventsProcessed() { + return getOrCreateMetric( + 'rocketchat_federation_events_processed', + () => + new client.Counter({ + name: 'rocketchat_federation_events_processed', + labelNames: ['event_type', 'direction'], + help: 'Total federation events processed', + }), + ); + }, + + /** Counter for failed federation events (both incoming and outgoing) */ + get federationEventsFailed() { + return getOrCreateMetric( + 'rocketchat_federation_events_failed', + () => + new client.Counter({ + name: 'rocketchat_federation_events_failed', + labelNames: ['event_type', 'direction', 'error_type'], + help: 'Total federation events that failed to process', + }), + ); + }, + + /** Counter for messages sent to federated rooms */ + get federatedMessagesSent() { + return getOrCreateMetric( + 'rocketchat_federation_messages_sent', + () => + new client.Counter({ + name: 'rocketchat_federation_messages_sent', + labelNames: ['room_type', 'message_type'], + help: 'Total messages sent to federated rooms', + }), + ); + }, + + /** Counter for federated rooms created */ + get federatedRoomsCreated() { + return getOrCreateMetric( + 'rocketchat_federation_rooms_created', + () => + new client.Counter({ + name: 'rocketchat_federation_rooms_created', + labelNames: ['room_type'], + help: 'Total federated rooms created', + }), + ); + }, + + /** Counter for federation invites sent */ + get federatedInvitesSent() { + return getOrCreateMetric( + 'rocketchat_federation_invites_sent', + () => + new client.Counter({ + name: 'rocketchat_federation_invites_sent', + labelNames: ['room_type'], + help: 'Total federation invites sent', + }), + ); + }, + + /** Counter for reactions sent/removed */ + get federatedReactionsSent() { + return getOrCreateMetric( + 'rocketchat_federation_reactions_sent', + () => + new client.Counter({ + name: 'rocketchat_federation_reactions_sent', + labelNames: ['action'], + help: 'Total reactions sent or removed via federation', + }), + ); + }, + + /** Duration to send a message via federation */ + get federationOutgoingMessageSendDuration() { + return getOrCreateMetric( + 'rocketchat_federation_outgoing_message_send_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_outgoing_message_send_duration_seconds', + labelNames: ['message_type'], + help: 'Time to send a message via federation', + percentiles, + }), + ); + }, + + /** Duration to create a federated room */ + get federationRoomCreateDuration() { + return getOrCreateMetric( + 'rocketchat_federation_room_create_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_room_create_duration_seconds', + labelNames: ['room_type'], + help: 'Time to create a federated room', + percentiles, + }), + ); + }, + + /** Duration to send an invitation via federation */ + get federationInviteSendDuration() { + return getOrCreateMetric( + 'rocketchat_federation_invite_send_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_invite_send_duration_seconds', + labelNames: ['room_type'], + help: 'Time to send an invitation via federation', + percentiles, + }), + ); + }, +}; + +/** + * Determines the message type from a Rocket.Chat message for outgoing metrics labeling. + * @returns 'text' | 'file' + */ +export function determineOutgoingMessageType(message: IMessage): 'text' | 'file' { + if (message.files && message.files.length > 0) { + return 'file'; + } + return 'text'; +} diff --git a/yarn.lock b/yarn.lock index fc3cdb206a58c..021e1782ad527 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5300,7 +5300,7 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/api@npm:^1.3.0, @opentelemetry/api@npm:^1.9.0": +"@opentelemetry/api@npm:^1.3.0, @opentelemetry/api@npm:^1.4.0, @opentelemetry/api@npm:^1.9.0": version: 1.9.0 resolution: "@opentelemetry/api@npm:1.9.0" checksum: 10/a607f0eef971893c4f2ee2a4c2069aade6ec3e84e2a1f5c2aac19f65c5d9eeea41aa72db917c1029faafdd71789a1a040bdc18f40d63690e22ccae5d7070f194 @@ -8345,6 +8345,7 @@ __metadata: mongodb: "npm:6.16.0" pino: "npm:^8.21.0" pino-pretty: "npm:^7.6.1" + prom-client: "npm:^15.1.3" reflect-metadata: "npm:^0.2.2" sanitize-html: "npm:~2.17.0" tsyringe: "npm:^4.10.0" @@ -30503,6 +30504,16 @@ __metadata: languageName: node linkType: hard +"prom-client@npm:^15.1.3": + version: 15.1.3 + resolution: "prom-client@npm:15.1.3" + dependencies: + "@opentelemetry/api": "npm:^1.4.0" + tdigest: "npm:^0.1.1" + checksum: 10/eba75e15ab896845d39359e3a4d6f7913ea05339b3122d8dde8c8c374669ad1a1d1ab2694ab2101c420bd98086a564e4f2a18aa29018fc14a4732e57c1c19aec + languageName: node + linkType: hard + "prometheus-gc-stats@npm:^1.1.0": version: 1.1.0 resolution: "prometheus-gc-stats@npm:1.1.0"