Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/meteor/app/lib/server/functions/createRoom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Rooms, Subscriptions, Users } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { createDirectRoom } from './createDirectRoom';
import { setupTypingEventListenerForRoom } from '../../../../ee/server/hooks/federation';
import { callbacks } from '../../../../lib/callbacks';
import { beforeCreateRoomCallback } from '../../../../lib/callbacks/beforeCreateRoomCallback';
import { calculateRoomRolePriorityFromRoles } from '../../../../lib/roles/calculateRoomRolePriorityFromRoles';
Expand Down Expand Up @@ -276,7 +277,9 @@ export const createRoom = async <T extends RoomType>(
}

if (shouldBeHandledByFederation && federationVersion === 'native') {
// TODO: move this to the hooks folder
await FederationMatrix.createRoom(room, owner, members);
setupTypingEventListenerForRoom(room._id);
}

void Apps.self?.triggerEvent(AppEvents.IPostRoomCreate, room);
Expand Down
24 changes: 22 additions & 2 deletions apps/meteor/ee/server/hooks/federation/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { FederationMatrix } from '@rocket.chat/core-services';
import { api, FederationMatrix } from '@rocket.chat/core-services';
import type { IMessage, IRoom, IUser } from '@rocket.chat/core-typings';
import { Messages } from '@rocket.chat/models';
import { Messages, Rooms } from '@rocket.chat/models';

import notifications from '../../../../app/notifications/server/lib/Notifications';
import { callbacks } from '../../../../lib/callbacks';
import { afterLeaveRoomCallback } from '../../../../lib/callbacks/afterLeaveRoomCallback';
import { afterRemoveFromRoomCallback } from '../../../../lib/callbacks/afterRemoveFromRoomCallback';
Expand Down Expand Up @@ -85,3 +86,22 @@ afterRemoveFromRoomCallback.add(
callbacks.priority.HIGH,
'federation-matrix-after-remove-from-room',
);

export const setupTypingEventListenerForRoom = (roomId: string): void => {
notifications.streamRoom.on(`${roomId}/user-activity`, (username, activity) => {
if (Array.isArray(activity) && (!activity.length || activity.includes('user-typing'))) {
void api.broadcast('user.typing', {
user: { username },
isTyping: activity.includes('user-typing'),
roomId,
});
}
});
};

export const setupInternalEDUEventListeners = async () => {
const federatedRooms = await Rooms.findFederatedRooms({ projection: { _id: 1 } }).toArray();
for (const room of federatedRooms) {
setupTypingEventListenerForRoom(room._id);
}
};
3 changes: 2 additions & 1 deletion apps/meteor/ee/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ export * from './apps/startup';
export { registerEEBroker } from './startup';

await License.onLicense('federation', async () => {
await import('./hooks/federation');
const { setupInternalEDUEventListeners } = await import('./hooks/federation');
await setupInternalEDUEventListeners();
});
55 changes: 33 additions & 22 deletions apps/meteor/server/modules/listeners/listeners.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,35 +151,20 @@ export class ListenersModule {
notifications.notifyRoom(rid, 'videoconf', callId);
});

service.onEvent('presence.status', ({ user }) => {
const { _id, username, name, status, statusText, roles } = user;
if (!status || !username) {
return;
}

notifications.notifyUserInThisInstance(_id, 'userData', {
type: 'updated',
id: _id,
diff: {
status,
...(statusText && { statusText }),
},
unset: {},
});

notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText, name, roles]);

if (_id) {
notifications.sendPresence(_id, username, STATUS_MAP[status], statusText);
}
});
service.onEvent('presence.status', ({ user }) => this.handlePresence({ user }, notifications));

service.onEvent('user.updateCustomStatus', (userStatus) => {
notifications.notifyLoggedInThisInstance('updateCustomUserStatus', {
userStatusData: userStatus,
});
});

service.onEvent('federation-matrix.user.typing', ({ isTyping, roomId, username }) => {
notifications.notifyRoom(roomId, 'user-activity', username, isTyping ? ['user-typing'] : []);
});

service.onEvent('federation-matrix.user.presence.status', ({ user }) => this.handlePresence({ user }, notifications));

