diff --git a/apps/meteor/client/lib/e2ee/rocketchat.e2e.room.ts b/apps/meteor/client/lib/e2ee/rocketchat.e2e.room.ts index 69732d0cb859f..ac97caba1395a 100644 --- a/apps/meteor/client/lib/e2ee/rocketchat.e2e.room.ts +++ b/apps/meteor/client/lib/e2ee/rocketchat.e2e.room.ts @@ -1,5 +1,6 @@ import { Base64 } from '@rocket.chat/base64'; -import type { IE2EEMessage, IMessage, IRoom, ISubscription, IUser, IUploadWithUser, AtLeast } from '@rocket.chat/core-typings'; +import type { IE2EEMessage, IMessage, IRoom, ISubscription, IUser, AtLeast, EncryptedMessageContent } from '@rocket.chat/core-typings'; +import { isEncryptedMessageContent } from '@rocket.chat/core-typings'; import { Emitter } from '@rocket.chat/emitter'; import type { Optional } from '@tanstack/react-query'; import EJSON from 'ejson'; @@ -670,11 +671,9 @@ export class E2ERoom extends Emitter { return this.encryptText(data); } - async decryptContent(data: T) { - if (data.content && data.content.algorithm === 'rc.v1.aes-sha2') { - const content = await this.decrypt(data.content.ciphertext); - Object.assign(data, content); - } + async decryptContent(data: T) { + const content = await this.decrypt(data.content.ciphertext); + Object.assign(data, content); return data; } @@ -693,7 +692,7 @@ export class E2ERoom extends Emitter { } } - message = await this.decryptContent(message); + message = isEncryptedMessageContent(message) ? await this.decryptContent(message) : message; return { ...message, diff --git a/apps/meteor/client/lib/e2ee/rocketchat.e2e.ts b/apps/meteor/client/lib/e2ee/rocketchat.e2e.ts index 268cdfa4c8aeb..cd42765297d49 100644 --- a/apps/meteor/client/lib/e2ee/rocketchat.e2e.ts +++ b/apps/meteor/client/lib/e2ee/rocketchat.e2e.ts @@ -2,7 +2,7 @@ import QueryString from 'querystring'; import URL from 'url'; import type { IE2EEMessage, IMessage, IRoom, ISubscription, IUser, IUploadWithUser, MessageAttachment } from '@rocket.chat/core-typings'; -import { isE2EEMessage } from '@rocket.chat/core-typings'; +import { isE2EEMessage, isEncryptedMessageContent } from '@rocket.chat/core-typings'; import { Emitter } from '@rocket.chat/emitter'; import { imperativeModal } from '@rocket.chat/ui-client'; import EJSON from 'ejson'; @@ -664,7 +664,7 @@ class E2E extends Emitter { } async decryptFileContent(file: IUploadWithUser): Promise { - if (!file.rid) { + if (!file.rid || !isEncryptedMessageContent(file)) { return file; } diff --git a/apps/meteor/server/services/messages/service.ts b/apps/meteor/server/services/messages/service.ts index 94c543da63593..85938ba38fb0f 100644 --- a/apps/meteor/server/services/messages/service.ts +++ b/apps/meteor/server/services/messages/service.ts @@ -90,12 +90,18 @@ export class MessageService extends ServiceClassInternal implements IMessageServ rid, msg, federation_event_id, + file, + files, + attachments, thread, }: { fromId: string; rid: string; msg: string; federation_event_id: string; + file?: IMessage['file']; + files?: IMessage['files']; + attachments?: IMessage['attachments']; thread?: { tmid: string; tshow: boolean }; }): Promise { return executeSendMessage(fromId, { @@ -103,6 +109,9 @@ export class MessageService extends ServiceClassInternal implements IMessageServ msg, ...thread, federation: { eventId: federation_event_id }, + ...(file && { file }), + ...(files && { files }), + ...(attachments && { attachments }), }); } diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index 546182a666b36..77d936646b2e0 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -16,6 +16,7 @@ import emojione from 'emojione'; import { getWellKnownRoutes } from './api/.well-known/server'; import { getMatrixInviteRoutes } from './api/_matrix/invite'; import { getKeyServerRoutes } from './api/_matrix/key/server'; +import { getMatrixMediaRoutes } from './api/_matrix/media'; import { getMatrixProfilesRoutes } from './api/_matrix/profiles'; import { getMatrixRoomsRoutes } from './api/_matrix/rooms'; import { getMatrixSendJoinRoutes } from './api/_matrix/send-join'; @@ -26,6 +27,16 @@ import { isLicenseEnabledMiddleware } from './api/middlewares/isLicenseEnabled'; import { registerEvents } from './events'; import { saveExternalUserIdForLocalUser } from './helpers/identifiers'; import { toExternalMessageFormat, toExternalQuoteMessageFormat } from './helpers/message.parsers'; +import { MatrixMediaService } from './services/MatrixMediaService'; + +type MatrixFileTypes = 'm.image' | 'm.video' | 'm.audio' | 'm.file'; + +export const fileTypes: Record = { + image: 'm.image', + video: 'm.video', + audio: 'm.audio', + file: 'm.file', +}; export class FederationMatrix extends ServiceClass implements IFederationMatrixService { protected name = 'federation-matrix'; @@ -100,6 +111,7 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS await createFederationContainer(containerOptions, config); instance.homeserverServices = getAllServices(); + MatrixMediaService.setHomeserverServices(instance.homeserverServices); instance.buildMatrixHTTPRoutes(); instance.onEvent('user.typing', async ({ isTyping, roomId, user: { username } }): Promise => { if (!roomId || !username) { @@ -170,7 +182,8 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS .use(getMatrixSendJoinRoutes(this.homeserverServices)) .use(getMatrixTransactionsRoutes(this.homeserverServices)) .use(getKeyServerRoutes(this.homeserverServices)) - .use(getFederationVersionsRoutes(this.homeserverServices)); + .use(getFederationVersionsRoutes(this.homeserverServices)) + .use(getMatrixMediaRoutes(this.homeserverServices)); wellKnown.use(isFederationEnabledMiddleware).use(isLicenseEnabledMiddleware).use(getWellKnownRoutes(this.homeserverServices)); @@ -396,6 +409,143 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS } } + private getMatrixMessageType(mimeType?: string): MatrixFileTypes { + const mainType = mimeType?.split('/')[0]; + if (!mainType) { + return fileTypes.file; + } + + return fileTypes[mainType] ?? fileTypes.file; + } + + private async handleFileMessage( + message: IMessage, + matrixRoomId: string, + matrixUserId: string, + matrixDomain: string, + ): Promise<{ eventId: string } | null> { + if (!message.files || message.files.length === 0) { + return null; + } + + try { + // TODO: Handle multiple files + const file = message.files[0]; + const mxcUri = await MatrixMediaService.prepareLocalFileForMatrix(file._id, matrixDomain); + + const msgtype = this.getMatrixMessageType(file.type); + const fileContent = { + body: file.name, + msgtype, + url: mxcUri, + info: { + mimetype: file.type, + size: file.size, + }, + }; + + return this.homeserverServices.message.sendFileMessage(matrixRoomId, fileContent, matrixUserId); + } catch (error) { + this.logger.error('Failed to handle file message', { + messageId: message._id, + error, + }); + throw error; + } + } + + private async handleTextMessage( + message: IMessage, + matrixRoomId: string, + matrixUserId: string, + matrixDomain: string, + ): Promise<{ eventId: string } | null> { + const parsedMessage = await toExternalMessageFormat({ + message: message.msg, + externalRoomId: matrixRoomId, + homeServerDomain: matrixDomain, + }); + + if (message.tmid) { + return this.handleThreadedMessage(message, matrixRoomId, matrixUserId, matrixDomain, parsedMessage); + } + + if (message.attachments?.some((attachment) => isQuoteAttachment(attachment) && Boolean(attachment.message_link))) { + return this.handleQuoteMessage(message, matrixRoomId, matrixUserId, matrixDomain); + } + + return this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, parsedMessage, matrixUserId); + } + + private async handleThreadedMessage( + message: IMessage, + matrixRoomId: string, + matrixUserId: string, + matrixDomain: string, + parsedMessage: string, + ): Promise<{ eventId: string } | null> { + if (!message.tmid) { + throw new Error('Thread message ID not found'); + } + + const threadRootMessage = await Messages.findOneById(message.tmid); + const threadRootEventId = threadRootMessage?.federation?.eventId; + + if (!threadRootEventId) { + this.logger.warn('Thread root event ID not found, sending as regular message'); + if (message.attachments?.some((attachment) => isQuoteAttachment(attachment) && Boolean(attachment.message_link))) { + return this.handleQuoteMessage(message, matrixRoomId, matrixUserId, matrixDomain); + } + return this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, parsedMessage, matrixUserId); + } + + const latestThreadMessage = await Messages.findLatestFederationThreadMessageByTmid(message.tmid, message._id); + const latestThreadEventId = latestThreadMessage?.federation?.eventId; + + if (message.attachments?.some((attachment) => isQuoteAttachment(attachment) && Boolean(attachment.message_link))) { + const quoteMessage = await this.getQuoteMessage(message, matrixRoomId, matrixUserId, matrixDomain); + if (!quoteMessage) { + throw new Error('Failed to retrieve quote message'); + } + return this.homeserverServices.message.sendReplyToInsideThreadMessage( + matrixRoomId, + quoteMessage.rawMessage, + quoteMessage.formattedMessage, + matrixUserId, + threadRootEventId, + quoteMessage.eventToReplyTo, + ); + } + + return this.homeserverServices.message.sendThreadMessage( + matrixRoomId, + message.msg, + parsedMessage, + matrixUserId, + threadRootEventId, + latestThreadEventId, + ); + } + + private async handleQuoteMessage( + message: IMessage, + matrixRoomId: string, + matrixUserId: string, + matrixDomain: string, + ): Promise<{ eventId: string } | null> { + const quoteMessage = await this.getQuoteMessage(message, matrixRoomId, matrixUserId, matrixDomain); + if (!quoteMessage) { + throw new Error('Failed to retrieve quote message'); + } + return this.homeserverServices.message.sendReplyToMessage( + matrixRoomId, + quoteMessage.rawMessage, + quoteMessage.formattedMessage, + quoteMessage.eventToReplyTo, + matrixUserId, + ); + } + async sendMessage(message: IMessage, room: IRoom, user: IUser): Promise { try { const matrixRoomId = await MatrixBridgedRoom.getExternalRoomId(room._id); @@ -417,84 +567,10 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS const actualMatrixUserId = existingMatrixUserId || matrixUserId; let result; - - const parsedMessage = await toExternalMessageFormat({ - message: message.msg, - externalRoomId: matrixRoomId, - homeServerDomain: this.serverName, - }); - if (!message.tmid) { - if (message.attachments?.some((attachment) => isQuoteAttachment(attachment) && Boolean(attachment.message_link))) { - const quoteMessage = await this.getQuoteMessage(message, matrixRoomId, actualMatrixUserId, this.serverName); - if (!quoteMessage) { - throw new Error('Failed to retrieve quote message'); - } - result = await this.homeserverServices.message.sendReplyToMessage( - matrixRoomId, - quoteMessage.rawMessage, - quoteMessage.formattedMessage, - quoteMessage.eventToReplyTo, - actualMatrixUserId, - ); - } else { - result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, parsedMessage, actualMatrixUserId); - } + if (message.files && message.files.length > 0) { + result = await this.handleFileMessage(message, matrixRoomId, actualMatrixUserId, this.serverName); } else { - const threadRootMessage = await Messages.findOneById(message.tmid); - const threadRootEventId = threadRootMessage?.federation?.eventId; - - if (threadRootEventId) { - const latestThreadMessage = await Messages.findOne( - { - 'tmid': message.tmid, - 'federation.eventId': { $exists: true }, - '_id': { $ne: message._id }, // Exclude the current message - }, - { sort: { ts: -1 } }, - ); - const latestThreadEventId = latestThreadMessage?.federation?.eventId; - - if (message.attachments?.some((attachment) => isQuoteAttachment(attachment) && Boolean(attachment.message_link))) { - const quoteMessage = await this.getQuoteMessage(message, matrixRoomId, actualMatrixUserId, this.serverName); - if (!quoteMessage) { - throw new Error('Failed to retrieve quote message'); - } - result = await this.homeserverServices.message.sendReplyToInsideThreadMessage( - matrixRoomId, - quoteMessage.rawMessage, - quoteMessage.formattedMessage, - actualMatrixUserId, - threadRootEventId, - quoteMessage.eventToReplyTo, - ); - } else { - result = await this.homeserverServices.message.sendThreadMessage( - matrixRoomId, - message.msg, - parsedMessage, - actualMatrixUserId, - threadRootEventId, - latestThreadEventId, - ); - } - } else { - this.logger.warn('Thread root event ID not found, sending as regular message'); - if (message.attachments?.some((attachment) => isQuoteAttachment(attachment) && Boolean(attachment.message_link))) { - const quoteMessage = await this.getQuoteMessage(message, matrixRoomId, actualMatrixUserId, this.serverName); - if (!quoteMessage) { - throw new Error('Failed to retrieve quote message'); - } - result = await this.homeserverServices.message.sendReplyToMessage( - matrixRoomId, - quoteMessage.rawMessage, - quoteMessage.formattedMessage, - quoteMessage.eventToReplyTo, - actualMatrixUserId, - ); - } else { - result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, parsedMessage, actualMatrixUserId); - } - } + result = await this.handleTextMessage(message, matrixRoomId, actualMatrixUserId, this.serverName); } if (!result) { diff --git a/ee/packages/federation-matrix/src/api/_matrix/media.ts b/ee/packages/federation-matrix/src/api/_matrix/media.ts new file mode 100644 index 0000000000000..650452ca09844 --- /dev/null +++ b/ee/packages/federation-matrix/src/api/_matrix/media.ts @@ -0,0 +1,153 @@ +import crypto from 'crypto'; + +import type { HomeserverServices } from '@hs/federation-sdk'; +import type { IUpload } from '@rocket.chat/core-typings'; +import { Router } from '@rocket.chat/http-router'; +import { ajv } from '@rocket.chat/rest-typings/dist/v1/Ajv'; + +import { MatrixMediaService } from '../../services/MatrixMediaService'; +import { canAccessMedia } from '../middlewares'; + +const MediaDownloadParamsSchema = { + type: 'object', + properties: { + mediaId: { type: 'string' }, + }, + required: ['mediaId'], + additionalProperties: false, +}; + +const ErrorResponseSchema = { + type: 'object', + properties: { + errcode: { type: 'string' }, + error: { type: 'string' }, + }, + required: ['errcode', 'error'], +}; + +const BufferResponseSchema = { + type: 'object', + description: 'Raw file buffer or multipart response', +}; + +const isMediaDownloadParamsProps = ajv.compile(MediaDownloadParamsSchema); +const isErrorResponseProps = ajv.compile(ErrorResponseSchema); +const isBufferResponseProps = ajv.compile(BufferResponseSchema); + +const SECURITY_HEADERS = { + 'X-Content-Type-Options': 'nosniff', + 'X-Frame-Options': 'DENY', + 'Content-Security-Policy': "default-src 'none'; img-src 'self'; media-src 'self'", + 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains', +}; + +function createMultipartResponse( + buffer: Buffer, + mimeType: string, + fileName: string, + metadata: Record = {}, +): { body: Buffer; contentType: string } { + const boundary = crypto.randomBytes(16).toString('hex'); + const parts: string[] = []; + + parts.push(`--${boundary}`, 'Content-Type: application/json', '', JSON.stringify(metadata)); + parts.push(`--${boundary}`, `Content-Type: ${mimeType}`, `Content-Disposition: attachment; filename="${fileName}"`, ''); + + const headerBuffer = Buffer.from(`${parts.join('\r\n')}\r\n`); + const endBoundary = Buffer.from(`\r\n--${boundary}--\r\n`); + + return { + body: Buffer.concat([headerBuffer, buffer, endBoundary]), + contentType: `multipart/mixed; boundary=${boundary}`, + }; +} + +async function getMediaFile(mediaId: string, serverName: string): Promise<{ file: IUpload; buffer: Buffer } | null> { + const file = await MatrixMediaService.getLocalFileForMatrixNode(mediaId, serverName); + if (!file) { + return null; + } + + const buffer = await MatrixMediaService.getLocalFileBuffer(file); + return { file, buffer }; +} + +export const getMatrixMediaRoutes = (homeserverServices: HomeserverServices) => { + const { config, federationAuth } = homeserverServices; + const router = new Router('/federation'); + + router.get( + '/v1/media/download/:mediaId', + { + params: isMediaDownloadParamsProps, + response: { + 200: isBufferResponseProps, + 401: isErrorResponseProps, + 403: isErrorResponseProps, + 404: isErrorResponseProps, + 429: isErrorResponseProps, + 500: isErrorResponseProps, + }, + tags: ['Federation', 'Media'], + }, + canAccessMedia(federationAuth), + async (c) => { + try { + const { mediaId } = c.req.param(); + const { serverName } = config; + + // TODO: Add file streaming support + const result = await getMediaFile(mediaId, serverName); + if (!result) { + return { + statusCode: 404, + body: { errcode: 'M_NOT_FOUND', error: 'Media not found' }, + }; + } + + const { file, buffer } = result; + + const mimeType = file.type || 'application/octet-stream'; + const fileName = file.name || mediaId; + + const multipartResponse = createMultipartResponse(buffer, mimeType, fileName); + + return { + statusCode: 200, + headers: { + ...SECURITY_HEADERS, + 'content-type': multipartResponse.contentType, + 'content-length': String(multipartResponse.body.length), + }, + body: multipartResponse.body, + }; + } catch (error) { + return { + statusCode: 500, + body: { errcode: 'M_UNKNOWN', error: 'Internal server error' }, + }; + } + }, + ); + + router.get( + '/v1/media/thumbnail/:mediaId', + { + params: isMediaDownloadParamsProps, + response: { + 404: isErrorResponseProps, + }, + tags: ['Federation', 'Media'], + }, + async () => ({ + statusCode: 404, + body: { + errcode: 'M_UNRECOGNIZED', + error: 'This endpoint is not implemented on the homeserver side', + }, + }), + ); + + return router; +}; diff --git a/ee/packages/federation-matrix/src/api/middlewares.ts b/ee/packages/federation-matrix/src/api/middlewares.ts index bcfff8dada781..a919cc0758103 100644 --- a/ee/packages/federation-matrix/src/api/middlewares.ts +++ b/ee/packages/federation-matrix/src/api/middlewares.ts @@ -3,6 +3,35 @@ import { errCodes } from '@hs/federation-sdk'; import type { EventID } from '@hs/room'; import type { Context, Next } from 'hono'; +export const canAccessMedia = (federationAuth: EventAuthorizationService) => async (c: Context, next: Next) => { + try { + const url = new URL(c.req.url); + const path = url.search ? `${c.req.path}${url.search}` : c.req.path; + + const verificationResult = await federationAuth.canAccessMediaFromAuthorizationHeader( + c.req.param('mediaId'), + c.req.header('Authorization') || '', + c.req.method, + path, + undefined, + ); + + if (!verificationResult.authorized) { + return c.json( + { + errcode: errCodes[verificationResult.errorCode].errcode, + error: errCodes[verificationResult.errorCode].error, + }, + errCodes[verificationResult.errorCode].status, + ); + } + + return next(); + } catch (error) { + return c.json(errCodes.M_UNKNOWN, 500); + } +}; + export const canAccessEvent = (federationAuth: EventAuthorizationService) => async (c: Context, next: Next) => { try { const url = new URL(c.req.url); diff --git a/ee/packages/federation-matrix/src/events/message.ts b/ee/packages/federation-matrix/src/events/message.ts index b1eeb4c305f46..aa99a2b79c81d 100644 --- a/ee/packages/federation-matrix/src/events/message.ts +++ b/ee/packages/federation-matrix/src/events/message.ts @@ -1,131 +1,252 @@ import type { HomeserverEventSignatures } from '@hs/federation-sdk'; import { FederationMatrix, Message, MeteorService } from '@rocket.chat/core-services'; import { UserStatus } from '@rocket.chat/core-typings'; -import type { IUser } from '@rocket.chat/core-typings'; +import type { IUser, IRoom } from '@rocket.chat/core-typings'; import type { Emitter } from '@rocket.chat/emitter'; import { Logger } from '@rocket.chat/logger'; import { Users, MatrixBridgedUser, MatrixBridgedRoom, Rooms, Subscriptions, Messages } from '@rocket.chat/models'; +import { fileTypes } from '../FederationMatrix'; import { toInternalMessageFormat, toInternalQuoteMessageFormat } from '../helpers/message.parsers'; +import { MatrixMediaService } from '../services/MatrixMediaService'; const logger = new Logger('federation-matrix:message'); -export function message(emitter: Emitter, serverName: string) { - emitter.on('homeserver.matrix.message', async (data) => { - try { - const message = data.content?.body?.toString(); - if (!message) { - logger.debug('No message found in event content'); - return; - } +async function getOrCreateFederatedUser(matrixUserId: string): Promise { + const [userPart, domain] = matrixUserId.split(':'); + if (!userPart || !domain) { + logger.error('Invalid Matrix sender ID format:', matrixUserId); + return null; + } + const username = userPart.substring(1); - const content = data.content as any; - const replyToRelation = content?.['m.relates_to']; - const isThreadMessage = replyToRelation?.rel_type === 'm.thread'; - const isQuoteMessage = replyToRelation?.['m.in_reply_to']?.event_id && !replyToRelation?.is_falling_back; - const threadRootEventId = isThreadMessage ? replyToRelation.event_id : undefined; + const user = await Users.findOneByUsername(matrixUserId); + if (user) { + await MatrixBridgedUser.createOrUpdateByLocalId(user._id, matrixUserId, false, domain); + return user; + } - const [userPart, domain] = data.sender.split(':'); - if (!userPart || !domain) { - logger.error('Invalid Matrix sender ID format:', data.sender); - return; - } - const username = userPart.substring(1); + logger.info('Creating new federated user:', { username: matrixUserId, externalId: matrixUserId }); - const internalUsername = data.sender; - let user = await Users.findOneByUsername(internalUsername); + const userData = { + username: matrixUserId, + name: username, // TODO: Fetch display name from Matrix profile + type: 'user', + status: UserStatus.ONLINE, + active: true, + roles: ['user'], + requirePasswordChange: false, + federated: true, + federation: { + version: 1, + }, + createdAt: new Date(), + _updatedAt: new Date(), + }; - if (!user) { - logger.info('Creating new federated user:', { username: internalUsername, externalId: data.sender }); - - const userData: Partial = { - username: internalUsername, - name: username, // TODO: Fetch display name from Matrix profile - type: 'user', - status: UserStatus.ONLINE, - active: true, - roles: ['user'], - requirePasswordChange: false, - federated: true, // Mark as federated user - federation: { - version: 1, - }, - createdAt: new Date(), - _updatedAt: new Date(), - }; + const { insertedId } = await Users.insertOne(userData); - const { insertedId } = await Users.insertOne(userData as IUser); + await MatrixBridgedUser.createOrUpdateByLocalId( + insertedId, + matrixUserId, + true, // isRemote = true for external Matrix users + domain, + ); - await MatrixBridgedUser.createOrUpdateByLocalId( - insertedId, - data.sender, - true, // isRemote = true for external Matrix users - domain, - ); + const newUser = await Users.findOneById(insertedId); + if (!newUser) { + logger.error('Failed to create user:', matrixUserId); + return null; + } - user = await Users.findOneById(insertedId); - if (!user) { - logger.error('Failed to create user:', internalUsername); - return; - } + logger.info('Successfully created federated user:', { userId: newUser._id, username }); - logger.info('Successfully created federated user:', { userId: user._id, username }); - } else { - await MatrixBridgedUser.createOrUpdateByLocalId(user._id, data.sender, false, domain); - } + return newUser; +} - const internalRoomId = await MatrixBridgedRoom.getLocalRoomId(data.room_id); - if (!internalRoomId) { - logger.error('Room not found in bridge mapping:', data.room_id); - // TODO: Handle room creation for unknown federated rooms - return; - } +async function getRoomAndEnsureSubscription(matrixRoomId: string, user: IUser): Promise { + const internalRoomId = await MatrixBridgedRoom.getLocalRoomId(matrixRoomId); + if (!internalRoomId) { + logger.error('Room not found in bridge mapping:', matrixRoomId); + // TODO: Handle room creation for unknown federated rooms + return null; + } - const room = await Rooms.findOneById(internalRoomId); - if (!room) { - logger.error('Room not found:', internalRoomId); - return; - } + const room = await Rooms.findOneById(internalRoomId); + if (!room) { + logger.error('Room not found:', internalRoomId); + return null; + } - if (!room.federated) { - logger.error('Room is not marked as federated:', { roomId: room._id, matrixRoomId: data.room_id }); - // TODO: Should we update the room to be federated? - } + if (!room.federated) { + logger.error('Room is not marked as federated:', { roomId: room._id, matrixRoomId }); + // TODO: Should we update the room to be federated? + } - const existingSubscription = await Subscriptions.findOneByRoomIdAndUserId(room._id, user._id); + const existingSubscription = await Subscriptions.findOneByRoomIdAndUserId(room._id, user._id); - if (!existingSubscription) { - logger.info('Creating subscription for federated user in room:', { userId: user._id, roomId: room._id }); + if (existingSubscription) { + return room; + } - const { insertedId } = await Subscriptions.createWithRoomAndUser(room, user, { - ts: new Date(), - open: false, - alert: false, - unread: 0, - userMentions: 0, - groupMentions: 0, - // Federation status is inherited from room.federated and user.federated - }); + logger.info('Creating subscription for federated user in room:', { userId: user._id, roomId: room._id }); - if (insertedId) { - logger.debug('Successfully created subscription:', insertedId); - // TODO: Import and use notifyOnSubscriptionChangedById if needed - // void notifyOnSubscriptionChangedById(insertedId, 'inserted'); - } + const { insertedId } = await Subscriptions.createWithRoomAndUser(room, user, { + ts: new Date(), + open: false, + alert: false, + unread: 0, + userMentions: 0, + groupMentions: 0, + }); + + if (insertedId) { + logger.debug('Successfully created subscription:', insertedId); + // TODO: Import and use notifyOnSubscriptionChangedById if needed + } + + return room; +} + +async function getThreadMessageId(threadRootEventId: string): Promise<{ tmid: string; tshow: boolean } | undefined> { + const threadRootMessage = await Messages.findOneByFederationId(threadRootEventId); + if (!threadRootMessage) { + logger.warn('Thread root message not found for event:', threadRootEventId); + return; + } + + const shouldSetTshow = !threadRootMessage?.tcount; + return { tmid: threadRootMessage._id, tshow: shouldSetTshow }; +} + +async function handleMediaMessage( + // TODO improve typing + content: any, + msgtype: string, + messageBody: string, + user: IUser, + room: IRoom, + eventId: string, + tmid?: string, +): Promise<{ + fromId: string; + rid: string; + msg: string; + federation_event_id: string; + tmid?: string; + file: any; + files: any[]; + attachments: any[]; +}> { + const fileInfo = content.info; + const mimeType = fileInfo.mimetype; + const fileName = messageBody; + + const fileRefId = await MatrixMediaService.downloadAndStoreRemoteFile(content.url, { + name: messageBody, + size: fileInfo.size, + type: mimeType, + roomId: room._id, + userId: user._id, + }); + + let fileExtension = ''; + if (fileName && fileName.includes('.')) { + fileExtension = fileName.split('.').pop()?.toLowerCase() || ''; + } else if (mimeType && mimeType.includes('/')) { + fileExtension = mimeType.split('/')[1] || ''; + if (fileExtension === 'jpeg') { + fileExtension = 'jpg'; + } + } + + const fileUrl = `/file-upload/${fileRefId}/${encodeURIComponent(fileName)}`; + + // TODO improve typing + const attachment: any = { + title: fileName, + type: 'file', + title_link: fileUrl, + title_link_download: true, + }; + + if (msgtype === 'm.image') { + attachment.image_url = fileUrl; + attachment.image_type = mimeType; + attachment.image_size = fileInfo.size || 0; + attachment.description = ''; + if (fileInfo.w && fileInfo.h) { + attachment.image_dimensions = { + width: fileInfo.w, + height: fileInfo.h, + }; + } + } else if (msgtype === 'm.video') { + attachment.video_url = fileUrl; + attachment.video_type = mimeType; + attachment.video_size = fileInfo.size || 0; + attachment.description = ''; + } else if (msgtype === 'm.audio') { + attachment.audio_url = fileUrl; + attachment.audio_type = mimeType; + attachment.audio_size = fileInfo.size || 0; + attachment.description = ''; + } else { + attachment.description = ''; + } + + const fileData = { + _id: fileRefId, + name: fileName, + type: mimeType, + size: fileInfo.size || 0, + format: fileExtension, + }; + + return { + fromId: user._id, + rid: room._id, + msg: '', + federation_event_id: eventId, + tmid, + file: fileData, + files: [fileData], + attachments: [attachment], + }; +} + +export function message(emitter: Emitter, serverName: string) { + emitter.on('homeserver.matrix.message', async (data) => { + try { + // TODO remove type casting + const content = data.content as any; + const msgtype = content?.msgtype; + const messageBody = content?.body?.toString(); + + if (!messageBody && !msgtype) { + logger.debug('No message content found in event'); + return; } - let thread: { tmid: string; tshow: boolean } | undefined; - if (isThreadMessage && threadRootEventId) { - const threadRootMessage = await Messages.findOneByFederationId(threadRootEventId); - if (!threadRootMessage) { - logger.warn('Thread root message not found for event:', threadRootEventId); - return; - } + const user = await getOrCreateFederatedUser(data.sender); + if (!user) { + return; + } - const shouldSetTshow = !threadRootMessage?.tcount; - thread = { tmid: threadRootMessage._id, tshow: shouldSetTshow }; + const room = await getRoomAndEnsureSubscription(data.room_id, user); + if (!room) { + return; } + const replyToRelation = content?.['m.relates_to']; + const threadRelation = content?.['m.relates_to']; + const isThreadMessage = threadRelation?.rel_type === 'm.thread'; + const isQuoteMessage = replyToRelation?.['m.in_reply_to']?.event_id && !replyToRelation?.is_falling_back; + const threadRootEventId = isThreadMessage ? threadRelation.event_id : undefined; + const thread = await getThreadMessageId(threadRootEventId); + + const isMediaMessage = Object.values(fileTypes).includes(msgtype); + const isEditedMessage = data.content['m.relates_to']?.rel_type === 'm.replace'; if (isEditedMessage && data.content['m.relates_to']?.event_id && data.content['m.new_content']) { logger.debug('Received edited message from Matrix, updating existing message'); @@ -152,7 +273,7 @@ export function message(emitter: Emitter, serverName: const formatted = await toInternalQuoteMessageFormat({ messageToReplyToUrl, formattedMessage: data.content.formatted_body || '', - rawMessage: message, + rawMessage: messageBody, homeServerDomain: serverName, senderExternalId: data.sender, }); @@ -183,23 +304,24 @@ export function message(emitter: Emitter, serverName: ); return; } + if (isQuoteMessage && room.name) { const originalMessage = await Messages.findOneByFederationId(replyToRelation?.['m.in_reply_to']?.event_id); if (!originalMessage) { - logger.error('Original message not found for edit:', replyToRelation?.['m.in_reply_to']?.event_id); + logger.error('Original message not found for quote:', replyToRelation?.['m.in_reply_to']?.event_id); return; } const messageToReplyToUrl = await MeteorService.getMessageURLToReplyTo(room.t as string, room._id, room.name, originalMessage._id); const formatted = await toInternalQuoteMessageFormat({ messageToReplyToUrl, formattedMessage: data.content.formatted_body || '', - rawMessage: message, + rawMessage: messageBody, homeServerDomain: serverName, senderExternalId: data.sender, }); await Message.saveMessageFromFederation({ fromId: user._id, - rid: internalRoomId, + rid: room._id, msg: formatted, federation_event_id: data.event_id, thread, @@ -207,19 +329,24 @@ export function message(emitter: Emitter, serverName: return; } - const formatted = await toInternalMessageFormat({ - rawMessage: message, - formattedMessage: data.content.formatted_body || '', - homeServerDomain: serverName, - senderExternalId: data.sender, - }); - await Message.saveMessageFromFederation({ - fromId: user._id, - rid: internalRoomId, - msg: formatted, - federation_event_id: data.event_id, - thread, - }); + if (isMediaMessage && content?.url) { + const result = await handleMediaMessage(content, msgtype, messageBody, user, room, data.event_id, thread?.tmid); + await Message.saveMessageFromFederation(result); + } else { + const formatted = toInternalMessageFormat({ + rawMessage: messageBody, + formattedMessage: data.content.formatted_body || '', + homeServerDomain: serverName, + senderExternalId: data.sender, + }); + await Message.saveMessageFromFederation({ + fromId: user._id, + rid: room._id, + msg: formatted, + federation_event_id: data.event_id, + thread, + }); + } } catch (error) { logger.error('Error processing Matrix message:', error); } diff --git a/ee/packages/federation-matrix/src/services/MatrixMediaService.ts b/ee/packages/federation-matrix/src/services/MatrixMediaService.ts new file mode 100644 index 0000000000000..6c7aed293e421 --- /dev/null +++ b/ee/packages/federation-matrix/src/services/MatrixMediaService.ts @@ -0,0 +1,148 @@ +import type { HomeserverServices } from '@hs/federation-sdk'; +import { Upload } from '@rocket.chat/core-services'; +import type { IUpload } from '@rocket.chat/core-typings'; +import { Logger } from '@rocket.chat/logger'; +import { Uploads } from '@rocket.chat/models'; + +const logger = new Logger('federation-matrix:media-service'); + +export interface IRemoteFileReference { + name: string; + size: number; + type: string; + mxcUri: string; + serverName: string; + mediaId: string; +} + +export class MatrixMediaService { + private static homeserverServices: HomeserverServices; + + static setHomeserverServices(services: HomeserverServices): void { + this.homeserverServices = services; + } + + static generateMXCUri(fileId: string, serverName: string): string { + return `mxc://${serverName}/${fileId}`; + } + + static parseMXCUri(mxcUri: string): { serverName: string; mediaId: string } | null { + const match = mxcUri.match(/^mxc:\/\/([^/]+)\/(.+)$/); + if (!match) { + logger.error('Invalid MXC URI format', { mxcUri }); + return null; + } + return { + serverName: match[1], + mediaId: match[2], + }; + } + + static async prepareLocalFileForMatrix(fileId: string, serverName: string): Promise { + try { + const file = await Uploads.findOneById(fileId); + if (!file) { + logger.error(`File ${fileId} not found in database`); + throw new Error(`File ${fileId} not found`); + } + + if (file.federation?.mxcUri) { + return file.federation.mxcUri; + } + + const mxcUri = this.generateMXCUri(fileId, serverName); + + await Uploads.setFederationInfo(fileId, { + mxcUri, + serverName, + mediaId: fileId, + }); + + return mxcUri; + } catch (error) { + logger.error('Error preparing file for Matrix:', error); + throw error; + } + } + + static async getLocalFileForMatrixNode(mediaId: string, serverName: string): Promise { + try { + let file = await Uploads.findByFederationMediaIdAndServerName(mediaId, serverName); + + if (!file) { + file = await Uploads.findOneById(mediaId); + } + + if (!file) { + return null; + } + + return file; + } catch (error) { + logger.error('Error retrieving local file:', error); + return null; + } + } + + static async downloadAndStoreRemoteFile( + mxcUri: string, + metadata: { + name: string; + size: number; + type: string; + messageId?: string; + roomId?: string; + userId?: string; + }, + ): Promise { + try { + const parts = this.parseMXCUri(mxcUri); + if (!parts) { + logger.error('Invalid MXC URI format', { mxcUri }); + throw new Error('Invalid MXC URI'); + } + + const uploadAlreadyExists = await Uploads.findByFederationMediaIdAndServerName(parts.mediaId, parts.serverName); + if (uploadAlreadyExists) { + return uploadAlreadyExists._id; + } + + if (!this.homeserverServices) { + throw new Error('Homeserver services not initialized. Call setHomeserverServices first.'); + } + + const buffer = await this.homeserverServices.media.downloadFromRemoteServer(parts.serverName, parts.mediaId); + if (!buffer) { + throw new Error('Download from remote server returned null content.'); + } + + // TODO: Make uploadFile support Partial to avoid calling a DB update right after the upload to set the federation info + const uploadedFile = await Upload.uploadFile({ + userId: metadata.userId || 'federation', + buffer, + details: { + name: metadata.name || 'unnamed', + size: buffer.length, + type: metadata.type || 'application/octet-stream', + rid: metadata.roomId, + userId: metadata.userId || 'federation', + }, + }); + + await Uploads.setFederationInfo(uploadedFile._id, { + mxcUri, + serverName: parts.serverName, + mediaId: parts.mediaId, + }); + + return uploadedFile._id; + } catch (error) { + logger.error('Error downloading and storing remote file:', error); + throw error; + } + } + + static async getLocalFileBuffer(file: IUpload): Promise { + return Upload.getFileBuffer({ file }); + } +} diff --git a/packages/core-services/src/types/IMessageService.ts b/packages/core-services/src/types/IMessageService.ts index 223dbc92dcfbd..793781bcfe070 100644 --- a/packages/core-services/src/types/IMessageService.ts +++ b/packages/core-services/src/types/IMessageService.ts @@ -14,12 +14,18 @@ export interface IMessageService { rid, msg, federation_event_id, + file, + files, + attachments, thread, }: { fromId: string; rid: string; msg: string; federation_event_id: string; + file?: IMessage['file']; + files?: IMessage['files']; + attachments?: IMessage['attachments']; thread?: { tmid: string; tshow: boolean }; }): Promise; saveSystemMessageAndNotifyUser( diff --git a/packages/core-typings/src/IMessage/IMessage.ts b/packages/core-typings/src/IMessage/IMessage.ts index d1dc67b00230c..ae265f26d77ab 100644 --- a/packages/core-typings/src/IMessage/IMessage.ts +++ b/packages/core-typings/src/IMessage/IMessage.ts @@ -237,6 +237,20 @@ export interface IMessage extends IRocketChatRecord { }; } +export type EncryptedMessageContent = { + content: { + algorithm: 'rc.v1.aes-sha2'; + ciphertext: string; + }; +}; + +export const isEncryptedMessageContent = (content: unknown): content is EncryptedMessageContent => + typeof content === 'object' && + content !== null && + 'content' in content && + typeof (content as any).content === 'object' && + (content as any).content?.algorithm === 'rc.v1.aes-sha2'; + export interface ISystemMessage extends IMessage { t: MessageTypesValues; } diff --git a/packages/core-typings/src/IUpload.ts b/packages/core-typings/src/IUpload.ts index 8bcabee2fa8de..2eb5cf0741a73 100644 --- a/packages/core-typings/src/IUpload.ts +++ b/packages/core-typings/src/IUpload.ts @@ -59,6 +59,11 @@ export interface IUpload { hashes?: { sha256: string; }; + federation?: { + mxcUri: string; + serverName: string; + mediaId: string; + }; } export type IUploadWithUser = IUpload & { user?: Pick }; diff --git a/packages/model-typings/src/models/IMessagesModel.ts b/packages/model-typings/src/models/IMessagesModel.ts index 6b79ac0c21829..20ba7048ac092 100644 --- a/packages/model-typings/src/models/IMessagesModel.ts +++ b/packages/model-typings/src/models/IMessagesModel.ts @@ -103,6 +103,8 @@ export interface IMessagesModel extends IBaseModel { findOneByFederationId(federationEventId: string): Promise; + findLatestFederationThreadMessageByTmid(tmid: string, messageId: IMessage['_id']): Promise; + setFederationEventIdById(_id: string, federationEventId: string): Promise; removeByRoomId(roomId: IRoom['_id']): Promise; diff --git a/packages/model-typings/src/models/IUploadsModel.ts b/packages/model-typings/src/models/IUploadsModel.ts index 1e80fcfe39b52..1c0cbb316e427 100644 --- a/packages/model-typings/src/models/IUploadsModel.ts +++ b/packages/model-typings/src/models/IUploadsModel.ts @@ -1,5 +1,5 @@ import type { IRoom, IUpload } from '@rocket.chat/core-typings'; -import type { FindCursor, WithId, Filter, FindOptions } from 'mongodb'; +import type { FindCursor, WithId, Filter, FindOptions, UpdateResult } from 'mongodb'; import type { FindPaginated } from './IBaseModel'; import type { IBaseUploadsModel } from './IBaseUploadsModel'; @@ -14,4 +14,8 @@ export interface IUploadsModel extends IBaseUploadsModel { uploadedAt?: Date, options?: Omit, 'sort'>, ): FindPaginated>>; + + findByFederationMediaIdAndServerName(mediaId: string, serverName: string): Promise; + + setFederationInfo(fileId: IUpload['_id'], info: Required['federation']): Promise; } diff --git a/packages/models/src/models/Messages.ts b/packages/models/src/models/Messages.ts index 50f73cab73d42..452c3fbb05d4d 100644 --- a/packages/models/src/models/Messages.ts +++ b/packages/models/src/models/Messages.ts @@ -604,6 +604,19 @@ export class MessagesRaw extends BaseRaw implements IMessagesModel { return this.findOne({ 'federation.eventId': federationEventId }); } + async findLatestFederationThreadMessageByTmid(tmid: string, messageId: IMessage['_id']): Promise { + return this.findOne( + { + '_id': { $ne: messageId }, + tmid, + 'federation.eventId': { $exists: true }, + }, + { + sort: { ts: -1 }, + }, + ); + } + async setFederationEventIdById(_id: string, federationEventId: string): Promise { await this.updateOne( { _id }, diff --git a/packages/models/src/models/Uploads.ts b/packages/models/src/models/Uploads.ts index fc1b72bce53b1..760080a0cf3b6 100644 --- a/packages/models/src/models/Uploads.ts +++ b/packages/models/src/models/Uploads.ts @@ -2,7 +2,7 @@ import type { IUpload, RocketChatRecordDeleted, IRoom } from '@rocket.chat/core-typings'; import type { FindPaginated, IUploadsModel } from '@rocket.chat/model-typings'; import { escapeRegExp } from '@rocket.chat/string-helpers'; -import type { Collection, FindCursor, Db, IndexDescription, WithId, Filter, FindOptions } from 'mongodb'; +import type { Collection, FindCursor, Db, IndexDescription, WithId, Filter, FindOptions, UpdateResult } from 'mongodb'; import { BaseUploadModelRaw } from './BaseUploadModel'; @@ -12,7 +12,12 @@ export class UploadsRaw extends BaseUploadModelRaw implements IUploadsModel { } protected modelIndexes(): IndexDescription[] { - return [...super.modelIndexes(), { key: { uploadedAt: -1 } }, { key: { rid: 1, _hidden: 1, typeGroup: 1 } }]; + return [ + ...super.modelIndexes(), + { key: { uploadedAt: -1 } }, + { key: { rid: 1, _hidden: 1, typeGroup: 1 } }, + { key: { 'federation.mediaId': 1, 'federation.serverName': 1 }, unique: true, sparse: true }, + ]; } findNotHiddenFilesOfRoom(roomId: string, searchText: string, fileType: string, limit: number): FindCursor { @@ -47,6 +52,14 @@ export class UploadsRaw extends BaseUploadModelRaw implements IUploadsModel { }); } + findByFederationMediaIdAndServerName(mediaId: string, serverName: string): Promise { + return this.findOne({ 'federation.mediaId': mediaId, 'federation.serverName': serverName }); + } + + setFederationInfo(fileId: IUpload['_id'], info: Required['federation']): Promise { + return this.updateOne({ _id: fileId }, { $set: { federation: info } }); + } + findPaginatedWithoutThumbs(query: Filter = {}, options?: FindOptions): FindPaginated>> { return this.findPaginated( {