Skip to content

Commit e6a55f2

Browse files
5617 Create CalendarOngoingStaleCron Job (#6748)
Closes #5617 --------- Co-authored-by: Charles Bochet <[email protected]>
1 parent e771793 commit e6a55f2

File tree

9 files changed

+214
-0
lines changed

9 files changed

+214
-0
lines changed

packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts

+6
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
1010
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
1111
import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module';
1212
import { CalendarEventListFetchCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command';
13+
import { CalendarOngoingStaleCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command';
1314
import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
15+
import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job';
1416
import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module';
1517
import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
18+
import { CalendarOngoingStaleJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job';
1619
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service';
1720
import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
1821
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
@@ -51,6 +54,9 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta
5154
CalendarEventListFetchCronJob,
5255
CalendarEventListFetchCronCommand,
5356
CalendarEventListFetchJob,
57+
CalendarOngoingStaleCronJob,
58+
CalendarOngoingStaleCronCommand,
59+
CalendarOngoingStaleJob,
5460
],
5561
exports: [CalendarEventsImportService],
5662
})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { Command, CommandRunner } from 'nest-commander';
2+
3+
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
4+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
5+
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
6+
import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job';
7+
8+
const CALENDAR_ONGOING_STALE_CRON_PATTERN = '0 * * * *';
9+
10+
@Command({
11+
name: 'cron:calendar:ongoing-stale',
12+
description:
13+
'Starts a cron job to check for stale ongoing calendar event imports and put them back to pending',
14+
})
15+
export class CalendarOngoingStaleCronCommand extends CommandRunner {
16+
constructor(
17+
@InjectMessageQueue(MessageQueue.cronQueue)
18+
private readonly messageQueueService: MessageQueueService,
19+
) {
20+
super();
21+
}
22+
23+
async run(): Promise<void> {
24+
await this.messageQueueService.addCron<undefined>(
25+
CalendarOngoingStaleCronJob.name,
26+
undefined,
27+
{
28+
repeat: { pattern: CALENDAR_ONGOING_STALE_CRON_PATTERN },
29+
},
30+
);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { InjectRepository } from '@nestjs/typeorm';
2+
3+
import { Repository } from 'typeorm';
4+
5+
import {
6+
Workspace,
7+
WorkspaceActivationStatus,
8+
} from 'src/engine/core-modules/workspace/workspace.entity';
9+
import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service';
10+
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
11+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
12+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
13+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
14+
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
15+
import {
16+
CalendarOngoingStaleJob,
17+
CalendarOngoingStaleJobData,
18+
} from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job';
19+
20+
@Processor(MessageQueue.cronQueue)
21+
export class CalendarOngoingStaleCronJob {
22+
constructor(
23+
@InjectRepository(Workspace, 'core')
24+
private readonly workspaceRepository: Repository<Workspace>,
25+
@InjectMessageQueue(MessageQueue.calendarQueue)
26+
private readonly messageQueueService: MessageQueueService,
27+
private readonly exceptionHandlerService: ExceptionHandlerService,
28+
) {}
29+
30+
@Process(CalendarOngoingStaleCronJob.name)
31+
async handle(): Promise<void> {
32+
const activeWorkspaces = await this.workspaceRepository.find({
33+
where: {
34+
activationStatus: WorkspaceActivationStatus.ACTIVE,
35+
},
36+
});
37+
38+
for (const activeWorkspace of activeWorkspaces) {
39+
try {
40+
await this.messageQueueService.add<CalendarOngoingStaleJobData>(
41+
CalendarOngoingStaleJob.name,
42+
{
43+
workspaceId: activeWorkspace.id,
44+
},
45+
);
46+
} catch (error) {
47+
this.exceptionHandlerService.captureExceptions([error], {
48+
user: {
49+
workspaceId: activeWorkspace.id,
50+
},
51+
});
52+
}
53+
}
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { Logger, Scope } from '@nestjs/common';
2+
3+
import { In } from 'typeorm';
4+
5+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
6+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
7+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
8+
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
9+
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service';
10+
import { isSyncStale } from 'src/modules/calendar/calendar-event-import-manager/utils/is-sync-stale.util';
11+
import {
12+
CalendarChannelSyncStage,
13+
CalendarChannelWorkspaceEntity,
14+
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
15+
16+
export type CalendarOngoingStaleJobData = {
17+
workspaceId: string;
18+
};
19+
20+
@Processor({
21+
queueName: MessageQueue.messagingQueue,
22+
scope: Scope.REQUEST,
23+
})
24+
export class CalendarOngoingStaleJob {
25+
private readonly logger = new Logger(CalendarOngoingStaleJob.name);
26+
constructor(
27+
private readonly twentyORMManager: TwentyORMManager,
28+
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
29+
) {}
30+
31+
@Process(CalendarOngoingStaleJob.name)
32+
async handle(data: CalendarOngoingStaleJobData): Promise<void> {
33+
const { workspaceId } = data;
34+
35+
const calendarChannelRepository =
36+
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
37+
'calendarChannel',
38+
);
39+
40+
const calendarChannels = await calendarChannelRepository.find({
41+
where: {
42+
syncStage: In([
43+
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING,
44+
CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING,
45+
]),
46+
},
47+
});
48+
49+
for (const calendarChannel of calendarChannels) {
50+
if (
51+
calendarChannel.syncStageStartedAt &&
52+
isSyncStale(calendarChannel.syncStageStartedAt)
53+
) {
54+
this.logger.log(
55+
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to pending`,
56+
);
57+
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt(
58+
calendarChannel.id,
59+
);
60+
61+
switch (calendarChannel.syncStage) {
62+
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING:
63+
await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch(
64+
calendarChannel.id,
65+
);
66+
break;
67+
case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING:
68+
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
69+
calendarChannel.id,
70+
);
71+
break;
72+
default:
73+
break;
74+
}
75+
}
76+
}
77+
}
78+
}

packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts

+11
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ export class CalendarChannelSyncStatusService {
8787
await this.scheduleFullCalendarEventListFetch(calendarChannelId);
8888
}
8989

90+
public async resetSyncStageStartedAt(calendarChannelId: string) {
91+
const calendarChannelRepository =
92+
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
93+
'calendarChannel',
94+
);
95+
96+
await calendarChannelRepository.update(calendarChannelId, {
97+
syncStageStartedAt: null,
98+
});
99+
}
100+
90101
public async scheduleCalendarEventsImport(calendarChannelId: string) {
91102
const calendarChannelRepository =
92103
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-import-ongoing-sync-timeout.constant';
2+
3+
export const isSyncStale = (syncStageStartedAt: string): boolean => {
4+
const syncStageStartedTime = new Date(syncStageStartedAt).getTime();
5+
6+
if (isNaN(syncStageStartedTime)) {
7+
throw new Error('Invalid date format');
8+
}
9+
10+
return (
11+
Date.now() - syncStageStartedTime > CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT
12+
);
13+
};

packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts

+14
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,20 @@ export class MessageChannelSyncStatusService {
9393
await this.scheduleFullMessageListFetch(messageChannelId);
9494
}
9595

96+
public async resetSyncStageStartedAt(messageChannelId: string) {
97+
const messageChannelRepository =
98+
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
99+
'messageChannel',
100+
);
101+
102+
await messageChannelRepository.update(
103+
{ id: messageChannelId },
104+
{
105+
syncStageStartedAt: null,
106+
},
107+
);
108+
}
109+
96110
public async markAsMessagesListFetchOngoing(messageChannelId: string) {
97111
const messageChannelRepository =
98112
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(

packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts

+4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ export class MessagingOngoingStaleJob {
5555
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
5656
);
5757

58+
await this.messagingChannelSyncStatusService.resetSyncStageStartedAt(
59+
messageChannel.id,
60+
);
61+
5862
switch (messageChannel.syncStage) {
5963
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
6064
await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(

0 commit comments

Comments
 (0)