Skip to content

Commit

Permalink
Refactor sync sub status and throttle (#5734)
Browse files Browse the repository at this point in the history
- Rename syncSubStatus to syncStage
- Rename ongoingSyncStartedAt to syncStageStartedAt
- Remove throttlePauseUntil from db and compute it with
syncStageStartedAt and throttleFailureCount
  • Loading branch information
bosiraphael authored Jun 4, 2024
1 parent ce1469c commit 234e062
Show file tree
Hide file tree
Showing 14 changed files with 148 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6903,9 +6903,9 @@ export const mockedStandardObjectMetadataQueryResult: ObjectMetadataItemsQuery =
options: null,
id: '24147b01-4394-4aee-92a4-5f6b5073704f',
type: 'DATE_TIME',
name: 'ongoingSyncStartedAt',
label: 'Ongoing sync started at',
description: 'Ongoing sync started at',
name: 'syncStageStartedAt',
label: 'Sync stage started at',
description: 'Sync stage started at',
icon: 'IconHistory',
isCustom: false,
isActive: true,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EntityManager } from 'typeorm';

import { DEV_SEED_CONNECTED_ACCOUNT_IDS } from 'src/database/typeorm-seeds/workspace/connected-account';
import { MessageChannelSyncSubStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';

const tableName = 'messageChannel';

Expand All @@ -28,7 +28,7 @@ export const seedMessageChannel = async (
'connectedAccountId',
'handle',
'visibility',
'syncSubStatus',
'syncStage',
])
.orIgnore()
.values([
Expand All @@ -42,8 +42,7 @@ export const seedMessageChannel = async (
connectedAccountId: DEV_SEED_CONNECTED_ACCOUNT_IDS.TIM,
handle: '[email protected]',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
},
{
id: DEV_SEED_MESSAGE_CHANNEL_IDS.JONY,
Expand All @@ -55,8 +54,7 @@ export const seedMessageChannel = async (
connectedAccountId: DEV_SEED_CONNECTED_ACCOUNT_IDS.JONY,
handle: '[email protected]',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
},
{
id: DEV_SEED_MESSAGE_CHANNEL_IDS.PHIL,
Expand All @@ -68,8 +66,7 @@ export const seedMessageChannel = async (
connectedAccountId: DEV_SEED_CONNECTED_ACCOUNT_IDS.PHIL,
handle: '[email protected]',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
},
])
.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ export const CALENDAR_CHANNEL_STANDARD_FIELD_IDS = {
isSyncEnabled: '20202020-fe19-4818-8854-21f7b1b43395',
syncCursor: '20202020-bac2-4852-a5cb-7a7898992b70',
calendarChannelEventAssociations: '20202020-afb0-4a9f-979f-2d5087d71d09',
throttlePauseUntil: '20202020-16e8-40ca-be79-a3af4787af2c',
throttleFailureCount: '20202020-525c-4b76-b9bd-0dd57fd11d61',
};

Expand Down Expand Up @@ -208,9 +207,8 @@ export const MESSAGE_CHANNEL_STANDARD_FIELD_IDS = {
syncCursor: '20202020-79d1-41cf-b738-bcf5ed61e256',
syncedAt: '20202020-263d-4c6b-ad51-137ada56f7d4',
syncStatus: '20202020-56a1-4f7e-9880-a8493bb899cc',
syncSubStatus: '20202020-7979-4b08-89fe-99cb5e698767',
ongoingSyncStartedAt: '20202020-8c61-4a42-ae63-73c1c3c52e06',
throttlePauseUntil: '20202020-a8cb-475b-868c-b83538614df4',
syncStage: '20202020-7979-4b08-89fe-99cb5e698767',
syncStageStartedAt: '20202020-8c61-4a42-ae63-73c1c3c52e06',
throttleFailureCount: '20202020-0291-42be-9ad0-d578a51684ab',
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is
import { WorkspaceIsNotAuditLogged } from 'src/engine/twenty-orm/decorators/workspace-is-not-audit-logged.decorator';
import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator';
import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator';
import { WorkspaceIsNullable } from 'src/engine/twenty-orm/decorators/workspace-is-nullable.decorator';

export enum CalendarChannelVisibility {
METADATA = 'METADATA',
Expand Down Expand Up @@ -97,16 +96,6 @@ export class CalendarChannelWorkspaceEntity extends BaseWorkspaceEntity {
})
syncCursor: string;

@WorkspaceField({
standardId: CALENDAR_CHANNEL_STANDARD_FIELD_IDS.throttlePauseUntil,
type: FieldMetadataType.DATE_TIME,
label: 'Throttle Pause Until',
description: 'Throttle Pause Until',
icon: 'IconPlayerPause',
})
@WorkspaceIsNullable()
throttlePauseUntil: Date;

@WorkspaceField({
standardId: CALENDAR_CHANNEL_STANDARD_FIELD_IDS.throttleFailureCount,
type: FieldMetadataType.NUMBER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metada
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncStatus,
MessageChannelSyncSubStatus,
MessageChannelSyncStage,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';

@Injectable()
Expand Down Expand Up @@ -51,7 +51,7 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = NULL, "syncCursor" = '', "ongoingSyncStartedAt" = NULL
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = NULL, "syncCursor" = '', "syncStageStartedAt" = NULL
WHERE "connectedAccountId" = $1`,
[connectedAccountId],
workspaceId,
Expand Down Expand Up @@ -169,37 +169,52 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);

const needsToUpdateSyncedAt =
syncStatus === MessageChannelSyncStatus.SUCCEEDED;

const needsToUpdateOngoingSyncStartedAt =
syncStatus === MessageChannelSyncStatus.ONGOING;
syncStatus === MessageChannelSyncStatus.COMPLETED;

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = $1 ${
needsToUpdateSyncedAt ? `, "syncedAt" = NOW()` : ''
} ${
needsToUpdateOngoingSyncStartedAt
? `, "ongoingSyncStartedAt" = NOW()`
: `, "ongoingSyncStartedAt" = NULL`
} WHERE "id" = $2`,
[syncStatus, id],
workspaceId,
transactionManager,
);
}

public async updateSyncSubStatus(
public async updateSyncStage(
id: string,
syncSubStatus: MessageChannelSyncSubStatus,
syncStage: MessageChannelSyncStage,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

const needsToUpdateSyncStageStartedAt =
syncStage === MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING ||
syncStage === MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING;

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncSubStatus" = $1 WHERE "id" = $2`,
[syncSubStatus, id],
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStage" = $1 ${
needsToUpdateSyncStageStartedAt ? `, "syncStageStartedAt" = NOW()` : ''
} WHERE "id" = $2`,
[syncStage, id],
workspaceId,
transactionManager,
);
}

public async resetSyncStageStartedAt(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStageStartedAt" = NULL WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
Expand Down Expand Up @@ -241,25 +256,24 @@ export class MessageChannelRepository {
);
}

public async updateThrottlePauseUntilAndIncrementThrottleFailureCount(
public async incrementThrottleFailureCount(
id: string,
throttleDurationMs: number,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NOW() + ($1 || ' milliseconds')::interval, "throttleFailureCount" = "throttleFailureCount" + 1
WHERE "id" = $2`,
[throttleDurationMs, id],
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = "throttleFailureCount" + 1
WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
}

public async resetThrottlePauseUntilAndThrottleFailureCount(
public async resetThrottleFailureCount(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
Expand All @@ -268,7 +282,7 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);

await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NULL, "throttleFailureCount" = 0
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = 0
WHERE "id" = $1`,
[id],
workspaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repos
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncSubStatus,
MessageChannelSyncStage,
MessageChannelSyncStatus,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';

Expand All @@ -24,9 +24,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
workspaceId,
);
}
Expand All @@ -35,9 +35,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
workspaceId,
);
}
Expand All @@ -46,9 +46,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING,
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
workspaceId,
);
}
Expand All @@ -68,16 +68,26 @@ export class MessagingChannelSyncStatusService {
workspaceId,
);

await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannelId,
workspaceId,
);

await this.messageChannelRepository.resetThrottleFailureCount(
messageChannelId,
workspaceId,
);

await this.scheduleFullMessageListFetch(messageChannelId, workspaceId);
}

public async markAsMessagesListFetchOngoing(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING,
MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
workspaceId,
);

Expand Down Expand Up @@ -105,9 +115,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING,
MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
workspaceId,
);
}
Expand All @@ -120,9 +130,9 @@ export class MessagingChannelSyncStatusService {
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);

await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.FAILED,
MessageChannelSyncStage.FAILED,
workspaceId,
);

Expand All @@ -141,9 +151,9 @@ export class MessagingChannelSyncStatusService {
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);

await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.FAILED,
MessageChannelSyncStage.FAILED,
workspaceId,
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { MessagingTelemetryService } from 'src/modules/messaging/common/services
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration';
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/common/constants/messaging-throttle-max-attempts';

type SyncStep =
Expand Down Expand Up @@ -212,13 +211,8 @@ export class MessagingErrorHandlingService {
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
const throttleDuration =
MESSAGING_THROTTLE_DURATION *
Math.pow(2, messageChannel.throttleFailureCount);

await this.messageChannelRepository.updateThrottlePauseUntilAndIncrementThrottleFailureCount(
await this.messageChannelRepository.incrementThrottleFailureCount(
messageChannel.id,
throttleDuration,
workspaceId,
);

Expand All @@ -227,7 +221,7 @@ export class MessagingErrorHandlingService {
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `Throttling for ${throttleDuration}ms`,
message: `Increment throttle failure count to ${messageChannel.throttleFailureCount}`,
});
}
}
Loading

0 comments on commit 234e062

Please sign in to comment.