Skip to content
7 changes: 7 additions & 0 deletions .changeset/many-badgers-jam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@rocket.chat/meteor": minor
"@rocket.chat/model-typings": minor
"@rocket.chat/models": minor
---

Makes Omnichannel converstion start process transactional.
23 changes: 13 additions & 10 deletions apps/meteor/app/livechat/server/lib/Helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import {
LivechatRooms,
LivechatDepartment,
Subscriptions,
Rooms,
Users,
LivechatContacts,
} from '@rocket.chat/models';
import { Match, check } from 'meteor/check';
import { Meteor } from 'meteor/meteor';
import type { ClientSession } from 'mongodb';
import { ObjectId } from 'mongodb';

import { Livechat as LivechatTyped } from './LivechatTyped';
Expand Down Expand Up @@ -67,12 +67,12 @@ export const allowAgentSkipQueue = (agent: SelectedAgent) => {

return hasRoleAsync(agent.agentId, 'bot');
};
export const createLivechatRoom = async (
export const prepareLivechatRoom = async (
rid: string,
guest: ILivechatVisitor,
roomInfo: IOmnichannelRoomInfo = { source: { type: OmnichannelSourceType.OTHER } },
extraData?: IOmnichannelRoomExtraData,
): Promise<IOmnichannelRoom> => {
): Promise<InsertionModel<IOmnichannelRoom>> => {
check(rid, String);
check(
guest,
Expand Down Expand Up @@ -112,7 +112,7 @@ export const createLivechatRoom = async (
const verified = Boolean(contact.channels.some((channel) => isVerifiedChannelInSource(channel, _id, source)));

// TODO: Solve `u` missing issue
const room: InsertionModel<IOmnichannelRoom> = {
return {
_id: rid,
msgs: 0,
usersCount: 1,
Expand Down Expand Up @@ -145,26 +145,26 @@ export const createLivechatRoom = async (
estimatedWaitingTimeQueue: DEFAULT_SLA_CONFIG.ESTIMATED_WAITING_TIME_QUEUE,
...extraRoomInfo,
} as InsertionModel<IOmnichannelRoom>;
};

const result = await Rooms.findOneAndUpdate(
export const createLivechatRoom = async (room: InsertionModel<IOmnichannelRoom>, session: ClientSession) => {
const result = await LivechatRooms.findOneAndUpdate(
room,
{
$set: {},
},
{
upsert: true,
returnDocument: 'after',
session,
},
);

if (!result) {
throw new Error('Room not created');
}

await callbacks.run('livechat.newRoom', room);
await Message.saveSystemMessageAndNotifyUser('livechat-started', rid, '', { _id, username }, { groupable: false, token: guest.token });

return result as IOmnichannelRoom;
return result;
};

export const createLivechatInquiry = async ({
Expand All @@ -174,13 +174,15 @@ export const createLivechatInquiry = async ({
message,
initialStatus,
extraData,
session,
}: {
rid: string;
name?: string;
guest?: Pick<ILivechatVisitor, '_id' | 'username' | 'status' | 'department' | 'name' | 'token' | 'activity'>;
message?: string;
initialStatus?: LivechatInquiryStatus;
extraData?: IOmnichannelInquiryExtraData;
session?: ClientSession;
}) => {
check(rid, String);
check(name, String);
Expand All @@ -202,7 +204,7 @@ export const createLivechatInquiry = async ({
const ts = new Date();

logger.debug({
msg: `Creating livechat inquiry for visitor ${_id}`,
msg: `Creating livechat inquiry for visitor`,
visitor: { _id, username, department, status, activity },
});

Expand Down Expand Up @@ -235,6 +237,7 @@ export const createLivechatInquiry = async ({
{
upsert: true,
returnDocument: 'after',
session,
},
);
logger.debug(`Inquiry ${result} created for visitor ${_id}`);
Expand Down
90 changes: 59 additions & 31 deletions apps/meteor/app/livechat/server/lib/QueueManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Apps, AppEvents } from '@rocket.chat/apps';
import { Omnichannel } from '@rocket.chat/core-services';
import { Message, Omnichannel } from '@rocket.chat/core-services';
import type {
ILivechatDepartment,
IOmnichannelRoomInfo,
Expand All @@ -12,25 +12,23 @@ import type {
} from '@rocket.chat/core-typings';
import { LivechatInquiryStatus } from '@rocket.chat/core-typings';
import { Logger } from '@rocket.chat/logger';
import type { InsertionModel } from '@rocket.chat/model-typings';
import { LivechatContacts, LivechatDepartment, LivechatDepartmentAgents, LivechatInquiry, LivechatRooms, Users } from '@rocket.chat/models';
import { Random } from '@rocket.chat/random';
import { Match, check } from 'meteor/check';
import { Meteor } from 'meteor/meteor';

import { createLivechatRoom, createLivechatInquiry, allowAgentSkipQueue } from './Helper';
import { createLivechatRoom, createLivechatInquiry, allowAgentSkipQueue, prepareLivechatRoom } from './Helper';
import { Livechat } from './LivechatTyped';
import { RoutingManager } from './RoutingManager';
import { isVerifiedChannelInSource } from './contacts/isVerifiedChannelInSource';
import { getOnlineAgents } from './getOnlineAgents';
import { getInquirySortMechanismSetting } from './settings';
import { dispatchInquiryPosition } from '../../../../ee/app/livechat-enterprise/server/lib/Helper';
import { callbacks } from '../../../../lib/callbacks';
import { client, shouldRetryTransaction } from '../../../../server/database/utils';
import { sendNotification } from '../../../lib/server';
import {
notifyOnLivechatInquiryChangedById,
notifyOnLivechatInquiryChanged,
notifyOnSettingChanged,
} from '../../../lib/server/lib/notifyListener';
import { notifyOnLivechatInquiryChangedById, notifyOnLivechatInquiryChanged } from '../../../lib/server/lib/notifyListener';
import { settings } from '../../../settings/server';
import { i18n } from '../../../utils/lib/i18n';
import { getOmniChatSortQuery } from '../../lib/inquiries';
Expand Down Expand Up @@ -213,6 +211,47 @@ export class QueueManager {
return Boolean(contact.channels.some((channel) => isVerifiedChannelInSource(channel, room.v._id, room.source)));
}

static async startConversation(
rid: string,
insertionRoom: InsertionModel<IOmnichannelRoom>,
guest: ILivechatVisitor,
roomInfo: IOmnichannelRoomInfo,
defaultAgent?: SelectedAgent,
message?: string,
extraData?: IOmnichannelRoomExtraData,
attempts = 3,
): Promise<{ room: IOmnichannelRoom; inquiry: ILivechatInquiryRecord }> {
const session = client.startSession();
try {
session.startTransaction();
const room = await createLivechatRoom(insertionRoom, session);
logger.debug(`Room for visitor ${guest._id} created with id ${room._id}`);
const inquiry = await createLivechatInquiry({
rid,
name: room.fname,
initialStatus: await this.getInquiryStatus({ room, agent: defaultAgent }),
guest,
message,
extraData: { ...extraData, source: roomInfo.source },
session,
});
await session.commitTransaction();
return { room, inquiry };
} catch (e) {
await session.abortTransaction();
if (shouldRetryTransaction(e)) {
if (attempts > 0) {
logger.debug({ msg: 'Retrying transaction because of transient error', attemptsLeft: attempts });
return this.startConversation(rid, insertionRoom, guest, roomInfo, defaultAgent, message, extraData, attempts - 1);
}
throw new Error('error-failed-to-start-conversation');
}
throw e;
} finally {
await session.endSession();
}
}

static async requestRoom({
guest,
rid = Random.id(),
Expand Down Expand Up @@ -280,38 +319,27 @@ export class QueueManager {
}
}

const room = await createLivechatRoom(rid, { ...guest, ...(department && { department }) }, roomInfo, {
const insertionRoom = await prepareLivechatRoom(rid, { ...guest, ...(department && { department }) }, roomInfo, {
...extraData,
...(Boolean(customFields) && { customFields }),
});

if (!room) {
logger.error(`Room for visitor ${guest._id} not found`);
throw new Error('room-not-found');
}
logger.debug(`Room for visitor ${guest._id} created with id ${room._id}`);
// Transactional start of the conversation. This should prevent rooms from being created without inquiries and viceversa.
// All the actions that happened inside createLivechatRoom are now outside this transaction
const { room, inquiry } = await this.startConversation(rid, insertionRoom, guest, roomInfo, defaultAgent, message, extraData);

const inquiry = await createLivechatInquiry({
// TODO: investigate if this setting is actually useful somewhere
await LivechatRooms.updateRoomCount();
await callbacks.run('livechat.newRoom', room);
await Message.saveSystemMessageAndNotifyUser(
'livechat-started',
rid,
name: room.fname,
initialStatus: await this.getInquiryStatus({ room, agent: defaultAgent }),
guest,
message,
extraData: { ...extraData, source: roomInfo.source },
});

if (!inquiry) {
logger.error(`Inquiry for visitor ${guest._id} not found`);
throw new Error('inquiry-not-found');
}

'',
{ _id: guest._id, username: guest.username },
{ groupable: false, token: guest.token },
);
void Apps.self?.triggerEvent(AppEvents.IPostLivechatRoomStarted, room);

const livechatSetting = await LivechatRooms.updateRoomCount();
if (livechatSetting) {
void notifyOnSettingChanged(livechatSetting);
}

await this.processNewInquiry(inquiry, room, defaultAgent);
const newRoom = await LivechatRooms.findOneById(rid);

Expand Down
Loading