Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5951 create a command to trigger the import of a single message #5962

Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,31 @@ import { AxiosResponse } from 'axios';

import { GmailMessageParsedResponse } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response';
import { BatchQueries } from 'src/modules/messaging/message-import-manager/types/batch-queries';
import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util';

@Injectable()
export class MessagingFetchByBatchesService {
constructor(private readonly httpService: HttpService) {}

async fetchAllByBatches(
queries: BatchQueries,
messageIds: string[],
accessToken: string,
boundary: string,
): Promise<AxiosResponse<any, any>[]> {
): Promise<{
messageIdsByBatch: string[][];
batchResponses: AxiosResponse<any, any>[];
}> {
const batchLimit = 50;

let batchOffset = 0;

let batchResponses: AxiosResponse<any, any>[] = [];

while (batchOffset < queries.length) {
const messageIdsByBatch: string[][] = [];

while (batchOffset < messageIds.length) {
const batchResponse = await this.fetchBatch(
queries,
messageIds,
accessToken,
batchOffset,
batchLimit,
Expand All @@ -32,19 +38,25 @@ export class MessagingFetchByBatchesService {

batchResponses = batchResponses.concat(batchResponse);

messageIdsByBatch.push(
messageIds.slice(batchOffset, batchOffset + batchLimit),
);

batchOffset += batchLimit;
}

return batchResponses;
return { messageIdsByBatch, batchResponses };
bosiraphael marked this conversation as resolved.
Show resolved Hide resolved
}

async fetchBatch(
queries: BatchQueries,
messageIds: string[],
accessToken: string,
batchOffset: number,
batchLimit: number,
boundary: string,
): Promise<AxiosResponse<any, any>> {
const queries = createQueriesFromMessageIds(messageIds);

const limitedQueries = queries.slice(batchOffset, batchOffset + batchLimit);

const response = await this.httpService.axiosRef.post(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { Command, CommandRunner, Option } from 'nest-commander';

import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MessagingAddSingleMessageToCacheForImportJob,
MessagingAddSingleMessageToCacheForImportJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job';

type MessagingSingleMessageImportCommandOptions = {
messageId: string;
messageChannelId: string;
workspaceId: string;
};

@Command({
name: 'messaging:single-message-import',
description: 'Enqueue a job to schedule the import of a single message',
})
export class MessagingSingleMessageImportCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}

async run(
_passedParam: string[],
options: MessagingSingleMessageImportCommandOptions,
): Promise<void> {
await this.messageQueueService.add<MessagingAddSingleMessageToCacheForImportJobData>(
MessagingAddSingleMessageToCacheForImportJob.name,
{
messageId: options.messageId,
messageChannelId: options.messageChannelId,
workspaceId: options.workspaceId,
},
);
}

@Option({
flags: '-m, --message-id [message_id]',
description: 'Message ID',
required: true,
})
parseMessageId(value: string): string {
return value;
}

@Option({
flags: '-n, --message-channel-id [message_channel_id]',
bosiraphael marked this conversation as resolved.
Show resolved Hide resolved
description: 'Message channel ID',
required: true,
})
parseMessageChannelId(value: string): string {
return value;
}

@Option({
flags: '-w, --workspace-id [workspace_id]',
description: 'Workspace ID',
required: true,
})
parseWorkspaceId(value: string): string {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { gmail_v1 } from 'googleapis';

import { assert, assertNotNull } from 'src/utils/assert';
import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
import { MessageQuery } from 'src/modules/messaging/message-import-manager/types/message-or-thread-query';
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util';
import { MessagingFetchByBatchesService } from 'src/modules/messaging/common/services/messaging-fetch-by-batch.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
Expand All @@ -27,7 +26,7 @@ export class MessagingGmailFetchMessagesByBatchesService {
) {}

async fetchAllMessages(
queries: MessageQuery[],
messageIds: string[],
connectedAccountId: string,
workspaceId: string,
): Promise<GmailMessage[]> {
Expand All @@ -46,22 +45,24 @@ export class MessagingGmailFetchMessagesByBatchesService {

const accessToken = connectedAccount.accessToken;

const batchResponses = await this.fetchByBatchesService.fetchAllByBatches(
queries,
accessToken,
'batch_gmail_messages',
);
const { messageIdsByBatch, batchResponses } =
await this.fetchByBatchesService.fetchAllByBatches(
messageIds,
accessToken,
'batch_gmail_messages',
);
let endTime = Date.now();

this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} fetching ${
queries.length
messageIds.length
} messages in ${endTime - startTime}ms`,
);

startTime = Date.now();

const formattedResponse = this.formatBatchResponsesAsGmailMessages(
messageIdsByBatch,
batchResponses,
workspaceId,
connectedAccountId,
Expand All @@ -71,14 +72,15 @@ export class MessagingGmailFetchMessagesByBatchesService {

this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} formatting ${
queries.length
messageIds.length
} messages in ${endTime - startTime}ms`,
);

return formattedResponse;
}

private formatBatchResponseAsGmailMessage(
messageIds: string[],
responseCollection: AxiosResponse<any, any>,
workspaceId: string,
connectedAccountId: string,
Expand All @@ -90,94 +92,92 @@ export class MessagingGmailFetchMessagesByBatchesService {
return str.replace(/\0/g, '');
};

const formattedResponse = parsedResponses.map(
(response): GmailMessage | null => {
if ('error' in response) {
if (response.error.code === 404) {
return null;
}

throw response.error;
}

const {
historyId,
id,
threadId,
internalDate,
subject,
from,
to,
cc,
bcc,
headerMessageId,
text,
attachments,
deliveredTo,
} = this.parseGmailMessage(response);

if (!from) {
this.logger.log(
`From value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

if (!to && !deliveredTo && !bcc && !cc) {
this.logger.log(
`To, Delivered-To, Bcc or Cc value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

if (!headerMessageId) {
this.logger.log(
`Message-ID is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

if (!threadId) {
this.logger.log(
`Thread Id is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);

const formattedResponse = parsedResponses.map((response, index) => {
if ('error' in response) {
if (response.error.code === 404) {
return null;
}

const participants = [
...formatAddressObjectAsParticipants(from, 'from'),
...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'),
...formatAddressObjectAsParticipants(cc, 'cc'),
...formatAddressObjectAsParticipants(bcc, 'bcc'),
];

let textWithoutReplyQuotations = text;

if (text) {
textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain');
}

const messageFromGmail: GmailMessage = {
historyId,
externalId: id,
headerMessageId,
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from[0].address || '',
fromDisplayName: from[0].name || '',
participants,
text: sanitizeString(textWithoutReplyQuotations || ''),
attachments,
};

return messageFromGmail;
},
);
throw { ...response.error, messageId: messageIds[index] };
}

const {
historyId,
id,
threadId,
internalDate,
subject,
from,
to,
cc,
bcc,
headerMessageId,
text,
attachments,
deliveredTo,
} = this.parseGmailMessage(response);

if (!from) {
this.logger.log(
`From value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

if (!to && !deliveredTo && !bcc && !cc) {
this.logger.log(
`To, Delivered-To, Bcc or Cc value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

if (!headerMessageId) {
this.logger.log(
`Message-ID is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

if (!threadId) {
this.logger.log(
`Thread Id is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);

return null;
}

const participants = [
...formatAddressObjectAsParticipants(from, 'from'),
...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'),
...formatAddressObjectAsParticipants(cc, 'cc'),
...formatAddressObjectAsParticipants(bcc, 'bcc'),
];

let textWithoutReplyQuotations = text;

if (text) {
textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain');
}

const messageFromGmail: GmailMessage = {
historyId,
externalId: id,
headerMessageId,
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from[0].address || '',
fromDisplayName: from[0].name || '',
participants,
text: sanitizeString(textWithoutReplyQuotations || ''),
attachments,
};

return messageFromGmail;
});

const filteredMessages = formattedResponse.filter((message) =>
assertNotNull(message),
Expand All @@ -187,12 +187,14 @@ export class MessagingGmailFetchMessagesByBatchesService {
}

private formatBatchResponsesAsGmailMessages(
messageIdsByBatch: string[][],
batchResponses: AxiosResponse<any, any>[],
workspaceId: string,
connectedAccountId: string,
): GmailMessage[] {
const messageBatches = batchResponses.map((response) => {
const messageBatches = batchResponses.map((response, index) => {
return this.formatBatchResponseAsGmailMessage(
messageIdsByBatch[index],
response,
workspaceId,
connectedAccountId,
Expand Down
Loading
Loading