Skip to content
Merged
8 changes: 6 additions & 2 deletions app/lib/server/functions/setStatusText.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ export const _setStatusTextPromise = async function(userId, statusText) {
await UsersRaw.updateStatusText(user._id, statusText);

const { _id, username, status } = user;
api.broadcast('userpresence', { user: { _id, username, status, statusText } });
api.broadcast('presence.status', {
user: { _id, username, status, statusText },
});

return true;
};
Expand All @@ -48,7 +50,9 @@ export const _setStatusText = function(userId, statusText) {
user.statusText = statusText;

const { _id, username, status } = user;
api.broadcast('userpresence', { user: { _id, username, status, statusText } });
api.broadcast('presence.status', {
user: { _id, username, status, statusText },
});

return true;
};
Expand Down
50 changes: 27 additions & 23 deletions app/models/server/raw/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ import { UsersSessionsRaw } from './UsersSessions';
import UsersSessionsModel from '../models/UsersSessions';
import { ServerEventsRaw } from './ServerEvents';
import { trash } from '../models/_BaseDb';
import { initWatchers } from '../../../../server/modules/watchers/watchers.module';
import LoginServiceConfigurationModel from '../models/LoginServiceConfiguration';
import { LoginServiceConfigurationRaw } from './LoginServiceConfiguration';
import { InstanceStatusRaw } from './InstanceStatus';
Expand All @@ -64,6 +63,8 @@ import { IntegrationHistoryRaw } from './IntegrationHistory';
import IntegrationHistoryModel from '../models/IntegrationHistory';
import OmnichannelQueueModel from '../models/OmnichannelQueue';
import { OmnichannelQueueRaw } from './OmnichannelQueue';
import { api } from '../../../../server/sdk/api';
import { initWatchers } from '../../../../server/modules/watchers/watchers.module';

const trashCollection = trash.rawCollection();

Expand Down Expand Up @@ -117,27 +118,30 @@ const map = {
[Integrations.col.collectionName]: IntegrationsModel,
};

!process.env.DISABLE_DB_WATCH && initWatchers({
Messages,
Users,
Subscriptions,
Settings,
LivechatInquiry,
LivechatDepartmentAgents,
UsersSessions,
Permissions,
Roles,
Rooms,
LoginServiceConfiguration,
InstanceStatus,
IntegrationHistory,
Integrations,
}, (model, fn) => {
const meteorModel = map[model.col.collectionName];
if (!process.env.DISABLE_DB_WATCH) {
const models = {
Messages,
Users,
Subscriptions,
Settings,
LivechatInquiry,
LivechatDepartmentAgents,
UsersSessions,
Permissions,
Roles,
Rooms,
LoginServiceConfiguration,
InstanceStatus,
IntegrationHistory,
Integrations,
};

if (!meteorModel) {
return;
}
initWatchers(models, api.broadcastLocal.bind(api), (model, fn) => {
const meteorModel = map[model.col.collectionName];
if (!meteorModel) {
return;
}

meteorModel.on('change', fn);
});
meteorModel.on('change', fn);
});
}
20 changes: 12 additions & 8 deletions app/notifications/server/lib/Notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Promise } from 'meteor/promise';
import { DDPCommon } from 'meteor/ddp-common';

