Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion apps/meteor/server/services/messages/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,21 @@ export class MessageService extends ServiceClassInternal implements IMessageServ
rid,
msg,
federation_event_id,
tmid,
}: {
fromId: string;
rid: string;
msg: string;
federation_event_id: string;
tmid?: string;
}): Promise<IMessage> {
return executeSendMessage(fromId, { rid, msg, federation: { eventId: federation_event_id } });
const threadParams = tmid ? { tmid, tshow: true } : {};
return executeSendMessage(fromId, {
rid,
msg,
...threadParams,
federation: { eventId: federation_event_id },
});
}

async sendMessageWithValidation(user: IUser, message: Partial<IMessage>, room: Partial<IRoom>, upsert = false): Promise<IMessage> {
Expand Down
36 changes: 35 additions & 1 deletion ee/packages/federation-matrix/src/FederationMatrix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,41 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS

const actualMatrixUserId = existingMatrixUserId || matrixUserId;

const result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId);
let result;

if (!message.tmid) {
result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId);
} else {
const threadRootMessage = await Messages.findOneById(message.tmid);
const threadRootEventId = threadRootMessage?.federation?.eventId;

if (threadRootEventId) {
const latestThreadMessage = await Messages.findOne(
{
'tmid': message.tmid,
'federation.eventId': { $exists: true },
'_id': { $ne: message._id }, // Exclude the current message
},
{ sort: { ts: -1 } },
);
const latestThreadEventId = latestThreadMessage?.federation?.eventId;

result = await this.homeserverServices.message.sendThreadMessage(
matrixRoomId,
message.msg,
actualMatrixUserId,
threadRootEventId,
latestThreadEventId,
);
} else {
this.logger.warn('Thread root event ID not found, sending as regular message');
result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId);
}
}

if (!result) {
throw new Error('Failed to send message to Matrix - no result returned');
}

await Messages.setFederationEventIdById(message._id, result.eventId);

Expand Down
29 changes: 16 additions & 13 deletions ee/packages/federation-matrix/src/events/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@ const logger = new Logger('federation-matrix:message');
export function message(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.message', async (data) => {
try {
logger.info('Received Matrix message event:', {
event_id: data.event_id,
room_id: data.room_id,
sender: data.sender,
});

const message = data.content?.body?.toString();
if (!message) {
logger.debug('No message found in event content');
return;
}

const content = data.content as any;
const threadRelation = content?.['m.relates_to'];
const isThreadMessage = threadRelation?.rel_type === 'm.thread';
const threadRootEventId = isThreadMessage ? threadRelation.event_id : undefined;

const [userPart, domain] = data.sender.split(':');
if (!userPart || !domain) {
logger.error('Invalid Matrix sender ID format:', data.sender);
Expand Down Expand Up @@ -108,20 +107,24 @@ export function message(emitter: Emitter<HomeserverEventSignatures>) {
}
}

logger.info('Saving federated message:', {
fromId: user._id,
roomId: internalRoomId,
eventId: data.event_id,
});
let tmid: string | undefined;
if (isThreadMessage && threadRootEventId) {
const threadRootMessage = await Messages.findOneByFederationId(threadRootEventId);
if (threadRootMessage) {
tmid = threadRootMessage._id;
logger.debug('Found thread root message:', { tmid, threadRootEventId });
} else {
logger.warn('Thread root message not found for event:', threadRootEventId);
}
}

await Message.saveMessageFromFederation({
fromId: user._id,
rid: internalRoomId,
msg: message,
federation_event_id: data.event_id,
tmid,
});

logger.debug('Successfully processed Matrix message');
} catch (error) {
logger.error('Error processing Matrix message:', error);
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core-services/src/types/IMessageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ export interface IMessageService {
rid,
msg,
federation_event_id,
tmid,
}: {
fromId: string;
rid: string;
msg: string;
federation_event_id: string;
tmid?: string;
}): Promise<IMessage>;
saveSystemMessageAndNotifyUser<T = IMessage>(
type: MessageTypesValues,
Expand Down
Loading