-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
5615 create messageongoingstalecron (#6005)
Closes #5615
- Loading branch information
1 parent
f8c057d
commit 4dfca45
Showing
7 changed files
with
225 additions
and
1 deletion.
There are no files selected for viewing
32 changes: 32 additions & 0 deletions
32
...s/messaging/message-import-manager/crons/commands/messaging-ongoing-stale.cron.command.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import { Command, CommandRunner } from 'nest-commander'; | ||
|
||
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; | ||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; | ||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; | ||
import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job'; | ||
|
||
const MESSAGING_ONGOING_STALE_CRON_PATTERN = '0 * * * *'; | ||
|
||
@Command({ | ||
name: 'cron:messaging:ongoing-stale', | ||
description: | ||
'Starts a cron job to check for stale ongoing message imports and put them back to pending', | ||
}) | ||
export class MessagingOngoingStaleCronCommand extends CommandRunner { | ||
constructor( | ||
@InjectMessageQueue(MessageQueue.cronQueue) | ||
private readonly messageQueueService: MessageQueueService, | ||
) { | ||
super(); | ||
} | ||
|
||
async run(): Promise<void> { | ||
await this.messageQueueService.addCron<undefined>( | ||
MessagingOngoingStaleCronJob.name, | ||
undefined, | ||
{ | ||
repeat: { pattern: MESSAGING_ONGOING_STALE_CRON_PATTERN }, | ||
}, | ||
); | ||
} | ||
} |
2 changes: 1 addition & 1 deletion
2
...modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 62 additions & 0 deletions
62
...c/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import { InjectRepository } from '@nestjs/typeorm'; | ||
|
||
import { Repository, In } from 'typeorm'; | ||
|
||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; | ||
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; | ||
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; | ||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; | ||
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; | ||
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; | ||
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; | ||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; | ||
import { | ||
MessagingOngoingStaleJobData, | ||
MessagingOngoingStaleJob, | ||
} from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; | ||
|
||
@Processor(MessageQueue.cronQueue) | ||
export class MessagingOngoingStaleCronJob { | ||
constructor( | ||
@InjectRepository(Workspace, 'core') | ||
private readonly workspaceRepository: Repository<Workspace>, | ||
@InjectRepository(DataSourceEntity, 'metadata') | ||
private readonly dataSourceRepository: Repository<DataSourceEntity>, | ||
private readonly environmentService: EnvironmentService, | ||
@InjectMessageQueue(MessageQueue.messagingQueue) | ||
private readonly messageQueueService: MessageQueueService, | ||
) {} | ||
|
||
@Process(MessagingOngoingStaleCronJob.name) | ||
async handle(): Promise<void> { | ||
const workspaceIds = ( | ||
await this.workspaceRepository.find({ | ||
where: this.environmentService.get('IS_BILLING_ENABLED') | ||
? { | ||
subscriptionStatus: In(['active', 'trialing', 'past_due']), | ||
} | ||
: {}, | ||
select: ['id'], | ||
}) | ||
).map((workspace) => workspace.id); | ||
|
||
const dataSources = await this.dataSourceRepository.find({ | ||
where: { | ||
workspaceId: In(workspaceIds), | ||
}, | ||
}); | ||
|
||
const workspaceIdsWithDataSources = new Set( | ||
dataSources.map((dataSource) => dataSource.workspaceId), | ||
); | ||
|
||
for (const workspaceId of workspaceIdsWithDataSources) { | ||
await this.messageQueueService.add<MessagingOngoingStaleJobData>( | ||
MessagingOngoingStaleJob.name, | ||
{ | ||
workspaceId, | ||
}, | ||
); | ||
} | ||
} | ||
} |
74 changes: 74 additions & 0 deletions
74
...y-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import { Logger, Scope } from '@nestjs/common'; | ||
|
||
import { In } from 'typeorm'; | ||
|
||
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; | ||
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; | ||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; | ||
import { | ||
MessageChannelSyncStage, | ||
MessageChannelWorkspaceEntity, | ||
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; | ||
import { isSyncStale } from 'src/modules/messaging/message-import-manager/utils/is-sync-stale.util'; | ||
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; | ||
import { InjectWorkspaceRepository } from 'src/engine/twenty-orm/decorators/inject-workspace-repository.decorator'; | ||
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; | ||
|
||
export type MessagingOngoingStaleJobData = { | ||
workspaceId: string; | ||
}; | ||
|
||
@Processor({ | ||
queueName: MessageQueue.messagingQueue, | ||
scope: Scope.REQUEST, | ||
}) | ||
export class MessagingOngoingStaleJob { | ||
private readonly logger = new Logger(MessagingOngoingStaleJob.name); | ||
constructor( | ||
@InjectWorkspaceRepository(MessageChannelWorkspaceEntity) | ||
private readonly messageChannelRepository: WorkspaceRepository<MessageChannelWorkspaceEntity>, | ||
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, | ||
) {} | ||
|
||
@Process(MessagingOngoingStaleJob.name) | ||
async handle(data: MessagingOngoingStaleJobData): Promise<void> { | ||
const { workspaceId } = data; | ||
|
||
const messageChannels = await this.messageChannelRepository.find({ | ||
where: { | ||
syncStage: In([ | ||
MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, | ||
MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, | ||
]), | ||
}, | ||
}); | ||
|
||
for (const messageChannel of messageChannels) { | ||
if ( | ||
messageChannel.syncStageStartedAt && | ||
isSyncStale(messageChannel.syncStageStartedAt) | ||
) { | ||
this.logger.log( | ||
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`, | ||
); | ||
|
||
switch (messageChannel.syncStage) { | ||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: | ||
await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch( | ||
messageChannel.id, | ||
workspaceId, | ||
); | ||
break; | ||
case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING: | ||
await this.messagingChannelSyncStatusService.scheduleMessagesImport( | ||
messageChannel.id, | ||
workspaceId, | ||
); | ||
break; | ||
default: | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 34 additions & 0 deletions
34
...r/src/modules/messaging/message-import-manager/utils/__tests__/is-sync-stale.util.spec.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import { MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/message-import-manager/constants/messaging-import-ongoing-sync-timeout.constant'; | ||
import { isSyncStale } from 'src/modules/messaging/message-import-manager/utils/is-sync-stale.util'; | ||
|
||
jest.useFakeTimers().setSystemTime(new Date('2024-01-01')); | ||
|
||
describe('isSyncStale', () => { | ||
it('should return true if sync is stale', () => { | ||
const syncStageStartedAt = new Date( | ||
Date.now() - MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT - 1, | ||
).toISOString(); | ||
|
||
const result = isSyncStale(syncStageStartedAt); | ||
|
||
expect(result).toBe(true); | ||
}); | ||
|
||
it('should return false if sync is not stale', () => { | ||
const syncStageStartedAt = new Date( | ||
Date.now() - MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT + 1, | ||
).toISOString(); | ||
|
||
const result = isSyncStale(syncStageStartedAt); | ||
|
||
expect(result).toBe(false); | ||
}); | ||
|
||
it('should return false if syncStageStartedAt is invalid', () => { | ||
const syncStageStartedAt = 'invalid-date'; | ||
|
||
expect(() => { | ||
isSyncStale(syncStageStartedAt); | ||
}).toThrow('Invalid date format'); | ||
}); | ||
}); |
13 changes: 13 additions & 0 deletions
13
...es/twenty-server/src/modules/messaging/message-import-manager/utils/is-sync-stale.util.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import { MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/message-import-manager/constants/messaging-import-ongoing-sync-timeout.constant'; | ||
|
||
export const isSyncStale = (syncStageStartedAt: string): boolean => { | ||
const syncStageStartedTime = new Date(syncStageStartedAt).getTime(); | ||
|
||
if (isNaN(syncStageStartedTime)) { | ||
throw new Error('Invalid date format'); | ||
} | ||
|
||
return ( | ||
Date.now() - syncStageStartedTime > MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT | ||
); | ||
}; |