service.onEvent('watch.messages', async ({ message }) => {
if (!message.rid) {
return;
Expand Down Expand Up @@ -506,4 +491,30 @@ export class ListenersModule {
notifications.streamRoomMessage.emit(roomId, acknowledgeMessage);
});
}

private handlePresence(
{ user }: { user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'> },
notifications: NotificationsModule,
): void {
const { _id, username, name, status, statusText, roles } = user;
if (!status || !username) {
return;
}

notifications.notifyUserInThisInstance(_id, 'userData', {
type: 'updated',
id: _id,
diff: {
status,
...(statusText && { statusText }),
},
unset: {},
});

notifications.notifyLoggedInThisInstance('user-status', [_id, username, STATUS_MAP[status], statusText, name, roles]);

if (_id) {
notifications.sendPresence(_id, username, STATUS_MAP[status], statusText);
}
}
}
55 changes: 54 additions & 1 deletion ee/packages/federation-matrix/src/FederationMatrix.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import 'reflect-metadata';

import type { PresenceState } from '@hs/core';
import { ConfigService, createFederationContainer, getAllServices } from '@hs/federation-sdk';
import type { HomeserverEventSignatures, HomeserverServices, FederationContainerOptions } from '@hs/federation-sdk';
import { type IFederationMatrixService, Room, ServiceClass, Settings } from '@rocket.chat/core-services';
import { isDeletedMessage, isMessageFromMatrixFederation, type IMessage, type IRoom, type IUser } from '@rocket.chat/core-typings';
import { isDeletedMessage, isMessageFromMatrixFederation, UserStatus, type IMessage, type IRoom, type IUser } from '@rocket.chat/core-typings';
import { Emitter } from '@rocket.chat/emitter';
import { Router } from '@rocket.chat/http-router';
import { Logger } from '@rocket.chat/logger';
Expand Down Expand Up @@ -63,6 +64,58 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
createFederationContainer(containerOptions, config);
instance.homeserverServices = getAllServices();
instance.buildMatrixHTTPRoutes();
instance.onEvent('user.typing', async ({ isTyping, roomId, user: { username } }): Promise<void> => {
if (!roomId || !username) {
return;
}
const externalRoomId = await MatrixBridgedRoom.getExternalRoomId(roomId);
if (!externalRoomId) {
return;
}
const localUser = await Users.findOneByUsername(username, { projection: { _id: 1 } });
if (!localUser) {
return;
}
const externalUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(localUser._id);
if (!externalUserId) {
return;
}
void instance.homeserverServices.edu.sendTypingNotification(externalRoomId, externalUserId, isTyping);
});
instance.onEvent(
'presence.status',
async ({ user }: { user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'> }): Promise<void> => {
if (!user.username || !user.status) {
return;
}
const localUser = await Users.findOneByUsername(user.username, { projection: { _id: 1 } });
if (!localUser) {
return;
}
const externalUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(localUser._id);
if (!externalUserId) {
return;
}

const roomsUserIsMemberOf = await Subscriptions.findUserFederatedRoomIds(localUser._id).toArray();
const statusMap: Record<UserStatus, PresenceState> = {
[UserStatus.ONLINE]: 'online',
[UserStatus.OFFLINE]: 'offline',
[UserStatus.AWAY]: 'unavailable',
[UserStatus.BUSY]: 'unavailable',
[UserStatus.DISABLED]: 'offline',
};
void instance.homeserverServices.edu.sendPresenceUpdateToRooms(
[
{
user_id: externalUserId,
presence: statusMap[user.status] || 'offline',
},
],
roomsUserIsMemberOf.map(({ externalRoomId }) => externalRoomId),
);
},
);

return instance;
}
Expand Down
18 changes: 7 additions & 11 deletions ee/packages/federation-matrix/src/api/_matrix/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,15 @@ export const getMatrixTransactionsRoutes = (services: HomeserverServices) => {
async (c) => {
const body = await c.req.json();

const { pdus = [] } = body;

if (pdus.length === 0) {
return {
body: {
pdus: {},
edus: {},
},
statusCode: 200,
};
const { pdus = [], edus = [] } = body;

if (pdus.length > 0) {
await event.processIncomingPDUs(pdus);
}

await event.processIncomingPDUs(pdus);
if (edus.length > 0) {
await event.processIncomingEDUs(edus);
}

return {
body: {
Expand Down
84 changes: 84 additions & 0 deletions ee/packages/federation-matrix/src/events/edu.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import type { HomeserverEventSignatures } from '@hs/federation-sdk';
import { api } from '@rocket.chat/core-services';
import { UserStatus } from '@rocket.chat/core-typings';
import type { Emitter } from '@rocket.chat/emitter';
import { Logger } from '@rocket.chat/logger';
import { MatrixBridgedUser, MatrixBridgedRoom, Users } from '@rocket.chat/models';

import { convertExternalUserIdToInternalUsername } from '../helpers/identifiers';

const logger = new Logger('federation-matrix:edu');

export const edus = async (emitter: Emitter<HomeserverEventSignatures>) => {
emitter.on('homeserver.matrix.typing', async (data) => {
try {
const matrixRoom = await MatrixBridgedRoom.getLocalRoomId(data.room_id);
if (!matrixRoom) {
logger.debug(`No bridged room found for Matrix room_id: ${data.room_id}`);
return;
}

const matrixUser = await MatrixBridgedUser.findOne({ mui: convertExternalUserIdToInternalUsername(data.user_id) });
if (!matrixUser) {
logger.debug(`No bridged user found for Matrix user_id: ${data.user_id}`);
return;
}

const user = await Users.findOneById(matrixUser.uid, { projection: { _id: 1, username: 1 } });
if (!user || !user.username) {
logger.debug(`User not found for uid: ${matrixUser.uid}`);
return;
}

void api.broadcast('federation-matrix.user.typing', {
username: user.username,
isTyping: data.typing,
roomId: matrixRoom,
});
} catch (error) {
logger.error('Error handling Matrix typing event:', error);
}
});

emitter.on('homeserver.matrix.presence', async (data) => {
try {
const matrixUser = await MatrixBridgedUser.findOne({ mui: convertExternalUserIdToInternalUsername(data.user_id) });
if (!matrixUser) {
logger.debug(`No bridged user found for Matrix user_id: ${data.user_id}`);
return;
}
const user = await Users.findOneById(matrixUser.uid, {
projection: { _id: 1, username: 1, statusText: 1, roles: 1, name: 1, status: 1 },
});
if (!user) {
logger.debug(`User not found for uid: ${matrixUser.uid}`);
return;
}

const statusMap = {
online: UserStatus.ONLINE,
offline: UserStatus.OFFLINE,
unavailable: UserStatus.AWAY,
};

const status = statusMap[data.presence] || UserStatus.OFFLINE;
await Users.updateOne(
{ _id: user._id },
{
$set: {
status,
statusDefault: status,
},
},
);

const { _id, username, statusText, roles, name } = user;
void api.broadcast('federation-matrix.user.presence.status', {
user: { status, _id, username, statusText, roles, name },
});
logger.debug(`Updated presence for user ${matrixUser.uid} to ${status} from Matrix federation`);
} catch (error) {
logger.error('Error handling Matrix presence event:', error);
}
});
};
2 changes: 2 additions & 0 deletions ee/packages/federation-matrix/src/events/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { HomeserverEventSignatures } from '@hs/federation-sdk';
import type { Emitter } from '@rocket.chat/emitter';

import { edus } from './edu';
import { invite } from './invite';
import { member } from './member';
import { message } from './message';
Expand All @@ -13,4 +14,5 @@ export function registerEvents(emitter: Emitter<HomeserverEventSignatures>) {
invite(emitter);
reaction(emitter);
member(emitter);
edus(emitter);
}
5 changes: 5 additions & 0 deletions packages/core-services/src/events/Events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ export type EventSignatures = {
}): void;
'user.updateCustomStatus'(userStatus: Omit<ICustomUserStatus, '_updatedAt'>): void;
'user.typing'(data: { user: Partial<IUser>; isTyping: boolean; roomId: string }): void;
'federation-matrix.user.typing'(data: { username: string; isTyping: boolean; roomId: string }): void;
'federation-matrix.user.presence.status'(data: {
user: Pick<IUser, '_id' | 'username' | 'status' | 'statusText' | 'name' | 'roles'>;
previousStatus?: UserStatus;
}): void;
'user.video-conference'(data: {
userId: IUser['_id'];
action: string;
Expand Down
1 change: 1 addition & 0 deletions packages/model-typings/src/models/ISubscriptionsModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,5 @@ export interface ISubscriptionsModel extends IBaseModel<ISubscription> {
countByRoomIdWhenUsernameExists(rid: string): Promise<number>;
setE2EKeyByUserIdAndRoomId(userId: string, rid: string, key: string): Promise<null | WithId<ISubscription>>;
countUsersInRoles(roles: IRole['_id'][], rid: IRoom['_id'] | undefined): Promise<number>;
findUserFederatedRoomIds(userId: IUser['_id']): AggregationCursor<{ _id: IRoom['_id']; externalRoomId: string }>;
}
4 changes: 4 additions & 0 deletions packages/models/src/models/Rooms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ export class RoomsRaw extends BaseRaw<IRoom> implements IRoomsModel {
sparse: true,
},
{ key: { t: 1, ts: 1 } },
{
key: { federated: 1 },
sparse: true,
},
{
key: {
'usersWaitingForE2EKeys.userId': 1,
Expand Down
Loading
Loading