Skip to content

Commit

Permalink
5620 implement throttle logic for message and calendar sync (#5718)
Browse files Browse the repository at this point in the history
Closes #5620 and improve messages filters
  • Loading branch information
bosiraphael authored Jun 4, 2024
1 parent 32d4b37 commit 3f9f2c3
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 7 deletions.
4 changes: 2 additions & 2 deletions packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import TabItem from '@theme/TabItem';
Twenty offers integrations with Gmail and Google Calendar. To enable these features, you need to connect to register the following recurring jobs:
```
# from your worker container
yarn command:prod cron:messaging:gmail-messages-import
yarn command:prod cron:messaging:gmail-message-list-fetch
yarn command:prod cron:messaging:messages-import
yarn command:prod cron:messaging:message-list-fetch
```

# Setup Environment Variables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class GoogleAPIRefreshAccessTokenService {
}

await this.messagingTelemetryService.track({
eventName: `refresh-token.error.insufficient_permissions`,
eventName: `refresh_token.error.insufficient_permissions`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const MESSAGING_THROTTLE_DURATION = 1000 * 60 * 1; // 1 minute
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const MESSAGING_THROTTLE_MAX_ATTEMPTS = 4;
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,39 @@ export class MessageChannelRepository {
transactionManager,
);
}

public async updateThrottlePauseUntilAndIncrementThrottleFailureCount(
id: string,
throttleDurationMs: number,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NOW() + ($1 || ' milliseconds')::interval, "throttleFailureCount" = "throttleFailureCount" + 1
WHERE "id" = $2`,
[throttleDurationMs, id],
workspaceId,
transactionManager,
);
}

public async resetThrottlePauseUntilAndThrottleFailureCount(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NULL, "throttleFailureCount" = 0
WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration';
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/common/constants/messaging-throttle-max-attempts';

type SyncStep =
| 'partial-message-list-fetch'
Expand All @@ -27,6 +30,8 @@ export class MessagingErrorHandlingService {
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}

public async handleGmailError(
Expand Down Expand Up @@ -100,7 +105,7 @@ export class MessagingErrorHandlingService {
}
}

public async handleRateLimitExceeded(
private async handleRateLimitExceeded(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
Expand All @@ -114,6 +119,19 @@ export class MessagingErrorHandlingService {
message: `${error.code}: ${error.reason}`,
});

if (
messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS
) {
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);

return;
}

await this.throttle(messageChannel, workspaceId);

switch (syncStep) {
case 'full-message-list-fetch':
await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch(
Expand Down Expand Up @@ -141,7 +159,7 @@ export class MessagingErrorHandlingService {
}
}

public async handleInsufficientPermissions(
private async handleInsufficientPermissions(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
Expand All @@ -166,7 +184,7 @@ export class MessagingErrorHandlingService {
);
}

public async handleNotFound(
private async handleNotFound(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
Expand All @@ -189,4 +207,27 @@ export class MessagingErrorHandlingService {
workspaceId,
);
}

private async throttle(
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
const throttleDuration =
MESSAGING_THROTTLE_DURATION *
Math.pow(2, messageChannel.throttleFailureCount);

await this.messageChannelRepository.updateThrottlePauseUntilAndIncrementThrottleFailureCount(
messageChannel.id,
throttleDuration,
workspaceId,
);

await this.messagingTelemetryService.track({
eventName: 'message_channel.throttle',
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `Throttling for ${throttleDuration}ms`,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export const MESSAGING_GMAIL_EXCLUDED_CATEGORIES = [
'promotions',
'social',
'forums',
'updates',
];
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ export class MessagingGmailFullMessageListFetchService {
return;
}

await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
messageChannel.id,
workspaceId,
);

await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messa
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';

@Injectable()
export class MessagingGmailMessagesImportService {
Expand All @@ -39,6 +40,8 @@ export class MessagingGmailMessagesImportService {
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
private readonly blocklistRepository: BlocklistRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}

async processMessageBatchImport(
Expand Down Expand Up @@ -134,6 +137,11 @@ export class MessagingGmailMessagesImportService {
);
}

await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
messageChannel.id,
workspaceId,
);

return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ export class MessagingGmailPartialMessageListFetchService {
return;
}

await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
messageChannel.id,
workspaceId,
);

if (!historyId) {
throw new Error(
`No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ export class MessagingMessageListFetchJob
return;
}

if (
messageChannel.throttlePauseUntil &&
messageChannel.throttlePauseUntil > new Date()
) {
return;
}

switch (messageChannel.syncSubStatus) {
case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
Expand All @@ -85,6 +92,7 @@ export class MessagingMessageListFetchJob
eventName: 'partial_message_list_fetch.started',
workspaceId,
connectedAccountId,
messageChannelId: messageChannel.id,
});

await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
Expand All @@ -111,6 +119,7 @@ export class MessagingMessageListFetchJob
eventName: 'full_message_list_fetch.started',
workspaceId,
connectedAccountId,
messageChannelId: messageChannel.id,
});

await this.gmailFullMessageListFetchService.processMessageListFetch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export class MessagingMessagesImportJob
messageChannelId: messageChannel.id,
});

if (
messageChannel.throttlePauseUntil &&
messageChannel.throttlePauseUntil > new Date()
) {
continue;
}

const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const filterOutIcsAttachments = (messages: GmailMessage[]) => {

const isPersonEmail = (email: string): boolean => {
const nonPersonalPattern =
/noreply|no-reply|do_not_reply|no\.reply|^(info@|contact@|hello@|support@|feedback@|service@|help@|invites@|invite@|welcome@|alerts@|team@)/;
/noreply|no-reply|do_not_reply|no\.reply|^(info@|contact@|hello@|support@|feedback@|service@|help@|invites@|invite@|welcome@|alerts@|team@|notifications@|notification@|news@)/;

return !nonPersonalPattern.test(email);
};
Expand Down

0 comments on commit 3f9f2c3

Please sign in to comment.