Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5620 implement throttle logic for message and calendar sync #5718

Merged
4 changes: 2 additions & 2 deletions packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import TabItem from '@theme/TabItem';
Twenty offers integrations with Gmail and Google Calendar. To enable these features, you need to connect to register the following recurring jobs:
```
# from your worker container
yarn command:prod cron:messaging:gmail-messages-import
yarn command:prod cron:messaging:gmail-message-list-fetch
yarn command:prod cron:messaging:messages-import
yarn command:prod cron:messaging:message-list-fetch
```

# Setup Environment Variables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class GoogleAPIRefreshAccessTokenService {
}

await this.messagingTelemetryService.track({
eventName: `refresh-token.error.insufficient_permissions`,
eventName: `refresh_token.error.insufficient_permissions`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const MESSAGING_THROTTLE_DURATION = 1000 * 60 * 1; // 1 minute
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const MESSAGING_THROTTLE_MAX_ATTEMPTS = 4;
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,39 @@ export class MessageChannelRepository {
transactionManager,
);
}

public async updateThrottlePauseUntilAndIncrementThrottleFailureCount(
bosiraphael marked this conversation as resolved.
Show resolved Hide resolved
id: string,
throttleDurationMs: number,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NOW() + ($1 || ' milliseconds')::interval, "throttleFailureCount" = "throttleFailureCount" + 1
WHERE "id" = $2`,
[throttleDurationMs, id],
workspaceId,
transactionManager,
);
}

public async resetThrottlePauseUntilAndThrottleFailureCount(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NULL, "throttleFailureCount" = 0
WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration';
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/common/constants/messaging-throttle-max-attempts';

type SyncStep =
| 'partial-message-list-fetch'
Expand All @@ -27,6 +30,8 @@ export class MessagingErrorHandlingService {
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}

public async handleGmailError(
Expand Down Expand Up @@ -100,7 +105,7 @@ export class MessagingErrorHandlingService {
}
}

public async handleRateLimitExceeded(
private async handleRateLimitExceeded(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
Expand All @@ -114,6 +119,19 @@ export class MessagingErrorHandlingService {
message: `${error.code}: ${error.reason}`,
});

if (
messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS
) {
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);

return;
}

await this.throttle(messageChannel, workspaceId);

switch (syncStep) {
case 'full-message-list-fetch':
await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch(
Expand Down Expand Up @@ -141,7 +159,7 @@ export class MessagingErrorHandlingService {
}
}

public async handleInsufficientPermissions(
private async handleInsufficientPermissions(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
Expand All @@ -166,7 +184,7 @@ export class MessagingErrorHandlingService {
);
}

public async handleNotFound(
private async handleNotFound(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
Expand All @@ -189,4 +207,27 @@ export class MessagingErrorHandlingService {
workspaceId,
);
}

private async throttle(
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
const throttleDuration =
MESSAGING_THROTTLE_DURATION *
Math.pow(2, messageChannel.throttleFailureCount);

await this.messageChannelRepository.updateThrottlePauseUntilAndIncrementThrottleFailureCount(
messageChannel.id,
throttleDuration,
workspaceId,
);

await this.messagingTelemetryService.track({
eventName: 'message_channel.throttle',
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `Throttling for ${throttleDuration}ms`,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export const MESSAGING_GMAIL_EXCLUDED_CATEGORIES = [
'promotions',
'social',
'forums',
'updates',
];
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ export class MessagingGmailFullMessageListFetchService {
return;
}

await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
messageChannel.id,
workspaceId,
);

await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messa
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';

@Injectable()
export class MessagingGmailMessagesImportService {
Expand All @@ -39,6 +40,8 @@ export class MessagingGmailMessagesImportService {
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
private readonly blocklistRepository: BlocklistRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}

async processMessageBatchImport(
Expand Down Expand Up @@ -134,6 +137,11 @@ export class MessagingGmailMessagesImportService {
);
}

await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
messageChannel.id,
workspaceId,
);

return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ export class MessagingGmailPartialMessageListFetchService {
return;
}

await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
messageChannel.id,
workspaceId,
);
bosiraphael marked this conversation as resolved.
Show resolved Hide resolved

if (!historyId) {
throw new Error(
`No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ export class MessagingMessageListFetchJob
return;
}

if (
messageChannel.throttlePauseUntil &&
messageChannel.throttlePauseUntil > new Date()
) {
return;
}

switch (messageChannel.syncSubStatus) {
case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
Expand All @@ -85,6 +92,7 @@ export class MessagingMessageListFetchJob
eventName: 'partial_message_list_fetch.started',
workspaceId,
connectedAccountId,
messageChannelId: messageChannel.id,
});

await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
Expand All @@ -111,6 +119,7 @@ export class MessagingMessageListFetchJob
eventName: 'full_message_list_fetch.started',
workspaceId,
connectedAccountId,
messageChannelId: messageChannel.id,
});

await this.gmailFullMessageListFetchService.processMessageListFetch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export class MessagingMessagesImportJob
messageChannelId: messageChannel.id,
});

if (
messageChannel.throttlePauseUntil &&
messageChannel.throttlePauseUntil > new Date()
) {
continue;
}

const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const filterOutIcsAttachments = (messages: GmailMessage[]) => {

const isPersonEmail = (email: string): boolean => {
const nonPersonalPattern =
/noreply|no-reply|do_not_reply|no\.reply|^(info@|contact@|hello@|support@|feedback@|service@|help@|invites@|invite@|welcome@|alerts@|team@)/;
/noreply|no-reply|do_not_reply|no\.reply|^(info@|contact@|hello@|support@|feedback@|service@|help@|invites@|invite@|welcome@|alerts@|team@|notifications@|notification@|news@)/;

return !nonPersonalPattern.test(email);
};
Expand Down
Loading