From 1eeeae8564ce97b662cc7fda1ce785de8237363e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Bosi?= <71827178+bosiraphael@users.noreply.github.com> Date: Thu, 22 Aug 2024 18:23:05 +0200 Subject: [PATCH] 6686 Add try catch on every cron job, and send exception to exceptionHandler (#6705) Closes #6686 --------- Co-authored-by: Charles Bochet --- .../calendar-event-list-fetch.cron.job.ts | 50 +++++++++++-------- .../messaging-message-list-fetch.cron.job.ts | 40 +++++++++------ .../messaging-messages-import.cron.job.ts | 45 ++++++++++------- .../jobs/messaging-ongoing-stale.cron.job.ts | 22 +++++--- 4 files changed, 99 insertions(+), 58 deletions(-) diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts index 56f9af21099f..9d5f88f421f0 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts @@ -6,6 +6,7 @@ import { Workspace, WorkspaceActivationStatus, } from 'src/engine/core-modules/workspace/workspace.entity'; +import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; @@ -28,6 +29,7 @@ export class CalendarEventListFetchCronJob { @InjectMessageQueue(MessageQueue.calendarQueue) private readonly messageQueueService: MessageQueueService, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @Process(CalendarEventListFetchCronJob.name) @@ -41,30 +43,38 @@ export class CalendarEventListFetchCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const calendarChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'calendarChannel', - ); + try { + const calendarChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + 'calendarChannel', + ); - const calendarChannels = await calendarChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: Any([ - CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING, - CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING, - ]), - }, - }); + const calendarChannels = await calendarChannelRepository.find({ + where: { + isSyncEnabled: true, + syncStage: Any([ + CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING, + CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING, + ]), + }, + }); - for (const calendarChannel of calendarChannels) { - await this.messageQueueService.add( - CalendarEventListFetchJob.name, - { - calendarChannelId: calendarChannel.id, + for (const calendarChannel of calendarChannels) { + await this.messageQueueService.add( + CalendarEventListFetchJob.name, + { + calendarChannelId: calendarChannel.id, + workspaceId: activeWorkspace.id, + }, + ); + } + } catch (error) { + this.exceptionHandlerService.captureExceptions([error], { + user: { workspaceId: activeWorkspace.id, }, - ); + }); } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 2f464188209d..43ca1ae757a0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -1,11 +1,12 @@ import { InjectRepository } from '@nestjs/typeorm'; -import { Repository } from 'typeorm'; +import { In, Repository } from 'typeorm'; import { Workspace, WorkspaceActivationStatus, } from 'src/engine/core-modules/workspace/workspace.entity'; +import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; @@ -29,6 +30,7 @@ export class MessagingMessageListFetchCronJob { @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @Process(MessagingMessageListFetchCronJob.name) @@ -42,22 +44,24 @@ export class MessagingMessageListFetchCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const messageChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'messageChannel', - ); + try { + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + 'messageChannel', + ); - const messageChannels = await messageChannelRepository.find(); + const messageChannels = await messageChannelRepository.find({ + where: { + isSyncEnabled: true, + syncStage: In([ + MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + ]), + }, + }); - for (const messageChannel of messageChannels) { - if ( - (messageChannel.isSyncEnabled && - messageChannel.syncStage === - MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING) || - messageChannel.syncStage === - MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING - ) { + for (const messageChannel of messageChannels) { await this.messageQueueService.add( MessagingMessageListFetchJob.name, { @@ -66,6 +70,12 @@ export class MessagingMessageListFetchCronJob { }, ); } + } catch (error) { + this.exceptionHandlerService.captureExceptions([error], { + user: { + workspaceId: activeWorkspace.id, + }, + }); } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index a90c09f89d08..f22783294595 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -6,6 +6,7 @@ import { Workspace, WorkspaceActivationStatus, } from 'src/engine/core-modules/workspace/workspace.entity'; +import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; @@ -29,11 +30,13 @@ export class MessagingMessagesImportCronJob { @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @Process(MessagingMessagesImportCronJob.name) async handle(): Promise { console.time('MessagingMessagesImportCronJob time'); + const activeWorkspaces = await this.workspaceRepository.find({ where: { activationStatus: WorkspaceActivationStatus.ACTIVE, @@ -41,27 +44,35 @@ export class MessagingMessagesImportCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const messageChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'messageChannel', - ); + try { + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + 'messageChannel', + ); - const messageChannels = await messageChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, - }, - }); + const messageChannels = await messageChannelRepository.find({ + where: { + isSyncEnabled: true, + syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, + }, + }); - for (const messageChannel of messageChannels) { - await this.messageQueueService.add( - MessagingMessagesImportJob.name, - { + for (const messageChannel of messageChannels) { + await this.messageQueueService.add( + MessagingMessagesImportJob.name, + { + workspaceId: activeWorkspace.id, + messageChannelId: messageChannel.id, + }, + ); + } + } catch (error) { + this.exceptionHandlerService.captureExceptions([error], { + user: { workspaceId: activeWorkspace.id, - messageChannelId: messageChannel.id, }, - ); + }); } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job.ts index d2c3218b0bdc..08ca244cc202 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job.ts @@ -6,6 +6,7 @@ import { Workspace, WorkspaceActivationStatus, } from 'src/engine/core-modules/workspace/workspace.entity'; +import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; @@ -23,6 +24,7 @@ export class MessagingOngoingStaleCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, + private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @Process(MessagingOngoingStaleCronJob.name) @@ -34,12 +36,20 @@ export class MessagingOngoingStaleCronJob { }); for (const activeWorkspace of activeWorkspaces) { - await this.messageQueueService.add( - MessagingOngoingStaleJob.name, - { - workspaceId: activeWorkspace.id, - }, - ); + try { + await this.messageQueueService.add( + MessagingOngoingStaleJob.name, + { + workspaceId: activeWorkspace.id, + }, + ); + } catch (error) { + this.exceptionHandlerService.captureExceptions([error], { + user: { + workspaceId: activeWorkspace.id, + }, + }); + } } } }