import { NotificationsModule } from '../../../../server/modules/notifications/notifications.module';
import { Streamer, StreamerCentral } from '../../../../server/modules/streamer/streamer.module';
import { Streamer } from '../../../../server/modules/streamer/streamer.module';
import { api } from '../../../../server/sdk/api';
import {
Subscriptions as SubscriptionsRaw,
Expand All @@ -13,13 +13,13 @@ import {
} from '../../../models/server/raw';

// TODO: Replace this in favor of the api.broadcast
StreamerCentral.on('broadcast', (name, eventName, args) => {
api.broadcast('stream', [
name,
eventName,
args,
]);
});
// StreamerCentral.on('broadcast', (name, eventName, args) => {
// api.broadcast('stream', [
// name,
// eventName,
// args,
// ]);
// });

export class Stream extends Streamer {
registerPublication(name: string, fn: (eventName: string, options: boolean | {useCollection?: boolean; args?: any}) => void): void {
Expand Down Expand Up @@ -51,4 +51,8 @@ notifications.configure({
Settings: SettingsRaw,
});

notifications.streamLocal.on('broadcast', ({ eventName, args }) => {
api.broadcastLocal(eventName, ...args);
});

export default notifications;
4 changes: 4 additions & 0 deletions ee/server/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ class NetworkBroker implements IBroker {
return this.broker.broadcast(event, args);
}

async broadcastLocal<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void> {
this.broker.broadcastLocal(event, args);
}

async nodeList(): Promise<IBrokerNode[]> {
return this.broker.call('$node.list');
}
Expand Down
3 changes: 1 addition & 2 deletions ee/server/services/presence/actions/setStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ export async function setStatus(uid: string, statusDefault: USER_STATUS, statusT

if (result.modifiedCount > 0) {
const user = await User.findOne<IUser>(query, { projection: { username: 1 } });
api.broadcast('userpresence', {
action: 'updated',
api.broadcast('presence.status', {
user: { _id: uid, username: user?.username, status, statusText },
});
}
Expand Down
3 changes: 1 addition & 2 deletions ee/server/services/presence/actions/updateUserPresence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ export async function updateUserPresence(uid: string): Promise<void> {
});

if (result.modifiedCount > 0) {
api.broadcast('userpresence', {
action: 'updated',
api.broadcast('presence.status', {
user: { _id: uid, username: user.username, status, statusText: user.statusText },
});
}
Expand Down
3 changes: 2 additions & 1 deletion ee/server/services/stream-hub/StreamHub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { IntegrationHistoryRaw } from '../../../../app/models/server/raw/Integra
import { LivechatDepartmentAgentsRaw } from '../../../app/models/server/raw/LivechatDepartmentAgents';
import { IntegrationsRaw } from '../../../../app/models/server/raw/Integrations';
import { PermissionsRaw } from '../../../../app/models/server/raw/Permissions';
import { api } from '../../../../server/sdk/api';

export class StreamHub extends ServiceClass implements IServiceClass {
protected name = 'hub';
Expand Down Expand Up @@ -58,7 +59,7 @@ export class StreamHub extends ServiceClass implements IServiceClass {
Integrations,
};

initWatchers(models, (model, fn) => {
initWatchers(models, api.broadcast.bind(api), (model, fn) => {
model.col.watch([]).on('change', (event) => {
switch (event.operationType) {
case 'insert':
Expand Down
4 changes: 3 additions & 1 deletion imports/users-presence/server/activeUsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ export const setUserStatus = (user, status/* , statusConnection*/) => {

// since this callback can be called by only one instance in the cluster
// we need to broadcast the change to all instances
api.broadcast('userpresence', { user: { status, _id, username, statusText } }); // remove username
api.broadcast('presence.status', {
user: { status, _id, username, statusText }, // TODO remove username
});
};

let TroubleshootDisablePresenceBroadcast;
Expand Down
2 changes: 1 addition & 1 deletion server/modules/listeners/listeners.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class ListenersModule {
notifications.notifyLoggedInThisInstance('roles-change', update);
});

service.onEvent('userpresence', ({ user }) => {
service.onEvent('presence.status', ({ user }) => {
const {
_id, username, status, statusText,
} = user;
Expand Down
9 changes: 9 additions & 0 deletions server/modules/notifications/notifications.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export class NotificationsModule {

public readonly streamRoomData: IStreamer;

public readonly streamLocal: IStreamer;

constructor(
private Streamer: IStreamerConstructor,
) {
Expand Down Expand Up @@ -88,6 +90,8 @@ export class NotificationsModule {
});

this.streamUser = new this.Streamer('notify-user');

this.streamLocal = new this.Streamer('local');
}

async configure({ Rooms, Subscriptions, Users, Settings }: IModelsParam): Promise<void> {
Expand Down Expand Up @@ -374,6 +378,11 @@ export class NotificationsModule {
});
}
});

this.streamLocal.serverOnly = true;
this.streamLocal.allowRead('none');
this.streamLocal.allowEmit('all');
this.streamLocal.allowWrite('none');
}

notifyAll(eventName: string, ...args: any[]): void {
Expand Down
Loading