From e6de227e2ec4a3b937fb0a184707cdf4c4b71e09 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Mon, 4 Aug 2025 20:28:22 -0300 Subject: [PATCH 1/2] feat: adds threads messaging and reactions support --- .../server/services/messages/service.ts | 10 +++++- .../federation-matrix/src/FederationMatrix.ts | 34 ++++++++++++++++++- .../federation-matrix/src/events/message.ts | 29 +++++++++------- .../src/types/IMessageService.ts | 2 ++ 4 files changed, 60 insertions(+), 15 deletions(-) diff --git a/apps/meteor/server/services/messages/service.ts b/apps/meteor/server/services/messages/service.ts index cb54e4c055c2e..8427ebee2a9f1 100644 --- a/apps/meteor/server/services/messages/service.ts +++ b/apps/meteor/server/services/messages/service.ts @@ -90,13 +90,21 @@ export class MessageService extends ServiceClassInternal implements IMessageServ rid, msg, federation_event_id, + tmid, }: { fromId: string; rid: string; msg: string; federation_event_id: string; + tmid?: string; }): Promise { - return executeSendMessage(fromId, { rid, msg, federation: { eventId: federation_event_id } }); + const threadParams = tmid ? { tmid, tshow: true } : {}; + return executeSendMessage(fromId, { + rid, + msg, + ...threadParams, + federation: { eventId: federation_event_id }, + }); } async sendMessageWithValidation(user: IUser, message: Partial, room: Partial, upsert = false): Promise { diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index 74581ebcc661a..f5091b884d62a 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -184,7 +184,39 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS const actualMatrixUserId = existingMatrixUserId || matrixUserId; - const result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId); + let result; + + if (!message.tmid) { + result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId); + } + + if (message.tmid) { + const threadRootMessage = await Messages.findOneById(message.tmid); + const threadRootEventId = threadRootMessage?.federation?.eventId; + + if (threadRootEventId) { + const latestThreadMessage = await Messages.findOne( + { + 'tmid': message.tmid, + 'federation.eventId': { $exists: true }, + '_id': { $ne: message._id }, // Exclude the current message + }, + { sort: { ts: -1 } }, + ); + const latestThreadEventId = latestThreadMessage?.federation?.eventId; + + result = await this.homeserverServices.message.sendThreadMessage( + matrixRoomId, + message.msg, + actualMatrixUserId, + threadRootEventId, + latestThreadEventId, + ); + } else { + this.logger.warn('Thread root event ID not found, sending as regular message'); + result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId); + } + } await Messages.setFederationEventIdById(message._id, result.eventId); diff --git a/ee/packages/federation-matrix/src/events/message.ts b/ee/packages/federation-matrix/src/events/message.ts index 14ec990143de2..dfde54bfa158c 100644 --- a/ee/packages/federation-matrix/src/events/message.ts +++ b/ee/packages/federation-matrix/src/events/message.ts @@ -11,18 +11,17 @@ const logger = new Logger('federation-matrix:message'); export function message(emitter: Emitter) { emitter.on('homeserver.matrix.message', async (data) => { try { - logger.info('Received Matrix message event:', { - event_id: data.event_id, - room_id: data.room_id, - sender: data.sender, - }); - const message = data.content?.body?.toString(); if (!message) { logger.debug('No message found in event content'); return; } + const content = data.content as any; + const threadRelation = content?.['m.relates_to']; + const isThreadMessage = threadRelation?.rel_type === 'm.thread'; + const threadRootEventId = isThreadMessage ? threadRelation.event_id : undefined; + const [userPart, domain] = data.sender.split(':'); if (!userPart || !domain) { logger.error('Invalid Matrix sender ID format:', data.sender); @@ -108,20 +107,24 @@ export function message(emitter: Emitter) { } } - logger.info('Saving federated message:', { - fromId: user._id, - roomId: internalRoomId, - eventId: data.event_id, - }); + let tmid: string | undefined; + if (isThreadMessage && threadRootEventId) { + const threadRootMessage = await Messages.findOneByFederationId(threadRootEventId); + if (threadRootMessage) { + tmid = threadRootMessage._id; + logger.debug('Found thread root message:', { tmid, threadRootEventId }); + } else { + logger.warn('Thread root message not found for event:', threadRootEventId); + } + } await Message.saveMessageFromFederation({ fromId: user._id, rid: internalRoomId, msg: message, federation_event_id: data.event_id, + tmid, }); - - logger.debug('Successfully processed Matrix message'); } catch (error) { logger.error('Error processing Matrix message:', error); } diff --git a/packages/core-services/src/types/IMessageService.ts b/packages/core-services/src/types/IMessageService.ts index f0c0a2df8b8b9..0b2e5a743b11f 100644 --- a/packages/core-services/src/types/IMessageService.ts +++ b/packages/core-services/src/types/IMessageService.ts @@ -14,11 +14,13 @@ export interface IMessageService { rid, msg, federation_event_id, + tmid, }: { fromId: string; rid: string; msg: string; federation_event_id: string; + tmid?: string; }): Promise; saveSystemMessageAndNotifyUser( type: MessageTypesValues, From aaa253db1fbaced284ee99ccc01c7367c04aecdf Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Mon, 4 Aug 2025 21:20:47 -0300 Subject: [PATCH 2/2] fix: changes way to handle thread messages inside sendMessage method --- ee/packages/federation-matrix/src/FederationMatrix.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ee/packages/federation-matrix/src/FederationMatrix.ts b/ee/packages/federation-matrix/src/FederationMatrix.ts index f5091b884d62a..825c091c18d3c 100644 --- a/ee/packages/federation-matrix/src/FederationMatrix.ts +++ b/ee/packages/federation-matrix/src/FederationMatrix.ts @@ -188,9 +188,7 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS if (!message.tmid) { result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId); - } - - if (message.tmid) { + } else { const threadRootMessage = await Messages.findOneById(message.tmid); const threadRootEventId = threadRootMessage?.federation?.eventId; @@ -218,6 +216,10 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS } } + if (!result) { + throw new Error('Failed to send message to Matrix - no result returned'); + } + await Messages.setFederationEventIdById(message._id, result.eventId); this.logger.debug('Message sent to Matrix successfully:', result.eventId);