Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions apps/meteor/app/lib/server/methods/addUsersToRoom.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { api } from '@rocket.chat/core-services';
import type { IUser } from '@rocket.chat/core-typings';
import { isRoomFederated } from '@rocket.chat/core-typings';
import type { ServerMethods } from '@rocket.chat/ddp-client';
import { Subscriptions, Users, Rooms } from '@rocket.chat/models';
import { Match } from 'meteor/check';
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { i18n } from '../../../../server/lib/i18n';
import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission';
import { addUserToRoom } from '../functions/addUserToRoom';
Expand Down Expand Up @@ -79,12 +77,6 @@ export const addUsersToRoomMethod = async (userId: string, data: { rid: string;
});
}

// Validate each user, then add to room
if (isRoomFederated(room)) {
await callbacks.run('federation.onAddUsersToRoom', { invitees: data.users, inviter: user }, room);
return true;
}

await Promise.all(
data.users.map(async (username) => {
const newUser = await Users.findOneByUsernameIgnoringCase(username);
Expand Down
15 changes: 0 additions & 15 deletions apps/meteor/ee/server/hooks/federation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,6 @@ callbacks.add(
'native-federation-after-delete-message',
);

callbacks.add(
'federation.onAddUsersToRoom',
async ({ invitees, inviter }, room) => {
if (FederationActions.shouldPerformFederationAction(room)) {
await FederationMatrix.inviteUsersToRoom(
room,
invitees.map((invitee) => (typeof invitee === 'string' ? invitee : invitee.username)).filter((v) => v != null),
inviter,
);
}
},
callbacks.priority.MEDIUM,
'native-federation-on-add-users-to-room ',
);

beforeAddUserToRoom.add(
async ({ user, inviter }, room) => {
if (!user.username || !inviter) {
Expand Down
1 change: 0 additions & 1 deletion apps/meteor/lib/callbacks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ interface EventLikeCallbackSignatures {
message: IMessage,
params: { user: IUser; reaction: string; shouldReact: boolean; oldMessage: IMessage; room: IRoom },
) => void;
'federation.onAddUsersToRoom': (params: { invitees: IUser[] | Username[]; inviter: IUser }, room: IRoom) => void;
'onJoinVideoConference': (callId: VideoConference['_id'], userId?: IUser['_id']) => Promise<void>;
'usernameSet': () => void;
'beforeJoinRoom': (user: IUser, room: IRoom) => void;
Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/server/methods/addRoomModerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const addRoomModerator = async (fromUserId: IUser['_id'], rid: IRoom['_id
check(rid, String);
check(userId, String);

const room = await Rooms.findOneById(rid, { projection: { t: 1, federated: 1 } });
const room = await Rooms.findOneById(rid, { projection: { t: 1, federated: 1, federation: 1 } });
if (!room) {
throw new Meteor.Error('error-invalid-room', 'Invalid room', {
method: 'addRoomModerator',
Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/server/methods/addRoomOwner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const addRoomOwner = async (fromUserId: IUser['_id'], rid: IRoom['_id'],
check(rid, String);
check(userId, String);

const room = await Rooms.findOneById(rid, { projection: { t: 1, federated: 1 } });
const room = await Rooms.findOneById(rid, { projection: { t: 1, federated: 1, federation: 1 } });
if (!room) {
throw new Meteor.Error('error-invalid-room', 'Invalid room', {
method: 'addRoomOwner',
Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/server/methods/removeRoomModerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const removeRoomModerator = async (fromUserId: IUser['_id'], rid: IRoom['
check(rid, String);
check(userId, String);

const room = await Rooms.findOneById(rid, { projection: { t: 1, federated: 1 } });
const room = await Rooms.findOneById(rid, { projection: { t: 1, federated: 1, federation: 1 } });
if (!room) {
throw new Meteor.Error('error-invalid-room', 'Invalid room', {
method: 'removeRoomModerator',
Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/server/methods/removeRoomOwner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const removeRoomOwner = async (fromUserId: string, rid: string, userId: s
check(rid, String);
check(userId, String);

const room = await Rooms.findOneById(rid, { projection: { t: 1, federated: 1 } });
const room = await Rooms.findOneById(rid, { projection: { t: 1, federated: 1, federation: 1 } });
if (!room) {
throw new Meteor.Error('error-invalid-room', 'Invalid room', {
method: 'removeRoomOwner',
Expand Down
2 changes: 1 addition & 1 deletion ee/apps/federation-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"@rocket.chat/core-typings": "workspace:*",
"@rocket.chat/emitter": "^0.31.25",
"@rocket.chat/federation-matrix": "workspace:^",
"@rocket.chat/federation-sdk": "0.1.11",
"@rocket.chat/federation-sdk": "0.1.13",
"@rocket.chat/http-router": "workspace:*",
"@rocket.chat/instance-status": "workspace:^",
"@rocket.chat/license": "workspace:^",
Expand Down
2 changes: 1 addition & 1 deletion ee/packages/federation-matrix/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"@rocket.chat/core-services": "workspace:^",
"@rocket.chat/core-typings": "workspace:^",
"@rocket.chat/emitter": "^0.31.25",
"@rocket.chat/federation-sdk": "0.1.11",
"@rocket.chat/federation-sdk": "0.1.13",
"@rocket.chat/http-router": "workspace:^",
"@rocket.chat/license": "workspace:^",
"@rocket.chat/models": "workspace:^",
Expand Down
56 changes: 39 additions & 17 deletions ee/packages/federation-matrix/src/FederationMatrix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { Users, Subscriptions, Messages, Rooms } from '@rocket.chat/models';
import emojione from 'emojione';

import { getWellKnownRoutes } from './api/.well-known/server';
import { getMatrixInviteRoutes } from './api/_matrix/invite';
import { acceptInvite, getMatrixInviteRoutes } from './api/_matrix/invite';
import { getKeyServerRoutes } from './api/_matrix/key/server';
import { getMatrixMediaRoutes } from './api/_matrix/media';
import { getMatrixProfilesRoutes } from './api/_matrix/profiles';
Expand Down Expand Up @@ -88,6 +88,24 @@ export const extractDomainFromMatrixUserId = (mxid: string): string => {
return mxid.substring(separatorIndex + 1);
};

/**
* Extract the username and the servername from a matrix user id
* if the serverName is the same as the serverName in the mxid, return only the username (rocket.chat regular username)
* otherwise, return the full mxid and the servername
*/
export const getUsernameServername = (mxid: string, serverName: string): [mxid: string, serverName: string, isLocal: boolean] => {
const senderServerName = extractDomainFromMatrixUserId(mxid);
// if the serverName is the same as the serverName in the mxid, return only the username (rocket.chat regular username)
if (serverName === senderServerName) {
const separatorIndex = mxid.indexOf(':', 1);
if (separatorIndex === -1) {
throw new Error(`Invalid federated username: ${mxid}`);
}
return [mxid.substring(1, separatorIndex), senderServerName, true]; // removers also the @
}

return [mxid, senderServerName, false];
};
/**
* Helper function to create a federated user
*
Expand All @@ -99,7 +117,7 @@ export async function createOrUpdateFederatedUser(options: {
name?: string;
origin: string;
}): Promise<string> {
const { username, name = username } = options;
const { username, name = username, origin } = options;

const result = await Users.updateOne(
{
Expand All @@ -108,7 +126,7 @@ export async function createOrUpdateFederatedUser(options: {
{
$set: {
username,
name,
name: name || username,
type: 'user' as const,
status: UserStatus.OFFLINE,
active: true,
Expand Down Expand Up @@ -294,7 +312,12 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS

async created(): Promise<void> {
try {
registerEvents(this.eventHandler, this.serverName, { typing: this.processEDUTyping, presence: this.processEDUPresence });
registerEvents(
this.eventHandler,
this.serverName,
{ typing: this.processEDUTyping, presence: this.processEDUPresence },
this.homeserverServices,
);
} catch (error) {
this.logger.warn('Homeserver module not available, running in limited mode');
}
Expand Down Expand Up @@ -669,24 +692,23 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
}
}

async inviteUsersToRoom(room: IRoomNativeFederated, usersUserName: string[], inviter: IUser): Promise<void> {
async inviteUsersToRoom(room: IRoomNativeFederated, matrixUsersUsername: string[], inviter: IUser): Promise<void> {
try {
const inviterUserId = `@${inviter.username}:${this.serverName}`;

await Promise.all(
usersUserName
.filter((username) => {
const isExternalUser = username.includes(':');
return isExternalUser;
})
.map(async (username) => {
const alreadyMember = await Subscriptions.findOneByRoomIdAndUsername(room._id, username, { projection: { _id: 1 } });
if (alreadyMember) {
return;
}
matrixUsersUsername.map(async (username) => {
if (validateFederatedUsername(username)) {
return this.homeserverServices.invite.inviteUserToRoom(username, room.federation.mrid, inviterUserId);
}
const result = await this.homeserverServices.invite.inviteUserToRoom(
`@${username}:${this.serverName}`,
room.federation.mrid,
inviterUserId,
);

await this.homeserverServices.invite.inviteUserToRoom(username, room.federation.mrid, inviterUserId);
}),
return acceptInvite(result.event, username, this.homeserverServices);
}),
);
} catch (error) {
this.logger.error({ msg: 'Failed to invite an user to Matrix:', err: error });
Expand Down
96 changes: 73 additions & 23 deletions ee/packages/federation-matrix/src/api/_matrix/invite.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Room } from '@rocket.chat/core-services';
import type { IUser } from '@rocket.chat/core-typings';
import { isUserNativeFederated, type IUser } from '@rocket.chat/core-typings';
import type {
HomeserverServices,
RoomService,
Expand All @@ -9,10 +9,10 @@ import type {
RoomVersion,
} from '@rocket.chat/federation-sdk';
import { Router } from '@rocket.chat/http-router';
import { Rooms, Users } from '@rocket.chat/models';
import { Users, Rooms } from '@rocket.chat/models';
import { ajv } from '@rocket.chat/rest-typings/dist/v1/Ajv';

import { createOrUpdateFederatedUser } from '../../FederationMatrix';
import { createOrUpdateFederatedUser, getUsernameServername } from '../../FederationMatrix';

const EventBaseSchema = {
type: 'object',
Expand Down Expand Up @@ -139,19 +139,34 @@ const isProcessInviteResponseProps = ajv.compile(ProcessInviteResponseSchema);
// 5 seconds
// 25 seconds
// 625 seconds = 10 minutes 25 seconds // max
async function runWithBackoff(fn: () => Promise<void>, delaySec = 5) {
try {
await fn();
} catch (e) {
const delay = delaySec === 625 ? 625 : delaySec ** 2;
console.log(`error occurred, retrying in ${delay}ms`, e);
setTimeout(() => {
runWithBackoff(fn, delay * 1000);
}, delay);
}
function runWithBackoff<T extends (...args: any[]) => Promise<void>>(fn: T, delaySec = 5): T & { stop: () => void } {
let timeoutId: NodeJS.Timeout | null = null;
let currentDelay = delaySec;

const execute = async (...args: Parameters<T>) => {
try {
await fn(...args);
currentDelay = delaySec; // Reset delay on success
} catch (e) {
const delay = currentDelay === 625 ? 625 : currentDelay ** 2;
console.log(`error occurred, retrying in ${delay}ms`, e);
currentDelay = delay;
timeoutId = setTimeout(() => execute(...args), delay);
}
};

return Object.assign(
(...args: Parameters<T>) => {
if (timeoutId) {
clearTimeout(timeoutId);
}
execute(...args);
},
{ stop: () => clearTimeout(timeoutId ?? 0) },
) as T & { stop: () => void };
}

async function joinRoom({
export async function joinRoom({
inviteEvent,
user, // ours trying to join the room
room,
Expand Down Expand Up @@ -190,7 +205,7 @@ async function joinRoom({
const senderUserId =
senderUser?._id ||
(await createOrUpdateFederatedUser({
username: inviteEvent.sender,
username: inviteEvent.sender as `@${string}:${string}`,
origin: matrixRoom.origin,
}));

Expand Down Expand Up @@ -264,16 +279,51 @@ async function joinRoom({
}

await Room.addUserToRoom(internalRoomId, { _id: user._id }, { _id: senderUserId, username: inviteEvent.sender });

// TODO is this needed?
// if (isDM) {
// await MatrixBridgedRoom.createOrUpdateByLocalRoomId(internalRoomId, inviteEvent.roomId, matrixRoom.origin);
// }
}

async function startJoiningRoom(...opts: Parameters<typeof joinRoom>) {
void runWithBackoff(() => joinRoom(...opts));
}
export const startJoiningRoom = runWithBackoff(joinRoom);

// This is a special case where inside rocket chat we invite users inside rockechat, so if the sender or the invitee are external iw should throw an error
export const acceptInvite = async (
inviteEvent: PersistentEventBase<RoomVersion, 'm.room.member'>,
username: string,
services: HomeserverServices,
) => {
if (!inviteEvent.stateKey) {
throw new Error('join event has missing state key, unable to determine user to join');
}

const internalMappedRoom = await Rooms.findOne({ 'federation.mrid': inviteEvent.roomId });
if (!internalMappedRoom) {
throw new Error('room not found not processing invite');
}

const inviter = await Users.findOneByUsername<Pick<IUser, '_id' | 'username'>>(
getUsernameServername(inviteEvent.sender, services.config.serverName)[0],
{
projection: { _id: 1, username: 1 },
},
);

if (!inviter) {
throw new Error('Sender user ID not found');
}
if (isUserNativeFederated(inviter)) {
throw new Error('Sender user is native federated');
}

const user = await Users.findOneByUsername<Pick<IUser, '_id' | 'username'>>(username, { projection: { _id: 1 } });

// we cannot accept invites from users that are external
if (!user) {
throw new Error('User not found');
}
if (isUserNativeFederated(user)) {
throw new Error('User is native federated');
}

await services.room.joinUser(inviteEvent.roomId, inviteEvent.stateKey);
};

export const getMatrixInviteRoutes = (services: HomeserverServices) => {
const { invite, state, room } = services;
Expand Down
5 changes: 3 additions & 2 deletions ee/packages/federation-matrix/src/events/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Emitter } from '@rocket.chat/emitter';
import type { HomeserverEventSignatures } from '@rocket.chat/federation-sdk';
import type { HomeserverEventSignatures, HomeserverServices } from '@rocket.chat/federation-sdk';

import { edus } from './edu';
import { member } from './member';
Expand All @@ -12,11 +12,12 @@ export function registerEvents(
emitter: Emitter<HomeserverEventSignatures>,
serverName: string,
eduProcessTypes: { typing: boolean; presence: boolean },
services: HomeserverServices,
) {
ping(emitter);
message(emitter, serverName);
reaction(emitter);
member(emitter);
member(emitter, services);
edus(emitter, eduProcessTypes);
room(emitter);
}
Loading
Loading