Skip to content
68 changes: 50 additions & 18 deletions app/federation/server/endpoints/dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import { isFederationEnabled } from '../lib/isFederationEnabled';
import { getUpload, requestEventsFromLatest } from '../handler';
import { notifyUsersOnMessage } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendAllNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { processThreads } from '../../../threads/server/hooks/aftersavemessage';
import { processDeleteInThread } from '../../../threads/server/hooks/afterdeletemessage';

const eventHandlers = {
//
Expand Down Expand Up @@ -90,6 +92,9 @@ const eventHandlers = {
async [eventTypes.ROOM_ADD_USER](event) {
const eventResult = await FederationRoomEvents.addEvent(event.context, event);

// We only want to refresh the server list and update the room federation array if something changed
let federationAltered = false;

// If the event was successfully added, handle the event locally
if (eventResult.success) {
const { data: { roomId, user, subscription, domainsAfterAdd } } = event;
Expand All @@ -98,35 +103,49 @@ const eventHandlers = {
const persistedUser = Users.findOne({ _id: user._id });

if (persistedUser) {
// Update the federation
Users.update({ _id: persistedUser._id }, { $set: { federation: user.federation } });
// Update the federation, if its not already set (if it's set, this is likely an event being reprocessed)
if (!persistedUser.federation) {
Users.update({ _id: persistedUser._id }, { $set: { federation: user.federation } });
federationAltered = true;
}
} else {
// Denormalize user
const denormalizedUser = normalizers.denormalizeUser(user);

// Create the user
Users.insert(denormalizedUser);
federationAltered = true;
}

// Check if subscription exists
const persistedSubscription = Subscriptions.findOne({ _id: subscription._id });

if (persistedSubscription) {
// Update the federation
Subscriptions.update({ _id: persistedSubscription._id }, { $set: { federation: subscription.federation } });
} else {
// Denormalize subscription
const denormalizedSubscription = normalizers.denormalizeSubscription(subscription);
try {
if (persistedSubscription) {
// Update the federation, if its not already set (if it's set, this is likely an event being reprocessed
if (!persistedSubscription.federation) {
Subscriptions.update({ _id: persistedSubscription._id }, { $set: { federation: subscription.federation } });
federationAltered = true;
}
} else {
// Denormalize subscription
const denormalizedSubscription = normalizers.denormalizeSubscription(subscription);

// Create the subscription
Subscriptions.insert(denormalizedSubscription);
// Create the subscription
Subscriptions.insert(denormalizedSubscription);
federationAltered = true;
}
} catch (ex) {
logger.server.debug(`unable to create subscription for user ( ${ user._id } ) in room (${ roomId })`);
}

// Refresh the servers list
FederationServers.refreshServers();
if (federationAltered) {
FederationServers.refreshServers();

// Update the room's federation property
Rooms.update({ _id: roomId }, { $set: { 'federation.domains': domainsAfterAdd } });
// Update the room's federation property
Rooms.update({ _id: roomId }, { $set: { 'federation.domains': domainsAfterAdd } });
}
}

return eventResult;
Expand Down Expand Up @@ -193,7 +212,9 @@ const eventHandlers = {

if (persistedMessage) {
// Update the federation
Messages.update({ _id: persistedMessage._id }, { $set: { federation: message.federation } });
if (!persistedMessage.federation) {
Messages.update({ _id: persistedMessage._id }, { $set: { federation: message.federation } });
}
} else {
// Load the room
const room = Rooms.findOneById(message.rid);
Expand Down Expand Up @@ -239,11 +260,17 @@ const eventHandlers = {
}

// Create the message
Messages.insert(denormalizedMessage);
try {
Messages.insert(denormalizedMessage);

processThreads(denormalizedMessage, room);

// Notify users
notifyUsersOnMessage(denormalizedMessage, room);
sendAllNotifications(denormalizedMessage, room);
// Notify users
notifyUsersOnMessage(denormalizedMessage, room);
sendAllNotifications(denormalizedMessage, room);
} catch (err) {
logger.server.debug(`Error on creating message: ${ message._id }`);
}
}
}

Expand Down Expand Up @@ -285,6 +312,11 @@ const eventHandlers = {
if (eventResult.success) {
const { data: { roomId, messageId } } = event;

const message = Messages.findOne({ _id: messageId });
if (message) {
processDeleteInThread(message);
}

// Remove the message
Messages.removeById(messageId);

Expand Down
2 changes: 1 addition & 1 deletion app/federation/server/functions/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export function updateEnabled(enabled) {
}

export const checkRoomType = (room) => room.t === 'p' || room.t === 'd';
export const checkRoomDomainsLength = (domains) => domains.length <= 10;
export const checkRoomDomainsLength = (domains) => domains.length <= (process.env.FEDERATED_DOMAINS_LENGTH || 10);

export const hasExternalDomain = ({ federation }) => {
// same test as isFederated(room)
Expand Down
7 changes: 6 additions & 1 deletion app/federation/server/handler/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ export function dispatchEvents(domains, events) {
throw disabled('client.dispatchEvents');
}

domains = [...new Set(domains)];

logger.client.debug(() => `dispatchEvents => domains=${ domains.join(', ') } events=${ events.map((e) => JSON.stringify(e, null, 2)) }`);

const uri = '/api/v1/federation.events.dispatch';
Expand All @@ -65,7 +67,10 @@ export function dispatchEvents(domains, events) {
}

export function dispatchEvent(domains, event) {
dispatchEvents(domains, [event]);
// Ensure the domain list is distinct to avoid excessive events
const distinctDomains = [...new Set(domains)].filter((domain) => domain === event.origin);

dispatchEvents(distinctDomains, [event]);
}

export function getUpload(domain, fileId) {
Expand Down
11 changes: 8 additions & 3 deletions app/federation/server/hooks/afterAddedToRoom.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,16 @@ async function afterAddedToRoom(involvedUsers, room) {
//

// Get the users domains
const domainsAfterAdd = users.map((u) => u.federation.origin);
const domainsAfterAdd = [];
users.forEach((user) => {
if (user.hasOwnProperty('federation') && !domainsAfterAdd.includes(user.federation.origin)) {
domainsAfterAdd.push(user.federation.origin);
}
});

// Check if the number of domains is allowed
if (!checkRoomDomainsLength(room.federation.domains)) {
throw new Error('Cannot federate rooms with more than 10 domains');
if (!checkRoomDomainsLength(domainsAfterAdd)) {
throw new Error(`Cannot federate rooms with more than ${ process.env.FEDERATED_DOMAINS_LENGTH || 10 } domains`);
}

//
Expand Down
2 changes: 1 addition & 1 deletion app/federation/server/hooks/afterCreateRoom.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export async function doAfterCreateRoom(room, users, subscriptions) {

// Check if the number of domains is allowed
if (!checkRoomDomainsLength(normalizedRoom.federation.domains)) {
throw new Error('Cannot federate rooms with more than 10 domains');
throw new Error(`Cannot federate rooms with more than ${ process.env.FEDERATED_DOMAINS_LENGTH || 10 } domains`);
}

// Ensure a genesis event for this room
Expand Down
4 changes: 2 additions & 2 deletions app/federation/server/hooks/afterLeaveRoom.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async function afterLeaveRoom(user, room) {

try {
// Get the domains after leave
const domainsAfterLeave = users.map((u) => u.federation.origin);
const domainsAfterLeave = [...new Set(users.map((u) => u.federation.origin))];

//
// Normalize the room's federation status
Expand All @@ -28,7 +28,7 @@ async function afterLeaveRoom(user, room) {
usersBeforeLeave.push(user);

// Get the users domains
const domainsBeforeLeft = usersBeforeLeave.map((u) => u.federation.origin);
const domainsBeforeLeft = [...new Set(usersBeforeLeave.map((u) => u.federation.origin))];

//
// Create the user left event
Expand Down
4 changes: 2 additions & 2 deletions app/federation/server/hooks/afterRemoveFromRoom.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async function afterRemoveFromRoom(involvedUsers, room) {

try {
// Get the domains after removal
const domainsAfterRemoval = users.map((u) => u.federation.origin);
const domainsAfterRemoval = [...new Set(users.map((u) => u.federation.origin))];

//
// Normalize the room's federation status
Expand All @@ -30,7 +30,7 @@ async function afterRemoveFromRoom(involvedUsers, room) {
usersBeforeRemoval.push(removedUser);

// Get the users domains
const domainsBeforeRemoval = usersBeforeRemoval.map((u) => u.federation.origin);
const domainsBeforeRemoval = [...new Set(usersBeforeRemoval.map((u) => u.federation.origin))];

//
// Create the user remove event
Expand Down
1 change: 1 addition & 0 deletions app/federation/server/hooks/afterUnsetReaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { FederationRoomEvents, Rooms } from '../../../models/server';
import { logger } from '../lib/logger';
import { hasExternalDomain } from '../functions/helpers';
import { getFederationDomain } from '../lib/getFederationDomain';
import { dispatchEvent } from '../handler';

async function afterUnsetReaction(message, { user, reaction }) {
const room = Rooms.findOneById(message.rid, { fields: { federation: 1 } });
Expand Down
15 changes: 10 additions & 5 deletions app/federation/server/lib/dns.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import dnsResolver from 'dns';

import { Meteor } from 'meteor/meteor';
import mem from 'mem';

import * as federationErrors from '../functions/errors';
import { logger } from './logger';
Expand All @@ -10,6 +11,10 @@ import { federationRequest } from './http';
const dnsResolveSRV = Meteor.wrapAsync(dnsResolver.resolveSrv);
const dnsResolveTXT = Meteor.wrapAsync(dnsResolver.resolveTxt);

const cacheMaxAge = 3600000; // one hour
const memoizedDnsResolveSRV = mem(dnsResolveSRV, { maxAge: cacheMaxAge });
const memoizedDnsResolveTXT = mem(dnsResolveTXT, { maxAge: cacheMaxAge });

const hubUrl = process.env.NODE_ENV === 'development' ? 'http://localhost:8080' : 'https://hub.rocket.chat';

export function registerWithHub(peerDomain, url, publicKey) {
Expand Down Expand Up @@ -68,7 +73,7 @@ export function search(peerDomain) {
// Search by HTTPS first
try {
logger.dns.debug(`search: peerDomain=${ peerDomain } srv=_rocketchat._https.${ peerDomain }`);
srvEntries = dnsResolveSRV(`_rocketchat._https.${ peerDomain }`);
srvEntries = memoizedDnsResolveSRV(`_rocketchat._https.${ peerDomain }`);
protocol = 'https';
} catch (err) {
// Ignore errors when looking for DNS entries
Expand All @@ -78,7 +83,7 @@ export function search(peerDomain) {
if (!srvEntries.length) {
try {
logger.dns.debug(`search: peerDomain=${ peerDomain } srv=_rocketchat._http.${ peerDomain }`);
srvEntries = dnsResolveSRV(`_rocketchat._http.${ peerDomain }`);
srvEntries = memoizedDnsResolveSRV(`_rocketchat._http.${ peerDomain }`);
protocol = 'http';
} catch (err) {
// Ignore errors when looking for DNS entries
Expand All @@ -89,12 +94,12 @@ export function search(peerDomain) {
if (!srvEntries.length) {
try {
logger.dns.debug(`search: peerDomain=${ peerDomain } srv=_rocketchat._tcp.${ peerDomain }`);
srvEntries = dnsResolveSRV(`_rocketchat._tcp.${ peerDomain }`);
srvEntries = memoizedDnsResolveSRV(`_rocketchat._tcp.${ peerDomain }`);
protocol = 'https'; // https is the default

// Then, also try to get the protocol
logger.dns.debug(`search: peerDomain=${ peerDomain } txt=rocketchat-tcp-protocol.${ peerDomain }`);
protocol = dnsResolveTXT(`rocketchat-tcp-protocol.${ peerDomain }`);
protocol = memoizedDnsResolveSRV(`rocketchat-tcp-protocol.${ peerDomain }`);
protocol = protocol[0].join('');

if (protocol !== 'http' && protocol !== 'https') {
Expand All @@ -119,7 +124,7 @@ export function search(peerDomain) {
// Get the public key from the TXT record
try {
logger.dns.debug(`search: peerDomain=${ peerDomain } txt=rocketchat-public-key.${ peerDomain }`);
const publicKeyTxtRecords = dnsResolveTXT(`rocketchat-public-key.${ peerDomain }`);
const publicKeyTxtRecords = memoizedDnsResolveTXT(`rocketchat-public-key.${ peerDomain }`);

// Join the TXT record, that might be split
publicKey = publicKeyTxtRecords[0].join('');
Expand Down
2 changes: 2 additions & 0 deletions app/federation/server/normalizers/room.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ const normalizeRoom = (originalResource, users) => {
}
}

domains = [...new Set(domains)];

// Federation
resource.federation = resource.federation || {
origin: getFederationDomain(), // The origin of this resource, where it was created
Expand Down