diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-ongoing-stale.cron.command.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-ongoing-stale.cron.command.ts new file mode 100644 index 000000000000..ed77d79555b8 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-ongoing-stale.cron.command.ts @@ -0,0 +1,32 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job'; + +const MESSAGING_ONGOING_STALE_CRON_PATTERN = '0 * * * *'; + +@Command({ + name: 'cron:messaging:ongoing-stale', + description: + 'Starts a cron job to check for stale ongoing message imports and put them back to pending', +}) +export class MessagingOngoingStaleCronCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron( + MessagingOngoingStaleCronJob.name, + undefined, + { + repeat: { pattern: MESSAGING_ONGOING_STALE_CRON_PATTERN }, + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index 3c4352a92153..53c9d86d90ba 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job.ts new file mode 100644 index 000000000000..b9666c5ec567 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job.ts @@ -0,0 +1,62 @@ +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository, In } from 'typeorm'; + +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; +import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { + MessagingOngoingStaleJobData, + MessagingOngoingStaleJob, +} from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; + +@Processor(MessageQueue.cronQueue) +export class MessagingOngoingStaleCronJob { + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + @InjectRepository(DataSourceEntity, 'metadata') + private readonly dataSourceRepository: Repository, + private readonly environmentService: EnvironmentService, + @InjectMessageQueue(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + ) {} + + @Process(MessagingOngoingStaleCronJob.name) + async handle(): Promise { + const workspaceIds = ( + await this.workspaceRepository.find({ + where: this.environmentService.get('IS_BILLING_ENABLED') + ? { + subscriptionStatus: In(['active', 'trialing', 'past_due']), + } + : {}, + select: ['id'], + }) + ).map((workspace) => workspace.id); + + const dataSources = await this.dataSourceRepository.find({ + where: { + workspaceId: In(workspaceIds), + }, + }); + + const workspaceIdsWithDataSources = new Set( + dataSources.map((dataSource) => dataSource.workspaceId), + ); + + for (const workspaceId of workspaceIdsWithDataSources) { + await this.messageQueueService.add( + MessagingOngoingStaleJob.name, + { + workspaceId, + }, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts new file mode 100644 index 000000000000..0eb3c2aa12f8 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts @@ -0,0 +1,74 @@ +import { Logger, Scope } from '@nestjs/common'; + +import { In } from 'typeorm'; + +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { + MessageChannelSyncStage, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { isSyncStale } from 'src/modules/messaging/message-import-manager/utils/is-sync-stale.util'; +import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; +import { InjectWorkspaceRepository } from 'src/engine/twenty-orm/decorators/inject-workspace-repository.decorator'; +import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; + +export type MessagingOngoingStaleJobData = { + workspaceId: string; +}; + +@Processor({ + queueName: MessageQueue.messagingQueue, + scope: Scope.REQUEST, +}) +export class MessagingOngoingStaleJob { + private readonly logger = new Logger(MessagingOngoingStaleJob.name); + constructor( + @InjectWorkspaceRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: WorkspaceRepository, + private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, + ) {} + + @Process(MessagingOngoingStaleJob.name) + async handle(data: MessagingOngoingStaleJobData): Promise { + const { workspaceId } = data; + + const messageChannels = await this.messageChannelRepository.find({ + where: { + syncStage: In([ + MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, + MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, + ]), + }, + }); + + for (const messageChannel of messageChannels) { + if ( + messageChannel.syncStageStartedAt && + isSyncStale(messageChannel.syncStageStartedAt) + ) { + this.logger.log( + `Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`, + ); + + switch (messageChannel.syncStage) { + case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: + await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch( + messageChannel.id, + workspaceId, + ); + break; + case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING: + await this.messagingChannelSyncStatusService.scheduleMessagesImport( + messageChannel.id, + workspaceId, + ); + break; + default: + break; + } + } + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index 38d0e8db58cb..87cce001913c 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -3,16 +3,21 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; +import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command'; import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command'; import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command'; +import { MessagingOngoingStaleCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-ongoing-stale.cron.command'; import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job'; import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job'; +import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job'; import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module'; import { MessagingAddSingleMessageToCacheForImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job'; import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; +import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; @Module({ imports: [ @@ -20,15 +25,19 @@ import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import MessagingCommonModule, TypeOrmModule.forFeature([Workspace], 'core'), TypeOrmModule.forFeature([DataSourceEntity], 'metadata'), + TwentyORMModule.forFeature([MessageChannelWorkspaceEntity]), ], providers: [ MessagingMessageListFetchCronCommand, MessagingMessagesImportCronCommand, + MessagingOngoingStaleCronCommand, MessagingSingleMessageImportCommand, MessagingMessageListFetchJob, MessagingMessagesImportJob, + MessagingOngoingStaleJob, MessagingMessageListFetchCronJob, MessagingMessagesImportCronJob, + MessagingOngoingStaleCronJob, MessagingAddSingleMessageToCacheForImportJob, ], exports: [], diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/__tests__/is-sync-stale.util.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/__tests__/is-sync-stale.util.spec.ts new file mode 100644 index 000000000000..9935eb610812 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/__tests__/is-sync-stale.util.spec.ts @@ -0,0 +1,34 @@ +import { MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/message-import-manager/constants/messaging-import-ongoing-sync-timeout.constant'; +import { isSyncStale } from 'src/modules/messaging/message-import-manager/utils/is-sync-stale.util'; + +jest.useFakeTimers().setSystemTime(new Date('2024-01-01')); + +describe('isSyncStale', () => { + it('should return true if sync is stale', () => { + const syncStageStartedAt = new Date( + Date.now() - MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT - 1, + ).toISOString(); + + const result = isSyncStale(syncStageStartedAt); + + expect(result).toBe(true); + }); + + it('should return false if sync is not stale', () => { + const syncStageStartedAt = new Date( + Date.now() - MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT + 1, + ).toISOString(); + + const result = isSyncStale(syncStageStartedAt); + + expect(result).toBe(false); + }); + + it('should return false if syncStageStartedAt is invalid', () => { + const syncStageStartedAt = 'invalid-date'; + + expect(() => { + isSyncStale(syncStageStartedAt); + }).toThrow('Invalid date format'); + }); +}); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/is-sync-stale.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/is-sync-stale.util.ts new file mode 100644 index 000000000000..7a1ae5e2029f --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/is-sync-stale.util.ts @@ -0,0 +1,13 @@ +import { MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/message-import-manager/constants/messaging-import-ongoing-sync-timeout.constant'; + +export const isSyncStale = (syncStageStartedAt: string): boolean => { + const syncStageStartedTime = new Date(syncStageStartedAt).getTime(); + + if (isNaN(syncStageStartedTime)) { + throw new Error('Invalid date format'); + } + + return ( + Date.now() - syncStageStartedTime > MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT + ); +};