Skip to content

Commit

Permalink
refactor: Subscriptions out of DB Watcher (#32540)
Browse files Browse the repository at this point in the history
Co-authored-by: Diego Sampaio <[email protected]>
  • Loading branch information
ricardogarim and sampaiodiego authored Aug 21, 2024
1 parent 7937ff7 commit 150c7b1
Show file tree
Hide file tree
Showing 64 changed files with 1,112 additions and 331 deletions.
1 change: 1 addition & 0 deletions apps/meteor/app/api/server/v1/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ API.v1.addRoute(
async post() {
const { readThreads = false } = this.bodyParams;
const roomId = 'rid' in this.bodyParams ? this.bodyParams.rid : this.bodyParams.roomId;

await readMessages(roomId, this.userId, readThreads);

return API.v1.success();
Expand Down
27 changes: 24 additions & 3 deletions apps/meteor/app/autotranslate/server/methods/saveSettings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { check } from 'meteor/check';
import { Meteor } from 'meteor/meteor';

import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission';
import { notifyOnSubscriptionChangedById } from '../../../lib/server/lib/notifyListener';

declare module '@rocket.chat/ddp-client' {
// eslint-disable-next-line @typescript-eslint/naming-convention
Expand Down Expand Up @@ -44,6 +45,8 @@ Meteor.methods<ServerMethods>({
});
}

let shouldNotifySubscriptionChanged = false;

switch (field) {
case 'autoTranslate':
const room = await Rooms.findE2ERoomById(rid, { projection: { _id: 1 } });
Expand All @@ -53,16 +56,34 @@ Meteor.methods<ServerMethods>({
});
}

await Subscriptions.updateAutoTranslateById(subscription._id, value === '1');
const updateAutoTranslateResponse = await Subscriptions.updateAutoTranslateById(subscription._id, value === '1');
if (updateAutoTranslateResponse.modifiedCount) {
shouldNotifySubscriptionChanged = true;
}

if (!subscription.autoTranslateLanguage && options.defaultLanguage) {
await Subscriptions.updateAutoTranslateLanguageById(subscription._id, options.defaultLanguage);
const updateAutoTranslateLanguageResponse = await Subscriptions.updateAutoTranslateLanguageById(
subscription._id,
options.defaultLanguage,
);
if (updateAutoTranslateLanguageResponse.modifiedCount) {
shouldNotifySubscriptionChanged = true;
}
}

break;
case 'autoTranslateLanguage':
await Subscriptions.updateAutoTranslateLanguageById(subscription._id, value);
const updateAutoTranslateLanguage = await Subscriptions.updateAutoTranslateLanguageById(subscription._id, value);
if (updateAutoTranslateLanguage.modifiedCount) {
shouldNotifySubscriptionChanged = true;
}
break;
}

if (shouldNotifySubscriptionChanged) {
void notifyOnSubscriptionChangedById(subscription._id);
}

return true;
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,28 @@ import { Match } from 'meteor/check';
import { Meteor } from 'meteor/meteor';
import type { UpdateResult } from 'mongodb';

import { notifyOnSubscriptionChangedByRoomId } from '../../../lib/server/lib/notifyListener';

export const saveRoomCustomFields = async function (rid: string, roomCustomFields: Record<string, any>): Promise<UpdateResult> {
if (!Match.test(rid, String)) {
throw new Meteor.Error('invalid-room', 'Invalid room', {
function: 'RocketChat.saveRoomCustomFields',
});
}

if (!Match.test(roomCustomFields, Object)) {
throw new Meteor.Error('invalid-roomCustomFields-type', 'Invalid roomCustomFields type', {
function: 'RocketChat.saveRoomCustomFields',
});
}

const ret = await Rooms.setCustomFieldsById(rid, roomCustomFields);

// Update customFields of any user's Subscription related with this rid
await Subscriptions.updateCustomFieldsByRoomId(rid, roomCustomFields);
const { modifiedCount } = await Subscriptions.updateCustomFieldsByRoomId(rid, roomCustomFields);
if (modifiedCount) {
void notifyOnSubscriptionChangedByRoomId(rid);
}

return ret;
};
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { Match } from 'meteor/check';
import { Meteor } from 'meteor/meteor';
import type { UpdateResult } from 'mongodb';

import { notifyOnSubscriptionChangedByRoomId } from '../../../lib/server/lib/notifyListener';

export const saveRoomEncrypted = async function (rid: string, encrypted: boolean, user: IUser, sendMessage = true): Promise<UpdateResult> {
if (!Match.test(rid, String)) {
throw new Meteor.Error('invalid-room', 'Invalid room', {
Expand All @@ -27,7 +29,10 @@ export const saveRoomEncrypted = async function (rid: string, encrypted: boolean
}

if (encrypted) {
await Subscriptions.disableAutoTranslateByRoomId(rid);
const { modifiedCount } = await Subscriptions.disableAutoTranslateByRoomId(rid);
if (modifiedCount) {
void notifyOnSubscriptionChangedByRoomId(rid);
}
}
return update;
};
18 changes: 15 additions & 3 deletions apps/meteor/app/channel-settings/server/functions/saveRoomName.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ import type { Document, UpdateResult } from 'mongodb';
import { callbacks } from '../../../../lib/callbacks';
import { roomCoordinator } from '../../../../server/lib/rooms/roomCoordinator';
import { checkUsernameAvailability } from '../../../lib/server/functions/checkUsernameAvailability';
import { notifyOnIntegrationChangedByChannels } from '../../../lib/server/lib/notifyListener';
import { notifyOnIntegrationChangedByChannels, notifyOnSubscriptionChangedByRoomId } from '../../../lib/server/lib/notifyListener';
import { getValidRoomName } from '../../../utils/server/lib/getValidRoomName';

const updateFName = async (rid: string, displayName: string): Promise<(UpdateResult | Document)[]> => {
return Promise.all([Rooms.setFnameById(rid, displayName), Subscriptions.updateFnameByRoomId(rid, displayName)]);
const responses = await Promise.all([Rooms.setFnameById(rid, displayName), Subscriptions.updateFnameByRoomId(rid, displayName)]);

if (responses[1]?.modifiedCount) {
void notifyOnSubscriptionChangedByRoomId(rid);
}

return responses;
};

const updateRoomName = async (rid: string, displayName: string, slugifiedRoomName: string) => {
Expand All @@ -24,10 +30,16 @@ const updateRoomName = async (rid: string, displayName: string, slugifiedRoomNam
});
}

return Promise.all([
const responses = await Promise.all([
Rooms.setNameById(rid, slugifiedRoomName, displayName),
Subscriptions.updateNameAndAlertByRoomId(rid, slugifiedRoomName, displayName),
]);

if (responses[1]?.modifiedCount) {
void notifyOnSubscriptionChangedByRoomId(rid);
}

return responses;
};

export async function saveRoomName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { UpdateResult, Document } from 'mongodb';
import { RoomSettingsEnum } from '../../../../definition/IRoomTypeConfig';
import { i18n } from '../../../../server/lib/i18n';
import { roomCoordinator } from '../../../../server/lib/rooms/roomCoordinator';
import { notifyOnSubscriptionChangedByRoomId } from '../../../lib/server/lib/notifyListener';
import { settings } from '../../../settings/server';

export const saveRoomType = async function (
Expand Down Expand Up @@ -41,11 +42,16 @@ export const saveRoomType = async function (
});
}

const result = (await Rooms.setTypeById(rid, roomType)) && (await Subscriptions.updateTypeByRoomId(rid, roomType));
const result = await Promise.all([Rooms.setTypeById(rid, roomType), Subscriptions.updateTypeByRoomId(rid, roomType)]);

if (!result) {
return result;
}

if (result[1]?.modifiedCount) {
void notifyOnSubscriptionChangedByRoomId(rid);
}

if (sendMessage) {
let message;
if (roomType === 'c') {
Expand All @@ -59,5 +65,6 @@ export const saveRoomType = async function (
}
await Message.saveSystemMessage('room_changed_privacy', rid, message, user);
}

return result;
};
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { Rooms, Subscriptions } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { notifyOnSubscriptionChangedById } from '../../../lib/server/lib/notifyListener';

export async function handleSuggestedGroupKey(
handle: 'accept' | 'reject',
rid: string,
Expand Down Expand Up @@ -30,5 +32,8 @@ export async function handleSuggestedGroupKey(
await Rooms.addUserIdToE2EEQueueByRoomIds([sub.rid], userId);
}

await Subscriptions.unsetGroupE2ESuggestedKey(sub._id);
const { modifiedCount } = await Subscriptions.unsetGroupE2ESuggestedKey(sub._id);
if (modifiedCount) {
void notifyOnSubscriptionChangedById(sub._id);
}
}
11 changes: 9 additions & 2 deletions apps/meteor/app/e2e/server/methods/updateGroupKey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Subscriptions } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { methodDeprecationLogger } from '../../../lib/server/lib/deprecationWarningLogger';
import { notifyOnSubscriptionChangedById, notifyOnSubscriptionChangedByRoomIdAndUserId } from '../../../lib/server/lib/notifyListener';

declare module '@rocket.chat/ddp-client' {
// eslint-disable-next-line @typescript-eslint/naming-convention
Expand All @@ -25,12 +26,18 @@ Meteor.methods<ServerMethods>({
if (mySub) {
// Setting the key to myself, can set directly to the final field
if (userId === uid) {
await Subscriptions.setGroupE2EKey(mySub._id, key);
const setGroupE2EKeyResponse = await Subscriptions.setGroupE2EKey(mySub._id, key);
if (setGroupE2EKeyResponse.modifiedCount) {
void notifyOnSubscriptionChangedById(mySub._id);
}
return;
}

// uid also has subscription to this room
await Subscriptions.setGroupE2ESuggestedKey(uid, rid, key);
const { modifiedCount } = await Subscriptions.setGroupE2ESuggestedKey(uid, rid, key);
if (modifiedCount) {
void notifyOnSubscriptionChangedByRoomIdAndUserId(rid, uid);
}
}
},
});
23 changes: 19 additions & 4 deletions apps/meteor/app/federation/server/endpoints/dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ import EJSON from 'ejson';
import { API } from '../../../api/server';
import { FileUpload } from '../../../file-upload/server';
import { deleteRoom } from '../../../lib/server/functions/deleteRoom';
import { notifyOnMessageChange, notifyOnRoomChanged, notifyOnRoomChangedById } from '../../../lib/server/lib/notifyListener';
import {
notifyOnMessageChange,
notifyOnRoomChanged,
notifyOnRoomChangedById,
notifyOnSubscriptionChanged,
notifyOnSubscriptionChangedById,
} from '../../../lib/server/lib/notifyListener';
import { notifyUsersOnMessage } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendAllNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { processThreads } from '../../../threads/server/hooks/aftersavemessage';
Expand Down Expand Up @@ -141,7 +147,10 @@ const eventHandlers = {
const denormalizedSubscription = normalizers.denormalizeSubscription(subscription);

// Create the subscription
await Subscriptions.insertOne(denormalizedSubscription);
const { insertedId } = await Subscriptions.insertOne(denormalizedSubscription);
if (insertedId) {
void notifyOnSubscriptionChangedById(insertedId);
}
federationAltered = true;
}
} catch (ex) {
Expand Down Expand Up @@ -176,7 +185,10 @@ const eventHandlers = {
} = event;

// Remove the user's subscription
await Subscriptions.removeByRoomIdAndUserId(roomId, user._id);
const deletedSubscription = await Subscriptions.removeByRoomIdAndUserId(roomId, user._id);
if (deletedSubscription) {
void notifyOnSubscriptionChanged(deletedSubscription, 'removed');
}

// Refresh the servers list
await FederationServers.refreshServers();
Expand Down Expand Up @@ -204,7 +216,10 @@ const eventHandlers = {
} = event;

// Remove the user's subscription
await Subscriptions.removeByRoomIdAndUserId(roomId, user._id);
const deletedSubscription = await Subscriptions.removeByRoomIdAndUserId(roomId, user._id);
if (deletedSubscription) {
void notifyOnSubscriptionChanged(deletedSubscription, 'removed');
}

// Refresh the servers list
await FederationServers.refreshServers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { generateUsernameSuggestion } from '../../../lib/server/functions/getUse
import { insertMessage } from '../../../lib/server/functions/insertMessage';
import { saveUserIdentity } from '../../../lib/server/functions/saveUserIdentity';
import { setUserActiveStatus } from '../../../lib/server/functions/setUserActiveStatus';
import { notifyOnUserChange } from '../../../lib/server/lib/notifyListener';
import { notifyOnSubscriptionChangedByRoomId, notifyOnUserChange } from '../../../lib/server/lib/notifyListener';
import { createChannelMethod } from '../../../lib/server/methods/createChannel';
import { createPrivateGroupMethod } from '../../../lib/server/methods/createPrivateGroup';
import { getValidRoomName } from '../../../utils/server/lib/getValidRoomName';
Expand Down Expand Up @@ -1161,8 +1161,11 @@ export class ImportDataConverter {
}

async archiveRoomById(rid: string) {
await Rooms.archiveById(rid);
await Subscriptions.archiveByRoomId(rid);
const responses = await Promise.all([Rooms.archiveById(rid), Subscriptions.archiveByRoomId(rid)]);

if (responses[1]?.modifiedCount) {
void notifyOnSubscriptionChangedByRoomId(rid);
}
}

async convertData(startedByUserId: string, callbacks: IConversionCallbacks = {}): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Subscriptions } from '@rocket.chat/models';
import { callbacks } from '../../../../lib/callbacks';
import { getSubscriptionAutotranslateDefaultConfig } from '../../../../server/lib/getSubscriptionAutotranslateDefaultConfig';
import { getDefaultSubscriptionPref } from '../../../utils/lib/getDefaultSubscriptionPref';
import { notifyOnSubscriptionChangedById } from '../lib/notifyListener';
import { getDefaultChannels } from './getDefaultChannels';

export const addUserToDefaultChannels = async function (user: IUser, silenced?: boolean): Promise<void> {
Expand All @@ -14,8 +15,9 @@ export const addUserToDefaultChannels = async function (user: IUser, silenced?:
for await (const room of defaultRooms) {
if (!(await Subscriptions.findOneByRoomIdAndUserId(room._id, user._id, { projection: { _id: 1 } }))) {
const autoTranslateConfig = getSubscriptionAutotranslateDefaultConfig(user);

// Add a subscription to this user
await Subscriptions.createWithRoomAndUser(room, user, {
const { insertedId } = await Subscriptions.createWithRoomAndUser(room, user, {
ts: new Date(),
open: true,
alert: true,
Expand All @@ -27,6 +29,10 @@ export const addUserToDefaultChannels = async function (user: IUser, silenced?:
...getDefaultSubscriptionPref(user),
});

if (insertedId) {
void notifyOnSubscriptionChangedById(insertedId, 'inserted');
}

// Insert user joined message
if (!silenced) {
await Message.saveSystemMessage('uj', room._id, user.username || '', user);
Expand Down
8 changes: 6 additions & 2 deletions apps/meteor/app/lib/server/functions/addUserToRoom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { getSubscriptionAutotranslateDefaultConfig } from '../../../../server/li
import { roomCoordinator } from '../../../../server/lib/rooms/roomCoordinator';
import { settings } from '../../../settings/server';
import { getDefaultSubscriptionPref } from '../../../utils/lib/getDefaultSubscriptionPref';
import { notifyOnRoomChangedById } from '../lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnSubscriptionChangedById } from '../lib/notifyListener';

export const addUserToRoom = async function (
rid: string,
Expand Down Expand Up @@ -82,7 +82,7 @@ export const addUserToRoom = async function (

const autoTranslateConfig = getSubscriptionAutotranslateDefaultConfig(userToBeAdded);

await Subscriptions.createWithRoomAndUser(room, userToBeAdded as IUser, {
const { insertedId } = await Subscriptions.createWithRoomAndUser(room, userToBeAdded as IUser, {
ts: now,
open: true,
alert: !skipAlertSound,
Expand All @@ -93,6 +93,10 @@ export const addUserToRoom = async function (
...getDefaultSubscriptionPref(userToBeAdded as IUser),
});

if (insertedId) {
void notifyOnSubscriptionChangedById(insertedId, 'inserted');
}

void notifyOnRoomChangedById(rid);

if (!userToBeAdded.username) {
Expand Down
9 changes: 7 additions & 2 deletions apps/meteor/app/lib/server/functions/archiveRoom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ import type { IMessage } from '@rocket.chat/core-typings';
import { Rooms, Subscriptions } from '@rocket.chat/models';

import { callbacks } from '../../../../lib/callbacks';
import { notifyOnRoomChanged } from '../lib/notifyListener';
import { notifyOnRoomChanged, notifyOnSubscriptionChangedByRoomId } from '../lib/notifyListener';

export const archiveRoom = async function (rid: string, user: IMessage['u']): Promise<void> {
await Rooms.archiveById(rid);
await Subscriptions.archiveByRoomId(rid);

const archiveResponse = await Subscriptions.archiveByRoomId(rid);
if (archiveResponse.modifiedCount) {
void notifyOnSubscriptionChangedByRoomId(rid);
}

await Message.saveSystemMessage('room-archived', rid, '', user);

const room = await Rooms.findOneById(rid);
Expand Down
Loading

0 comments on commit 150c7b1

Please sign in to comment.