From d99b9d1d6b11f7961fb4131ba66aa94447779530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20M?= Date: Mon, 17 Jun 2024 09:49:37 +0200 Subject: [PATCH] feat: Enhancements to MessageQueue Module with Decorators (#5657) ### Overview This PR introduces significant enhancements to the MessageQueue module by integrating `@Processor`, `@Process`, and `@InjectMessageQueue` decorators. These changes streamline the process of defining and managing queue processors and job handlers, and also allow for request-scoped handlers, improving compatibility with services that rely on scoped providers like TwentyORM repositories. ### Key Features 1. **Decorator-based Job Handling**: Use `@Processor` and `@Process` decorators to define job handlers declaratively. 2. **Request Scope Support**: Job handlers can be scoped per request, enhancing integration with request-scoped services. ### Usage #### Defining Processors and Job Handlers The `@Processor` decorator is used to define a class that processes jobs for a specific queue. The `@Process` decorator is applied to methods within this class to define specific job handlers. ##### Example 1: Specific Job Handlers ```typescript import { Processor, Process, InjectMessageQueue } from 'src/engine/integrations/message-queue'; @Processor('taskQueue') export class TaskProcessor { @Process('taskA') async handleTaskA(job: { id: string, data: any }) { console.log(`Handling task A with data:`, job.data); // Logic for task A } @Process('taskB') async handleTaskB(job: { id: string, data: any }) { console.log(`Handling task B with data:`, job.data); // Logic for task B } } ``` In the example above, `TaskProcessor` is responsible for processing jobs in the `taskQueue`. The `handleTaskA` method will only be called for jobs with the name `taskA`, while `handleTaskB` will be called for `taskB` jobs. ##### Example 2: General Job Handler ```typescript import { Processor, Process, InjectMessageQueue } from 'src/engine/integrations/message-queue'; @Processor('generalQueue') export class GeneralProcessor { @Process() async handleAnyJob(job: { id: string, name: string, data: any }) { console.log(`Handling job ${job.name} with data:`, job.data); // Logic for any job } } ``` In this example, `GeneralProcessor` handles all jobs in the `generalQueue`, regardless of the job name. The `handleAnyJob` method will be invoked for every job added to the `generalQueue`. #### Adding Jobs to a Queue You can use the `@InjectMessageQueue` decorator to inject a queue into a service and add jobs to it. ##### Example: ```typescript import { Injectable } from '@nestjs/common'; import { InjectMessageQueue, MessageQueue } from 'src/engine/integrations/message-queue'; @Injectable() export class TaskService { constructor( @InjectMessageQueue('taskQueue') private readonly taskQueue: MessageQueue, ) {} async addTaskA(data: any) { await this.taskQueue.add('taskA', data); } async addTaskB(data: any) { await this.taskQueue.add('taskB', data); } } ``` In this example, `TaskService` adds jobs to the `taskQueue`. The `addTaskA` and `addTaskB` methods add jobs named `taskA` and `taskB`, respectively, to the queue. #### Using Scoped Job Handlers To utilize request-scoped job handlers, specify the scope in the `@Processor` decorator. This is particularly useful for services that use scoped repositories like those in TwentyORM. ##### Example: ```typescript import { Processor, Process, InjectMessageQueue, Scope } from 'src/engine/integrations/message-queue'; @Processor({ name: 'scopedQueue', scope: Scope.REQUEST }) export class ScopedTaskProcessor { @Process('scopedTask') async handleScopedTask(job: { id: string, data: any }) { console.log(`Handling scoped task with data:`, job.data); // Logic for scoped task, which might use request-scoped services } } ``` Here, the `ScopedTaskProcessor` is associated with `scopedQueue` and operates with request scope. This setup is essential when the job handler relies on services that need to be instantiated per request, such as scoped repositories. ### Migration Notes - **Decorators**: Refactor job handlers to use `@Processor` and `@Process` decorators. - **Request Scope**: Utilize the scope option in `@Processor` if your job handlers depend on request-scoped services. Fix #5628 --------- Co-authored-by: Weiko --- packages/twenty-server/@types/express.d.ts | 1 + packages/twenty-server/package.json | 2 + packages/twenty-server/src/app.module.ts | 11 +- ...t-data-seed-demo-workspace.cron.command.ts | 5 +- ...p-data-seed-demo-workspace.cron.command.ts | 5 +- .../jobs/data-seed-demo-workspace.job.ts | 12 +- .../src/database/typeorm/typeorm.module.ts | 4 + .../0-20-record-position-backfill.command.ts | 5 +- .../jobs/call-webhook-jobs.job.ts | 15 +- .../jobs/call-webhook.job.ts | 11 +- .../jobs/record-position-backfill.job.ts | 14 +- .../jobs/workspace-query-runner-job.module.ts | 15 +- .../listeners/entity-events-to-db.listener.ts | 5 +- .../workspace-query-runner.service.ts | 4 +- .../auth/services/google-apis.service.ts | 7 +- .../billing/jobs/update-subscription.job.ts | 16 +- .../billing-workspace-member.listener.ts | 5 +- .../engine/core-modules/core-engine.module.ts | 4 - .../handle-workspace-member-deleted.job.ts | 15 +- .../workspace-workspace-member.listener.ts | 5 +- .../integrations/email/email-sender.job.ts | 12 +- .../integrations/email/email.service.ts | 5 +- .../integrations/integrations.module.ts | 2 +- .../decorators/message-queue.decorator.ts | 5 +- .../decorators/process.decorator.ts | 21 ++ .../decorators/processor.decorator.ts | 69 ++++++ .../message-queue/drivers/bullmq.driver.ts | 23 +- .../message-queue-driver.interface.ts | 3 +- .../message-queue/drivers/pg-boss.driver.ts | 29 ++- .../message-queue/drivers/sync.driver.ts | 65 +++--- .../message-queue/interfaces/index.ts | 2 +- .../interfaces/message-queue-job.interface.ts | 6 +- ...message-queue-module-options.interface.ts} | 9 - .../message-queue-worker-options.interface.ts | 3 + .../integrations/message-queue/jobs.module.ts | 24 +- .../message-queue-core.module.ts | 121 ++++++++++ .../message-queue-metadata.accessor.ts | 54 +++++ .../message-queue/message-queue.constants.ts | 6 +- .../message-queue/message-queue.explorer.ts | 209 ++++++++++++++++++ .../message-queue.module-definition.ts | 20 ++ .../message-queue/message-queue.module.ts | 74 ++----- .../services/message-queue.service.ts | 21 +- .../utils/get-queue-token.util.ts | 2 + ...l-hydrate-request-from-token.middleware.ts | 1 + .../scoped-workspace-datasource.factory.ts | 8 +- .../twenty-orm/twenty-orm-core.module.ts | 9 +- .../engine/twenty-orm/twenty-orm.module.ts | 3 +- .../clean-inactive-workspaces.command.ts | 5 +- ...-clean-inactive-workspaces.cron.command.ts | 5 +- ...-clean-inactive-workspaces.cron.command.ts | 5 +- .../crons/clean-inactive-workspace.job.ts | 14 +- ...lendar-messaging-participant-job.module.ts | 11 +- .../jobs/match-participant.job.ts | 14 +- .../jobs/unmatch-participant.job.ts | 14 +- .../listeners/participant-person.listener.ts | 5 +- .../participant-workspace-member.listener.ts | 5 +- .../google-calendar-sync.cron.command.ts | 5 +- .../crons/jobs/calendar-cron-job.module.ts | 7 +- .../jobs/google-calendar-sync.cron.job.ts | 11 +- ...ocklist-item-delete-calendar-events.job.ts | 14 +- .../blocklist-reimport-calendar-events.job.ts | 14 +- ...eate-company-and-contact-after-sync.job.ts | 14 +- .../calendar/jobs/calendar-job.module.ts | 25 +-- ...ed-account-associated-calendar-data.job.ts | 15 +- .../calendar/jobs/google-calendar-sync.job.ts | 14 +- .../listeners/calendar-blocklist.listener.ts | 5 +- .../listeners/calendar-channel.listener.ts | 5 +- .../google-calendar-sync.service.ts | 2 +- ...panies-and-contacts-creation-job.module.ts | 7 +- .../jobs/create-company-and-contact.job.ts | 14 +- .../messaging-message-channel.listener.ts | 5 +- ...ging-blocklist-item-delete-messages.job.ts | 14 +- .../listeners/messaging-blocklist.listener.ts | 13 +- ...es-and-enqueue-contact-creation.service.ts | 5 +- ...-connected-account-deletion-cleanup.job.ts | 14 +- ...sage-cleaner-connected-account.listener.ts | 8 +- .../messaging-message-cleaner.module.ts | 5 +- ...ssaging-message-list-fetch.cron.command.ts | 5 +- .../messaging-messages-import.cron.command.ts | 5 +- .../messaging-message-list-fetch.cron.job.ts | 16 +- .../messaging-messages-import.cron.job.ts | 18 +- .../jobs/messaging-message-list-fetch.job.ts | 14 +- .../jobs/messaging-messages-import.job.ts | 14 +- .../messaging-import-manager.module.ts | 20 +- ...eate-company-and-contact-after-sync.job.ts | 16 +- .../messaging-participants-manager.module.ts | 5 +- .../create-audit-log-from-internal-event.ts | 14 +- .../timeline/jobs/timeline-job.module.ts | 10 +- ...meline-activity-from-internal-event.job.ts | 14 +- .../src/queue-worker/queue-worker.module.ts | 11 +- .../src/queue-worker/queue-worker.ts | 32 --- yarn.lock | 18 ++ 92 files changed, 952 insertions(+), 521 deletions(-) create mode 100644 packages/twenty-server/src/engine/integrations/message-queue/decorators/process.decorator.ts create mode 100644 packages/twenty-server/src/engine/integrations/message-queue/decorators/processor.decorator.ts rename packages/twenty-server/src/engine/integrations/message-queue/interfaces/{message-queue.interface.ts => message-queue-module-options.interface.ts} (72%) create mode 100644 packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface.ts create mode 100644 packages/twenty-server/src/engine/integrations/message-queue/message-queue-core.module.ts create mode 100644 packages/twenty-server/src/engine/integrations/message-queue/message-queue-metadata.accessor.ts create mode 100644 packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts create mode 100644 packages/twenty-server/src/engine/integrations/message-queue/message-queue.module-definition.ts create mode 100644 packages/twenty-server/src/engine/integrations/message-queue/utils/get-queue-token.util.ts diff --git a/packages/twenty-server/@types/express.d.ts b/packages/twenty-server/@types/express.d.ts index 9f5ffd96da42..09c6bfbfef80 100644 --- a/packages/twenty-server/@types/express.d.ts +++ b/packages/twenty-server/@types/express.d.ts @@ -5,6 +5,7 @@ declare module 'express-serve-static-core' { interface Request { user?: User; workspace?: Workspace; + workspaceId?: string; cacheVersion?: string | null; } } diff --git a/packages/twenty-server/package.json b/packages/twenty-server/package.json index b8d7cc58f722..9884c04db4d8 100644 --- a/packages/twenty-server/package.json +++ b/packages/twenty-server/package.json @@ -26,6 +26,7 @@ "jsdom": "~22.1.0", "jwt-decode": "^4.0.0", "lodash.differencewith": "^4.5.0", + "lodash.omitby": "^4.6.0", "lodash.uniq": "^4.5.0", "lodash.uniqby": "^4.7.0", "passport": "^0.7.0", @@ -40,6 +41,7 @@ "@types/lodash.isequal": "^4.5.8", "@types/lodash.isobject": "^3.0.7", "@types/lodash.omit": "^4.5.9", + "@types/lodash.omitby": "^4.6.9", "@types/lodash.snakecase": "^4.1.7", "@types/lodash.uniq": "^4.5.9", "@types/lodash.uniqby": "^4.7.9", diff --git a/packages/twenty-server/src/app.module.ts b/packages/twenty-server/src/app.module.ts index 95e8b31a0199..914b5b295f6c 100644 --- a/packages/twenty-server/src/app.module.ts +++ b/packages/twenty-server/src/app.module.ts @@ -21,9 +21,11 @@ import { GraphQLConfigModule } from 'src/engine/api/graphql/graphql-config/graph import { GraphQLConfigService } from 'src/engine/api/graphql/graphql-config/graphql-config.service'; import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module'; import { GraphQLHydrateRequestFromTokenMiddleware } from 'src/engine/middlewares/graphql-hydrate-request-from-token.middleware'; +import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module'; +import { MessageQueueDriverType } from 'src/engine/integrations/message-queue/interfaces'; -import { CoreEngineModule } from './engine/core-modules/core-engine.module'; import { IntegrationsModule } from './engine/integrations/integrations.module'; +import { CoreEngineModule } from './engine/core-modules/core-engine.module'; @Module({ imports: [ @@ -72,6 +74,13 @@ export class AppModule { ); } + // Messaque Queue explorer only for sync driver + // Maybe we don't need to conditionaly register the explorer, because we're creating a jobs module + // that will expose classes that are only used in the queue worker + if (process.env.MESSAGE_QUEUE_TYPE === MessageQueueDriverType.Sync) { + modules.push(MessageQueueModule.registerExplorer()); + } + return modules; } diff --git a/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/start-data-seed-demo-workspace.cron.command.ts b/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/start-data-seed-demo-workspace.cron.command.ts index e9a8df6d35f2..bdf120095b23 100644 --- a/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/start-data-seed-demo-workspace.cron.command.ts +++ b/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/start-data-seed-demo-workspace.cron.command.ts @@ -1,9 +1,8 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner } from 'nest-commander'; import { dataSeedDemoWorkspaceCronPattern } from 'src/database/commands/data-seed-demo-workspace/crons/data-seed-demo-workspace-cron-pattern'; import { DataSeedDemoWorkspaceJob } from 'src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; @@ -13,7 +12,7 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi }) export class StartDataSeedDemoWorkspaceCronCommand extends CommandRunner { constructor( - @Inject(MessageQueue.cronQueue) + @InjectMessageQueue(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/stop-data-seed-demo-workspace.cron.command.ts b/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/stop-data-seed-demo-workspace.cron.command.ts index 84cd4a28df16..99dd1558bf7f 100644 --- a/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/stop-data-seed-demo-workspace.cron.command.ts +++ b/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/stop-data-seed-demo-workspace.cron.command.ts @@ -1,9 +1,8 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner } from 'nest-commander'; import { dataSeedDemoWorkspaceCronPattern } from 'src/database/commands/data-seed-demo-workspace/crons/data-seed-demo-workspace-cron-pattern'; import { DataSeedDemoWorkspaceJob } from 'src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; @@ -13,7 +12,7 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi }) export class StopDataSeedDemoWorkspaceCronCommand extends CommandRunner { constructor( - @Inject(MessageQueue.cronQueue) + @InjectMessageQueue(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job.ts b/packages/twenty-server/src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job.ts index e7b963d62008..14655209c009 100644 --- a/packages/twenty-server/src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job.ts +++ b/packages/twenty-server/src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job.ts @@ -1,15 +1,15 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { DataSeedDemoWorkspaceService } from 'src/database/commands/data-seed-demo-workspace/services/data-seed-demo-workspace.service'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; -@Injectable() -export class DataSeedDemoWorkspaceJob implements MessageQueueJob { +@Processor(MessageQueue.cronQueue) +export class DataSeedDemoWorkspaceJob { constructor( private readonly dataSeedDemoWorkspaceService: DataSeedDemoWorkspaceService, ) {} + @Process(DataSeedDemoWorkspaceJob.name) async handle(): Promise { await this.dataSeedDemoWorkspaceService.seedDemo(); } diff --git a/packages/twenty-server/src/database/typeorm/typeorm.module.ts b/packages/twenty-server/src/database/typeorm/typeorm.module.ts index b24fe05bad94..bda843eae6f4 100644 --- a/packages/twenty-server/src/database/typeorm/typeorm.module.ts +++ b/packages/twenty-server/src/database/typeorm/typeorm.module.ts @@ -3,6 +3,7 @@ import { TypeOrmModule, TypeOrmModuleOptions } from '@nestjs/typeorm'; import { typeORMCoreModuleOptions } from 'src/database/typeorm/core/core.datasource'; import { EnvironmentModule } from 'src/engine/integrations/environment/environment.module'; +import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { TypeORMService } from './typeorm.service'; @@ -28,6 +29,9 @@ const coreTypeORMFactory = async (): Promise => ({ useFactory: coreTypeORMFactory, name: 'core', }), + TwentyORMModule.register({ + workspaceEntities: ['dist/src/**/*.workspace-entity{.ts,.js}'], + }), EnvironmentModule, ], providers: [TypeORMService], diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command.ts index b5709a895481..ddc2f06a4dbf 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command.ts @@ -1,11 +1,10 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner, Option } from 'nest-commander'; import { RecordPositionBackfillJob, RecordPositionBackfillJobData, } from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; @@ -20,7 +19,7 @@ export type RecordPositionBackfillCommandOptions = { }) export class RecordPositionBackfillCommand extends CommandRunner { constructor( - @Inject(MessageQueue.recordPositionBackfillQueue) + @InjectMessageQueue(MessageQueue.recordPositionBackfillQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook-jobs.job.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook-jobs.job.ts index e3b7e68a1982..818be2a60d65 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook-jobs.job.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook-jobs.job.ts @@ -1,6 +1,5 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; @@ -11,6 +10,9 @@ import { CallWebhookJob, CallWebhookJobData, } from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export enum CallWebhookJobsJobOperation { create = 'create', @@ -25,19 +27,18 @@ export type CallWebhookJobsJobData = { operation: CallWebhookJobsJobOperation; }; -@Injectable() -export class CallWebhookJobsJob - implements MessageQueueJob -{ +@Processor(MessageQueue.webhookQueue) +export class CallWebhookJobsJob { private readonly logger = new Logger(CallWebhookJobsJob.name); constructor( private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly dataSourceService: DataSourceService, - @Inject(MessageQueue.webhookQueue) + @InjectMessageQueue(MessageQueue.webhookQueue) private readonly messageQueueService: MessageQueueService, ) {} + @Process(CallWebhookJobsJob.name) async handle(data: CallWebhookJobsJobData): Promise { const dataSourceMetadata = await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job.ts index 5ce75b717f4b..d18686e96aba 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job.ts @@ -1,7 +1,9 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; import { HttpService } from '@nestjs/axios'; -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type CallWebhookJobData = { targetUrl: string; @@ -13,12 +15,13 @@ export type CallWebhookJobData = { record: any; }; -@Injectable() -export class CallWebhookJob implements MessageQueueJob { +@Processor(MessageQueue.webhookQueue) +export class CallWebhookJob { private readonly logger = new Logger(CallWebhookJob.name); constructor(private readonly httpService: HttpService) {} + @Process(CallWebhookJob.name) async handle(data: CallWebhookJobData): Promise { try { await this.httpService.axiosRef.post(data.targetUrl, data); diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts index e0c589d9bdff..8619f1182814 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts @@ -1,22 +1,20 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type RecordPositionBackfillJobData = { workspaceId: string; dryRun: boolean; }; -@Injectable() -export class RecordPositionBackfillJob - implements MessageQueueJob -{ +@Processor(MessageQueue.recordPositionBackfillQueue) +export class RecordPositionBackfillJob { constructor( private readonly recordPositionBackfillService: RecordPositionBackfillService, ) {} + @Process(RecordPositionBackfillJob.name) async handle(data: RecordPositionBackfillJobData): Promise { this.recordPositionBackfillService.backfill(data.workspaceId, data.dryRun); } diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/workspace-query-runner-job.module.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/workspace-query-runner-job.module.ts index fc4b7c6d52cb..8104cf075cf3 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/workspace-query-runner-job.module.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/workspace-query-runner-job.module.ts @@ -15,19 +15,6 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works RecordPositionBackfillModule, HttpModule, ], - providers: [ - { - provide: CallWebhookJobsJob.name, - useClass: CallWebhookJobsJob, - }, - { - provide: CallWebhookJob.name, - useClass: CallWebhookJob, - }, - { - provide: RecordPositionBackfillJob.name, - useClass: RecordPositionBackfillJob, - }, - ], + providers: [CallWebhookJobsJob, CallWebhookJob, RecordPositionBackfillJob], }) export class WorkspaceQueryRunnerJobModule {} diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts index 755173ba63e1..04e015de1f25 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -8,12 +8,13 @@ import { objectRecordChangedValues } from 'src/engine/integrations/event-emitter import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event'; import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event'; @Injectable() export class EntityEventsToDbListener { constructor( - @Inject(MessageQueue.entityEventsToDbQueue) + @InjectMessageQueue(MessageQueue.entityEventsToDbQueue) private readonly messageQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.service.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.service.ts index a1cab54beeba..d5ea042a1606 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.service.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.service.ts @@ -1,6 +1,5 @@ import { BadRequestException, - Inject, Injectable, Logger, RequestTimeoutException, @@ -52,6 +51,7 @@ import { assertMutationNotOnRemoteObject } from 'src/engine/metadata-modules/obj import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; import { assertIsValidUuid } from 'src/engine/api/graphql/workspace-query-runner/utils/assert-is-valid-uuid.util'; import { isQueryTimeoutError } from 'src/engine/utils/query-timeout.util'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { WorkspaceQueryRunnerOptions } from './interfaces/query-runner-option.interface'; import { @@ -72,7 +72,7 @@ export class WorkspaceQueryRunnerService { private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly queryRunnerArgsFactory: QueryRunnerArgsFactory, private readonly queryResultGettersFactory: QueryResultGettersFactory, - @Inject(MessageQueue.webhookQueue) + @InjectMessageQueue(MessageQueue.webhookQueue) private readonly messageQueueService: MessageQueueService, private readonly eventEmitter: EventEmitter2, private readonly workspacePreQueryHookService: WorkspacePreQueryHookService, diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index ed4b58e59f9b..f580a34df358 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Inject } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { EntityManager } from 'typeorm'; import { v4 } from 'uuid'; @@ -34,15 +34,16 @@ import { MessagingMessageListFetchJob, MessagingMessageListFetchJobData, } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; @Injectable() export class GoogleAPIsService { constructor( private readonly dataSourceService: DataSourceService, private readonly typeORMService: TypeORMService, - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - @Inject(MessageQueue.calendarQueue) + @InjectMessageQueue(MessageQueue.calendarQueue) private readonly calendarQueueService: MessageQueueService, private readonly environmentService: EnvironmentService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) diff --git a/packages/twenty-server/src/engine/core-modules/billing/jobs/update-subscription.job.ts b/packages/twenty-server/src/engine/core-modules/billing/jobs/update-subscription.job.ts index cf3ff0367a8d..33f690fdddbe 100644 --- a/packages/twenty-server/src/engine/core-modules/billing/jobs/update-subscription.job.ts +++ b/packages/twenty-server/src/engine/core-modules/billing/jobs/update-subscription.job.ts @@ -1,22 +1,24 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; import { BillingService } from 'src/engine/core-modules/billing/billing.service'; import { UserWorkspaceService } from 'src/engine/core-modules/user-workspace/user-workspace.service'; import { StripeService } from 'src/engine/core-modules/billing/stripe/stripe.service'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type UpdateSubscriptionJobData = { workspaceId: string }; -@Injectable() -export class UpdateSubscriptionJob - implements MessageQueueJob -{ + +@Processor(MessageQueue.billingQueue) +export class UpdateSubscriptionJob { protected readonly logger = new Logger(UpdateSubscriptionJob.name); + constructor( private readonly billingService: BillingService, private readonly userWorkspaceService: UserWorkspaceService, private readonly stripeService: StripeService, ) {} + @Process(UpdateSubscriptionJob.name) async handle(data: UpdateSubscriptionJobData): Promise { const workspaceMembersCount = await this.userWorkspaceService.getWorkspaceMemberCount(data.workspaceId); diff --git a/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-workspace-member.listener.ts b/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-workspace-member.listener.ts index 77365b74aa8e..ae945d309b8b 100644 --- a/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-workspace-member.listener.ts +++ b/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-workspace-member.listener.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -10,11 +10,12 @@ import { UpdateSubscriptionJobData, } from 'src/engine/core-modules/billing/jobs/update-subscription.job'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; @Injectable() export class BillingWorkspaceMemberListener { constructor( - @Inject(MessageQueue.billingQueue) + @InjectMessageQueue(MessageQueue.billingQueue) private readonly messageQueueService: MessageQueueService, private readonly environmentService: EnvironmentService, ) {} diff --git a/packages/twenty-server/src/engine/core-modules/core-engine.module.ts b/packages/twenty-server/src/engine/core-modules/core-engine.module.ts index 2df9de73b593..2a706f024dd6 100644 --- a/packages/twenty-server/src/engine/core-modules/core-engine.module.ts +++ b/packages/twenty-server/src/engine/core-modules/core-engine.module.ts @@ -10,7 +10,6 @@ import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timel import { TimelineCalendarEventModule } from 'src/engine/core-modules/calendar/timeline-calendar-event.module'; import { BillingModule } from 'src/engine/core-modules/billing/billing.module'; import { HealthModule } from 'src/engine/core-modules/health/health.module'; -import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module'; import { AnalyticsModule } from './analytics/analytics.module'; @@ -19,9 +18,6 @@ import { ClientConfigModule } from './client-config/client-config.module'; @Module({ imports: [ - TwentyORMModule.register({ - workspaceEntities: ['dist/src/**/*.workspace-entity{.ts,.js}'], - }), HealthModule, AnalyticsModule, AuthModule, diff --git a/packages/twenty-server/src/engine/core-modules/workspace/handle-workspace-member-deleted.job.ts b/packages/twenty-server/src/engine/core-modules/workspace/handle-workspace-member-deleted.job.ts index 8ec5dad7ded7..7e3bce067751 100644 --- a/packages/twenty-server/src/engine/core-modules/workspace/handle-workspace-member-deleted.job.ts +++ b/packages/twenty-server/src/engine/core-modules/workspace/handle-workspace-member-deleted.job.ts @@ -1,19 +1,18 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { WorkspaceService } from 'src/engine/core-modules/workspace/services/workspace.service'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type HandleWorkspaceMemberDeletedJobData = { workspaceId: string; userId: string; }; -@Injectable() -export class HandleWorkspaceMemberDeletedJob - implements MessageQueueJob -{ + +@Processor(MessageQueue.workspaceQueue) +export class HandleWorkspaceMemberDeletedJob { constructor(private readonly workspaceService: WorkspaceService) {} + @Process(HandleWorkspaceMemberDeletedJob.name) async handle(data: HandleWorkspaceMemberDeletedJobData): Promise { const { workspaceId, userId } = data; diff --git a/packages/twenty-server/src/engine/core-modules/workspace/workspace-workspace-member.listener.ts b/packages/twenty-server/src/engine/core-modules/workspace/workspace-workspace-member.listener.ts index dbb717fcfd15..240b4eb1cf5f 100644 --- a/packages/twenty-server/src/engine/core-modules/workspace/workspace-workspace-member.listener.ts +++ b/packages/twenty-server/src/engine/core-modules/workspace/workspace-workspace-member.listener.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -9,11 +9,12 @@ import { HandleWorkspaceMemberDeletedJob, HandleWorkspaceMemberDeletedJobData, } from 'src/engine/core-modules/workspace/handle-workspace-member-deleted.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; @Injectable() export class WorkspaceWorkspaceMemberListener { constructor( - @Inject(MessageQueue.workspaceQueue) + @InjectMessageQueue(MessageQueue.workspaceQueue) private readonly messageQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/engine/integrations/email/email-sender.job.ts b/packages/twenty-server/src/engine/integrations/email/email-sender.job.ts index 7b284d4b26c3..d97117d5063c 100644 --- a/packages/twenty-server/src/engine/integrations/email/email-sender.job.ts +++ b/packages/twenty-server/src/engine/integrations/email/email-sender.job.ts @@ -1,15 +1,15 @@ -import { Injectable } from '@nestjs/common'; - import { SendMailOptions } from 'nodemailer'; -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { EmailSenderService } from 'src/engine/integrations/email/email-sender.service'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; -@Injectable() -export class EmailSenderJob implements MessageQueueJob { +@Processor(MessageQueue.emailQueue) +export class EmailSenderJob { constructor(private readonly emailSenderService: EmailSenderService) {} + @Process(EmailSenderJob.name) async handle(data: SendMailOptions): Promise { await this.emailSenderService.send(data); } diff --git a/packages/twenty-server/src/engine/integrations/email/email.service.ts b/packages/twenty-server/src/engine/integrations/email/email.service.ts index a149c78023b2..79217adb8d1e 100644 --- a/packages/twenty-server/src/engine/integrations/email/email.service.ts +++ b/packages/twenty-server/src/engine/integrations/email/email.service.ts @@ -1,15 +1,16 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { SendMailOptions } from 'nodemailer'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { EmailSenderJob } from 'src/engine/integrations/email/email-sender.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; @Injectable() export class EmailService { constructor( - @Inject(MessageQueue.emailQueue) + @InjectMessageQueue(MessageQueue.emailQueue) private readonly messageQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/engine/integrations/integrations.module.ts b/packages/twenty-server/src/engine/integrations/integrations.module.ts index df855478af6e..fff43cdfd2a6 100644 --- a/packages/twenty-server/src/engine/integrations/integrations.module.ts +++ b/packages/twenty-server/src/engine/integrations/integrations.module.ts @@ -30,7 +30,7 @@ import { MessageQueueModule } from './message-queue/message-queue.module'; useFactory: loggerModuleFactory, inject: [EnvironmentService], }), - MessageQueueModule.forRoot({ + MessageQueueModule.registerAsync({ useFactory: messageQueueModuleFactory, inject: [EnvironmentService], }), diff --git a/packages/twenty-server/src/engine/integrations/message-queue/decorators/message-queue.decorator.ts b/packages/twenty-server/src/engine/integrations/message-queue/decorators/message-queue.decorator.ts index 69fa50ad5ef7..5a275d3d5b82 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/decorators/message-queue.decorator.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/decorators/message-queue.decorator.ts @@ -1,7 +1,8 @@ import { Inject } from '@nestjs/common'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { getQueueToken } from 'src/engine/integrations/message-queue/utils/get-queue-token.util'; -export const InjectMessageQueue = (messageQueueName: MessageQueue) => { - return Inject(messageQueueName); +export const InjectMessageQueue = (queueName: MessageQueue) => { + return Inject(getQueueToken(queueName)); }; diff --git a/packages/twenty-server/src/engine/integrations/message-queue/decorators/process.decorator.ts b/packages/twenty-server/src/engine/integrations/message-queue/decorators/process.decorator.ts new file mode 100644 index 000000000000..214742cb0af0 --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/decorators/process.decorator.ts @@ -0,0 +1,21 @@ +import { SetMetadata } from '@nestjs/common'; +import { isString } from '@nestjs/common/utils/shared.utils'; + +import { PROCESS_METADATA } from 'src/engine/integrations/message-queue/message-queue.constants'; + +export interface MessageQueueProcessOptions { + jobName: string; + concurrency?: number; +} + +export function Process(jobName: string): MethodDecorator; +export function Process(options: MessageQueueProcessOptions): MethodDecorator; +export function Process( + nameOrOptions: string | MessageQueueProcessOptions, +): MethodDecorator { + const options = isString(nameOrOptions) + ? { jobName: nameOrOptions } + : nameOrOptions; + + return SetMetadata(PROCESS_METADATA, options || {}); +} diff --git a/packages/twenty-server/src/engine/integrations/message-queue/decorators/processor.decorator.ts b/packages/twenty-server/src/engine/integrations/message-queue/decorators/processor.decorator.ts new file mode 100644 index 000000000000..a244dc4d14ce --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/decorators/processor.decorator.ts @@ -0,0 +1,69 @@ +import { Scope, SetMetadata } from '@nestjs/common'; +import { SCOPE_OPTIONS_METADATA } from '@nestjs/common/constants'; + +import { MessageQueueWorkerOptions } from 'src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface'; + +import { + MessageQueue, + PROCESSOR_METADATA, + WORKER_METADATA, +} from 'src/engine/integrations/message-queue/message-queue.constants'; + +export interface MessageQueueProcessorOptions { + /** + * Specifies the name of the queue to subscribe to. + */ + queueName: MessageQueue; + /** + * Specifies the lifetime of an injected Processor. + */ + scope?: Scope; +} + +/** + * Represents a worker that is able to process jobs from the queue. + * @param queueName name of the queue to process + */ +export function Processor(queueName: string): ClassDecorator; +/** + * Represents a worker that is able to process jobs from the queue. + * @param queueName name of the queue to process + * @param workerOptions additional worker options + */ +export function Processor( + queueName: string, + workerOptions: MessageQueueWorkerOptions, +): ClassDecorator; +/** + * Represents a worker that is able to process jobs from the queue. + * @param processorOptions processor options + */ +export function Processor( + processorOptions: MessageQueueProcessorOptions, +): ClassDecorator; +/** + * Represents a worker that is able to process jobs from the queue. + * @param processorOptions processor options (Nest-specific) + * @param workerOptions additional Bull worker options + */ +export function Processor( + processorOptions: MessageQueueProcessorOptions, + workerOptions: MessageQueueWorkerOptions, +): ClassDecorator; +export function Processor( + queueNameOrOptions?: string | MessageQueueProcessorOptions, + maybeWorkerOptions?: MessageQueueWorkerOptions, +): ClassDecorator { + const options = + queueNameOrOptions && typeof queueNameOrOptions === 'object' + ? queueNameOrOptions + : { queueName: queueNameOrOptions }; + + // eslint-disable-next-line @typescript-eslint/ban-types + return (target: Function) => { + SetMetadata(SCOPE_OPTIONS_METADATA, options)(target); + SetMetadata(PROCESSOR_METADATA, options)(target); + maybeWorkerOptions && + SetMetadata(WORKER_METADATA, maybeWorkerOptions)(target); + }; +} diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts index fc78eaabb064..c3324a500604 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts @@ -1,9 +1,14 @@ +import { OnModuleDestroy } from '@nestjs/common'; + +import omitBy from 'lodash.omitby'; import { JobsOptions, Queue, QueueOptions, Worker } from 'bullmq'; import { QueueCronJobOptions, QueueJobOptions, } from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; +import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { MessageQueueWorkerOptions } from 'src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -11,7 +16,7 @@ import { MessageQueueDriver } from './interfaces/message-queue-driver.interface' export type BullMQDriverOptions = QueueOptions; -export class BullMQDriver implements MessageQueueDriver { +export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy { private queueMap: Record = {} as Record< MessageQueue, Queue @@ -27,7 +32,7 @@ export class BullMQDriver implements MessageQueueDriver { this.queueMap[queueName] = new Queue(queueName, this.options); } - async stop() { + async onModuleDestroy() { const workers = Object.values(this.workerMap); const queues = Object.values(this.queueMap); @@ -39,14 +44,22 @@ export class BullMQDriver implements MessageQueueDriver { async work( queueName: MessageQueue, - handler: ({ data, id }: { data: T; id: string }) => Promise, + handler: (job: MessageQueueJob) => Promise, + options?: MessageQueueWorkerOptions, ) { const worker = new Worker( queueName, async (job) => { - await handler(job as { data: T; id: string }); + // TODO: Correctly support for job.id + await handler({ data: job.data, id: job.id ?? '', name: job.name }); }, - this.options, + omitBy( + { + ...this.options, + concurrency: options?.concurrency, + }, + (value) => value === undefined, + ), ); this.workerMap[queueName] = worker; diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts index cdd30913f439..1d53837d359a 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts @@ -3,6 +3,7 @@ import { QueueJobOptions, } from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; import { MessageQueueJobData } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { MessageQueueWorkerOptions } from 'src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -16,6 +17,7 @@ export interface MessageQueueDriver { work( queueName: MessageQueue, handler: ({ data, id }: { data: T; id: string }) => Promise | void, + options?: MessageQueueWorkerOptions, ); addCron( queueName: MessageQueue, @@ -24,6 +26,5 @@ export interface MessageQueueDriver { options?: QueueCronJobOptions, ); removeCron(queueName: MessageQueue, jobName: string, pattern?: string); - stop?(): Promise; register?(queueName: MessageQueue): void; } diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/pg-boss.driver.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/pg-boss.driver.ts index 5210593c3ff0..e02ec650720c 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/pg-boss.driver.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/pg-boss.driver.ts @@ -1,9 +1,13 @@ +import { OnModuleDestroy, OnModuleInit } from '@nestjs/common'; + import PgBoss from 'pg-boss'; import { QueueCronJobOptions, QueueJobOptions, } from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; +import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { MessageQueueWorkerOptions } from 'src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -13,26 +17,37 @@ export type PgBossDriverOptions = PgBoss.ConstructorOptions; const DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED = '*/1 * * * *'; -export class PgBossDriver implements MessageQueueDriver { +export class PgBossDriver + implements MessageQueueDriver, OnModuleInit, OnModuleDestroy +{ private pgBoss: PgBoss; constructor(options: PgBossDriverOptions) { this.pgBoss = new PgBoss(options); } - async stop() { - await this.pgBoss.stop(); + async onModuleInit() { + await this.pgBoss.start(); } - async init(): Promise { - await this.pgBoss.start(); + async onModuleDestroy() { + await this.pgBoss.stop(); } async work( queueName: string, - handler: ({ data, id }: { data: T; id: string }) => Promise, + handler: (job: MessageQueueJob) => Promise, + options?: MessageQueueWorkerOptions, ) { - return this.pgBoss.work(`${queueName}.*`, handler); + return this.pgBoss.work( + `${queueName}.*`, + { + teamConcurrency: options?.concurrency, + }, + async (job) => { + await handler({ data: job.data, id: job.id, name: job.name }); + }, + ); } async addCron( diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/sync.driver.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/sync.driver.ts index ca48a0236510..4b750c39c6bb 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/sync.driver.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/sync.driver.ts @@ -1,57 +1,66 @@ -import { ModuleRef } from '@nestjs/core'; import { Logger } from '@nestjs/common'; -import { MessageQueueDriver } from 'src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface'; import { - MessageQueueCronJobData, - MessageQueueJob, MessageQueueJobData, + MessageQueueJob, } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { getJobClassName } from 'src/engine/integrations/message-queue/utils/get-job-class-name.util'; + +import { MessageQueueDriver } from './interfaces/message-queue-driver.interface'; export class SyncDriver implements MessageQueueDriver { private readonly logger = new Logger(SyncDriver.name); - constructor(private readonly jobsModuleRef: ModuleRef) {} + private workersMap: { + [queueName: string]: (job: MessageQueueJob) => Promise | void; + } = {}; + + constructor() {} async add( - _queueName: MessageQueue, + queueName: MessageQueue, jobName: string, data: T, ): Promise { - const jobClassName = getJobClassName(jobName); - const job: MessageQueueJob = this.jobsModuleRef.get( - jobClassName, - { strict: false }, - ); - - await job.handle(data); + await this.processJob(queueName, { id: '', name: jobName, data }); } async addCron( - _queueName: MessageQueue, + queueName: MessageQueue, jobName: string, data: T, ): Promise { this.logger.log(`Running cron job with SyncDriver`); - - const jobClassName = getJobClassName(jobName); - const job: MessageQueueCronJobData = - this.jobsModuleRef.get(jobClassName, { - strict: true, - }); - - await job.handle(data); + await this.processJob(queueName, { + id: '', + name: jobName, + // TODO: Fix this type issue + data: data as any, + }); } - async removeCron(_queueName: MessageQueue, jobName: string) { - this.logger.log(`Removing '${jobName}' cron job with SyncDriver`); + async removeCron(queueName: MessageQueue, jobName: string) { + this.logger.log(`Removing '${queueName}' cron job with SyncDriver`); + } - return; + work( + queueName: MessageQueue, + handler: (job: MessageQueueJob) => Promise | void, + ) { + this.logger.log(`Registering handler for queue: ${queueName}`); + this.workersMap[queueName] = handler; } - work() { - return; + async processJob( + queueName: string, + job: MessageQueueJob, + ) { + const worker = this.workersMap[queueName]; + + if (worker) { + await worker(job); + } else { + this.logger.error(`No handler found for job: ${queueName}`); + } } } diff --git a/packages/twenty-server/src/engine/integrations/message-queue/interfaces/index.ts b/packages/twenty-server/src/engine/integrations/message-queue/interfaces/index.ts index 300859d017d2..4fdb52388cd2 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/interfaces/index.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/interfaces/index.ts @@ -1 +1 @@ -export * from './message-queue.interface'; +export * from './message-queue-module-options.interface'; diff --git a/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-job.interface.ts b/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-job.interface.ts index 87423ffd2bee..8a1ced80ec50 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-job.interface.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-job.interface.ts @@ -1,5 +1,7 @@ -export interface MessageQueueJob { - handle(data: T): Promise | void; +export interface MessageQueueJob { + id: string; + name: string; + data: T; } export interface MessageQueueCronJobData< diff --git a/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue.interface.ts b/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-module-options.interface.ts similarity index 72% rename from packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue.interface.ts rename to packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-module-options.interface.ts index 568e0819fc8e..b98990385204 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue.interface.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-module-options.interface.ts @@ -1,5 +1,3 @@ -import { FactoryProvider, ModuleMetadata } from '@nestjs/common'; - import { BullMQDriverOptions } from 'src/engine/integrations/message-queue/drivers/bullmq.driver'; import { PgBossDriverOptions } from 'src/engine/integrations/message-queue/drivers/pg-boss.driver'; @@ -28,10 +26,3 @@ export type MessageQueueModuleOptions = | PgBossDriverFactoryOptions | BullMQDriverFactoryOptions | SyncDriverFactoryOptions; - -export type MessageQueueModuleAsyncOptions = { - useFactory: ( - ...args: any[] - ) => MessageQueueModuleOptions | Promise; -} & Pick & - Pick; diff --git a/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface.ts b/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface.ts new file mode 100644 index 000000000000..c4def7121eb0 --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface.ts @@ -0,0 +1,3 @@ +export interface MessageQueueWorkerOptions { + concurrency?: number; +} diff --git a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts index 5447402d603b..c2e7ecf0928a 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts @@ -23,9 +23,9 @@ import { CalendarMessagingParticipantJobModule } from 'src/modules/calendar-mess import { CalendarCronJobModule } from 'src/modules/calendar/crons/jobs/calendar-cron-job.module'; import { CalendarJobModule } from 'src/modules/calendar/jobs/calendar-job.module'; import { AutoCompaniesAndContactsCreationJobModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/auto-companies-and-contacts-creation-job.module'; -import { CalendarModule } from 'src/modules/calendar/calendar.module'; import { MessagingModule } from 'src/modules/messaging/messaging.module'; import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module'; +import { CalendarModule } from 'src/modules/calendar/calendar.module'; @Module({ imports: [ @@ -39,11 +39,10 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module UserWorkspaceModule, WorkspaceModule, MessagingModule, + CalendarModule, CalendarEventParticipantModule, TimelineActivityModule, StripeModule, - CalendarModule, - // JobsModules WorkspaceQueryRunnerJobModule, CalendarMessagingParticipantJobModule, CalendarCronJobModule, @@ -52,20 +51,11 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module TimelineJobModule, ], providers: [ - { - provide: CleanInactiveWorkspaceJob.name, - useClass: CleanInactiveWorkspaceJob, - }, - { provide: EmailSenderJob.name, useClass: EmailSenderJob }, - { - provide: DataSeedDemoWorkspaceJob.name, - useClass: DataSeedDemoWorkspaceJob, - }, - { provide: UpdateSubscriptionJob.name, useClass: UpdateSubscriptionJob }, - { - provide: HandleWorkspaceMemberDeletedJob.name, - useClass: HandleWorkspaceMemberDeletedJob, - }, + CleanInactiveWorkspaceJob, + EmailSenderJob, + DataSeedDemoWorkspaceJob, + UpdateSubscriptionJob, + HandleWorkspaceMemberDeletedJob, ], }) export class JobsModule { diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue-core.module.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue-core.module.ts new file mode 100644 index 000000000000..c22b0abce39c --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue-core.module.ts @@ -0,0 +1,121 @@ +import { + DynamicModule, + Global, + Logger, + Module, + Provider, +} from '@nestjs/common'; + +import { MessageQueueDriver } from 'src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface'; + +import { MessageQueueDriverType } from 'src/engine/integrations/message-queue/interfaces'; +import { + MessageQueue, + QUEUE_DRIVER, +} from 'src/engine/integrations/message-queue/message-queue.constants'; +import { PgBossDriver } from 'src/engine/integrations/message-queue/drivers/pg-boss.driver'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { BullMQDriver } from 'src/engine/integrations/message-queue/drivers/bullmq.driver'; +import { SyncDriver } from 'src/engine/integrations/message-queue/drivers/sync.driver'; +import { getQueueToken } from 'src/engine/integrations/message-queue/utils/get-queue-token.util'; +import { + ASYNC_OPTIONS_TYPE, + ConfigurableModuleClass, + OPTIONS_TYPE, +} from 'src/engine/integrations/message-queue/message-queue.module-definition'; + +@Global() +@Module({}) +export class MessageQueueCoreModule extends ConfigurableModuleClass { + private static readonly logger = new Logger(MessageQueueCoreModule.name); + + static register(options: typeof OPTIONS_TYPE): DynamicModule { + const dynamicModule = super.register(options); + + const driverProvider: Provider = { + provide: QUEUE_DRIVER, + useFactory: () => { + return this.createDriver(options); + }, + }; + + const queueProviders = this.createQueueProviders(); + + return { + ...dynamicModule, + providers: [ + ...(dynamicModule.providers ?? []), + driverProvider, + ...queueProviders, + ], + exports: [ + ...(dynamicModule.exports ?? []), + ...Object.values(MessageQueue).map((queueName) => + getQueueToken(queueName), + ), + ], + }; + } + + static registerAsync(options: typeof ASYNC_OPTIONS_TYPE): DynamicModule { + const dynamicModule = super.registerAsync(options); + + const driverProvider: Provider = { + provide: QUEUE_DRIVER, + useFactory: async (...args: any[]) => { + const config = await options.useFactory!(...args); + + return this.createDriver(config); + }, + inject: options.inject || [], + }; + + const queueProviders = MessageQueueCoreModule.createQueueProviders(); + + return { + ...dynamicModule, + providers: [ + ...(dynamicModule.providers ?? []), + driverProvider, + ...queueProviders, + ], + exports: [ + ...(dynamicModule.exports ?? []), + ...Object.values(MessageQueue).map((queueName) => + getQueueToken(queueName), + ), + ], + }; + } + + static async createDriver({ type, options }: typeof OPTIONS_TYPE) { + switch (type) { + case MessageQueueDriverType.PgBoss: { + return new PgBossDriver(options); + } + case MessageQueueDriverType.BullMQ: { + return new BullMQDriver(options); + } + case MessageQueueDriverType.Sync: { + return new SyncDriver(); + } + default: { + this.logger.warn( + `Unsupported message queue driver type: ${type}. Using SyncDriver by default.`, + ); + + return new SyncDriver(); + } + } + } + + static createQueueProviders(): Provider[] { + return Object.values(MessageQueue).map((queueName) => ({ + provide: getQueueToken(queueName), + useFactory: (driver: MessageQueueDriver) => { + return new MessageQueueService(driver, queueName); + }, + inject: [QUEUE_DRIVER], + })); + } +} diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue-metadata.accessor.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue-metadata.accessor.ts new file mode 100644 index 000000000000..e509e1fd8718 --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue-metadata.accessor.ts @@ -0,0 +1,54 @@ +/* eslint-disable @typescript-eslint/ban-types */ +import { Injectable, Type } from '@nestjs/common'; +import { Reflector } from '@nestjs/core'; + +import { MessageQueueWorkerOptions } from 'src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface'; + +import { MessageQueueProcessOptions } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { MessageQueueProcessorOptions } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { + PROCESSOR_METADATA, + PROCESS_METADATA, + WORKER_METADATA, +} from 'src/engine/integrations/message-queue/message-queue.constants'; + +@Injectable() +export class MessageQueueMetadataAccessor { + constructor(private readonly reflector: Reflector) {} + + isProcessor(target: Type | Function): boolean { + if (!target) { + return false; + } + + return !!this.reflector.get(PROCESSOR_METADATA, target); + } + + isProcess(target: Type | Function): boolean { + if (!target) { + return false; + } + + return !!this.reflector.get(PROCESS_METADATA, target); + } + + getProcessorMetadata( + target: Type | Function, + ): MessageQueueProcessorOptions | undefined { + return this.reflector.get(PROCESSOR_METADATA, target); + } + + getProcessMetadata( + target: Type | Function, + ): MessageQueueProcessOptions | undefined { + const metadata = this.reflector.get(PROCESS_METADATA, target); + + return metadata; + } + + getWorkerOptionsMetadata( + target: Type | Function, + ): MessageQueueWorkerOptions { + return this.reflector.get(WORKER_METADATA, target) ?? {}; + } +} diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.constants.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.constants.ts index 7576b8693a01..e78cc50939bf 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.constants.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.constants.ts @@ -1,4 +1,7 @@ -export const QUEUE_DRIVER = Symbol('QUEUE_DRIVER'); +export const PROCESSOR_METADATA = Symbol('message-queue:processor_metadata'); +export const PROCESS_METADATA = Symbol('message-queue:process_metadata'); +export const WORKER_METADATA = Symbol('bullmq:worker_metadata'); +export const QUEUE_DRIVER = Symbol('message-queue:queue_driver'); export enum MessageQueue { taskAssignedQueue = 'task-assigned-queue', @@ -12,4 +15,5 @@ export enum MessageQueue { workspaceQueue = 'workspace-queue', recordPositionBackfillQueue = 'record-position-backfill-queue', entityEventsToDbQueue = 'entity-events-to-db-queue', + testQueue = 'test-queue', } diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts new file mode 100644 index 000000000000..838143f09d2f --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts @@ -0,0 +1,209 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { + DiscoveryService, + MetadataScanner, + ModuleRef, + createContextId, +} from '@nestjs/core'; +import { Module } from '@nestjs/core/injector/module'; +import { Injector } from '@nestjs/core/injector/injector'; +import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; + +import { MessageQueueWorkerOptions } from 'src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface'; +import { + MessageQueueJob, + MessageQueueJobData, +} from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { getQueueToken } from 'src/engine/integrations/message-queue/utils/get-queue-token.util'; +import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service'; +import { shouldFilterException } from 'src/engine/utils/global-exception-handler.util'; + +import { MessageQueueMetadataAccessor } from './message-queue-metadata.accessor'; + +interface ProcessorGroup { + instance: object; + host: Module; + processMethodNames: string[]; + isRequestScoped: boolean; +} + +@Injectable() +export class MessageQueueExplorer implements OnModuleInit { + private readonly logger = new Logger('MessageQueueModule'); + private readonly injector = new Injector(); + + constructor( + private readonly moduleRef: ModuleRef, + private readonly discoveryService: DiscoveryService, + private readonly metadataAccessor: MessageQueueMetadataAccessor, + private readonly metadataScanner: MetadataScanner, + private readonly exceptionHandlerService: ExceptionHandlerService, + ) {} + + onModuleInit() { + this.explore(); + } + + explore() { + const processors = this.discoveryService + .getProviders() + .filter((wrapper) => + this.metadataAccessor.isProcessor( + !wrapper.metatype || wrapper.inject + ? wrapper.instance?.constructor + : wrapper.metatype, + ), + ); + + const groupedProcessors = this.groupProcessorsByQueueName(processors); + + for (const [queueName, processorGroupCollection] of Object.entries( + groupedProcessors, + )) { + const queueToken = getQueueToken(queueName); + const messageQueueService = this.getQueueService(queueToken); + + this.handleProcessorGroupCollection( + processorGroupCollection, + messageQueueService, + ); + } + } + + private groupProcessorsByQueueName(processors: InstanceWrapper[]) { + return processors.reduce( + (acc, wrapper) => { + const { instance, metatype } = wrapper; + const methodNames = this.metadataScanner.getAllMethodNames(instance); + const { queueName } = + this.metadataAccessor.getProcessorMetadata( + instance.constructor || metatype, + ) ?? {}; + + const processMethodNames = methodNames.filter((name) => + this.metadataAccessor.isProcess(instance[name]), + ); + + if (!queueName) { + this.logger.error( + `Processor ${wrapper.name} is missing queue name metadata`, + ); + + return acc; + } + + if (!wrapper.host) { + this.logger.error( + `Processor ${wrapper.name} is missing host metadata`, + ); + + return acc; + } + + if (!acc[queueName]) { + acc[queueName] = []; + } + + acc[queueName].push({ + instance, + host: wrapper.host, + processMethodNames, + isRequestScoped: !wrapper.isDependencyTreeStatic(), + }); + + return acc; + }, + {} as Record, + ); + } + + private getQueueService(queueToken: string): MessageQueueService { + try { + return this.moduleRef.get(queueToken, { + strict: false, + }); + } catch (err) { + this.logger.error(`No queue found for token ${queueToken}`); + throw err; + } + } + + private async handleProcessorGroupCollection( + processorGroupCollection: ProcessorGroup[], + queue: MessageQueueService, + options?: MessageQueueWorkerOptions, + ) { + queue.work(async (job) => { + for (const processorGroup of processorGroupCollection) { + await this.handleProcessor(processorGroup, job); + } + }, options); + } + + private async handleProcessor( + { instance, host, processMethodNames, isRequestScoped }: ProcessorGroup, + job: MessageQueueJob, + ) { + const processMetadataCollection = new Map( + processMethodNames.map((name) => { + const metadata = this.metadataAccessor.getProcessMetadata( + instance[name], + ); + + return [name, metadata]; + }), + ); + + if (isRequestScoped) { + const contextId = createContextId(); + + if (this.moduleRef.registerRequestByContextId) { + this.moduleRef.registerRequestByContextId( + { + // Add workspaceId to the request object + req: { + workspaceId: job.data.workspaceId, + }, + }, + contextId, + ); + } + + const contextInstance = await this.injector.loadPerContext( + instance, + host, + host.providers, + contextId, + ); + + await this.invokeProcessMethods( + contextInstance, + processMetadataCollection, + job, + ); + } else { + await this.invokeProcessMethods(instance, processMetadataCollection, job); + } + } + + private async invokeProcessMethods( + instance: object, + processMetadataCollection: Map, + job: MessageQueueJob, + ) { + for (const [methodName, metadata] of processMetadataCollection) { + if (job.name === metadata?.jobName) { + try { + await instance[methodName].call(instance, job.data); + } catch (err) { + if (!shouldFilterException(err)) { + this.exceptionHandlerService.captureExceptions([err]); + } + throw err; + } + } + } + } +} diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.module-definition.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.module-definition.ts new file mode 100644 index 000000000000..b7a437032f3e --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.module-definition.ts @@ -0,0 +1,20 @@ +import { ConfigurableModuleBuilder } from '@nestjs/common'; + +import { MessageQueueModuleOptions } from 'src/engine/integrations/message-queue/interfaces'; + +export const { + ConfigurableModuleClass, + OPTIONS_TYPE, + ASYNC_OPTIONS_TYPE, + MODULE_OPTIONS_TOKEN, +} = new ConfigurableModuleBuilder() + .setExtras( + { + isGlobal: true, + }, + (definition, extras) => ({ + ...definition, + global: extras.isGlobal, + }), + ) + .build(); diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.module.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.module.ts index 1f7cebba3e3b..ee7fb385b1fb 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.module.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.module.ts @@ -1,62 +1,36 @@ -import { DynamicModule, Global } from '@nestjs/common'; +import { DynamicModule, Global, Module } from '@nestjs/common'; +import { DiscoveryModule } from '@nestjs/core'; -import { MessageQueueDriver } from 'src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface'; - -import { - MessageQueueDriverType, - MessageQueueModuleAsyncOptions, -} from 'src/engine/integrations/message-queue/interfaces'; +import { MessageQueueCoreModule } from 'src/engine/integrations/message-queue/message-queue-core.module'; +import { MessageQueueMetadataAccessor } from 'src/engine/integrations/message-queue/message-queue-metadata.accessor'; +import { MessageQueueExplorer } from 'src/engine/integrations/message-queue/message-queue.explorer'; import { - MessageQueue, - QUEUE_DRIVER, -} from 'src/engine/integrations/message-queue/message-queue.constants'; -import { PgBossDriver } from 'src/engine/integrations/message-queue/drivers/pg-boss.driver'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { BullMQDriver } from 'src/engine/integrations/message-queue/drivers/bullmq.driver'; -import { SyncDriver } from 'src/engine/integrations/message-queue/drivers/sync.driver'; -import { JobsModule } from 'src/engine/integrations/message-queue/jobs.module'; + ASYNC_OPTIONS_TYPE, + OPTIONS_TYPE, +} from 'src/engine/integrations/message-queue/message-queue.module-definition'; @Global() +@Module({}) export class MessageQueueModule { - static forRoot(options: MessageQueueModuleAsyncOptions): DynamicModule { - const providers = [ - ...Object.values(MessageQueue).map((queue) => ({ - provide: queue, - useFactory: (driver: MessageQueueDriver) => { - return new MessageQueueService(driver, queue); - }, - inject: [QUEUE_DRIVER], - })), - { - provide: QUEUE_DRIVER, - useFactory: async (...args: any[]) => { - const config = await options.useFactory(...args); - - switch (config.type) { - case MessageQueueDriverType.PgBoss: { - const boss = new PgBossDriver(config.options); - - await boss.init(); + static register(options: typeof OPTIONS_TYPE): DynamicModule { + return { + module: MessageQueueModule, + imports: [MessageQueueCoreModule.register(options)], + }; + } - return boss; - } - case MessageQueueDriverType.BullMQ: { - return new BullMQDriver(config.options); - } - default: { - return new SyncDriver(JobsModule.moduleRef); - } - } - }, - inject: options.inject || [], - }, - ]; + static registerExplorer(): DynamicModule { + return { + module: MessageQueueModule, + imports: [DiscoveryModule], + providers: [MessageQueueExplorer, MessageQueueMetadataAccessor], + }; + } + static registerAsync(options: typeof ASYNC_OPTIONS_TYPE): DynamicModule { return { module: MessageQueueModule, - imports: [JobsModule, ...(options.imports || [])], - providers, - exports: Object.values(MessageQueue), + imports: [MessageQueueCoreModule.registerAsync(options)], }; } } diff --git a/packages/twenty-server/src/engine/integrations/message-queue/services/message-queue.service.ts b/packages/twenty-server/src/engine/integrations/message-queue/services/message-queue.service.ts index 87899696f907..6460e112d974 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/services/message-queue.service.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/services/message-queue.service.ts @@ -1,11 +1,15 @@ -import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { QueueCronJobOptions, QueueJobOptions, } from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; import { MessageQueueDriver } from 'src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface'; -import { MessageQueueJobData } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { + MessageQueueJobData, + MessageQueueJob, +} from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { MessageQueueWorkerOptions } from 'src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface'; import { MessageQueue, @@ -13,7 +17,7 @@ import { } from 'src/engine/integrations/message-queue/message-queue.constants'; @Injectable() -export class MessageQueueService implements OnModuleDestroy { +export class MessageQueueService { constructor( @Inject(QUEUE_DRIVER) protected driver: MessageQueueDriver, protected queueName: MessageQueue, @@ -23,12 +27,6 @@ export class MessageQueueService implements OnModuleDestroy { } } - async onModuleDestroy() { - if (typeof this.driver.stop === 'function') { - await this.driver.stop(); - } - } - add( jobName: string, data: T, @@ -50,8 +48,9 @@ export class MessageQueueService implements OnModuleDestroy { } work( - handler: ({ data, id }: { data: T; id: string }) => Promise | void, + handler: (job: MessageQueueJob) => Promise | void, + options?: MessageQueueWorkerOptions, ) { - return this.driver.work(this.queueName, handler); + return this.driver.work(this.queueName, handler, options); } } diff --git a/packages/twenty-server/src/engine/integrations/message-queue/utils/get-queue-token.util.ts b/packages/twenty-server/src/engine/integrations/message-queue/utils/get-queue-token.util.ts new file mode 100644 index 000000000000..e73faea901a5 --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/utils/get-queue-token.util.ts @@ -0,0 +1,2 @@ +export const getQueueToken = (queueName: string) => + `MESSAGE_QUEUE_${queueName}`; diff --git a/packages/twenty-server/src/engine/middlewares/graphql-hydrate-request-from-token.middleware.ts b/packages/twenty-server/src/engine/middlewares/graphql-hydrate-request-from-token.middleware.ts index b9d3d2cd5078..cbe7f3092963 100644 --- a/packages/twenty-server/src/engine/middlewares/graphql-hydrate-request-from-token.middleware.ts +++ b/packages/twenty-server/src/engine/middlewares/graphql-hydrate-request-from-token.middleware.ts @@ -55,6 +55,7 @@ export class GraphQLHydrateRequestFromTokenMiddleware req.user = data.user; req.workspace = data.workspace; + req.workspaceId = data.workspace.id; req.cacheVersion = cacheVersion; } catch (error) { res.writeHead(200, { 'Content-Type': 'application/json' }); diff --git a/packages/twenty-server/src/engine/twenty-orm/factories/scoped-workspace-datasource.factory.ts b/packages/twenty-server/src/engine/twenty-orm/factories/scoped-workspace-datasource.factory.ts index 0c21b6e74b6d..62166e536c51 100644 --- a/packages/twenty-server/src/engine/twenty-orm/factories/scoped-workspace-datasource.factory.ts +++ b/packages/twenty-server/src/engine/twenty-orm/factories/scoped-workspace-datasource.factory.ts @@ -3,7 +3,6 @@ import { REQUEST } from '@nestjs/core'; import { EntitySchema } from 'typeorm'; -import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { WorkspaceDatasourceFactory } from 'src/engine/twenty-orm/factories/workspace-datasource.factory'; @Injectable({ scope: Scope.REQUEST }) @@ -14,12 +13,13 @@ export class ScopedWorkspaceDatasourceFactory { ) {} public async create(entities: EntitySchema[]) { - const workspace: Workspace | undefined = this.request['req']?.['workspace']; + const workspaceId: string | undefined = + this.request['req']?.['workspaceId']; - if (!workspace) { + if (!workspaceId) { return null; } - return this.workspaceDataSourceFactory.create(entities, workspace.id); + return this.workspaceDataSourceFactory.create(entities, workspaceId); } } diff --git a/packages/twenty-server/src/engine/twenty-orm/twenty-orm-core.module.ts b/packages/twenty-server/src/engine/twenty-orm/twenty-orm-core.module.ts index befe7c28a717..353c3e5a5005 100644 --- a/packages/twenty-server/src/engine/twenty-orm/twenty-orm-core.module.ts +++ b/packages/twenty-server/src/engine/twenty-orm/twenty-orm-core.module.ts @@ -7,10 +7,6 @@ import { Provider, Type, } from '@nestjs/common'; -import { - ConfigurableModuleClass, - MODULE_OPTIONS_TOKEN, -} from '@nestjs/common/cache/cache.module-definition'; import { importClassesFromDirectories } from 'typeorm/util/DirectoryExportedClassesLoader'; import { Logger as TypeORMLogger } from 'typeorm/logger/Logger'; @@ -30,6 +26,10 @@ import { ScopedWorkspaceDatasourceFactory } from 'src/engine/twenty-orm/factorie import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; import { splitClassesAndStrings } from 'src/engine/twenty-orm/utils/split-classes-and-strings.util'; import { CustomWorkspaceEntity } from 'src/engine/twenty-orm/custom.workspace-entity'; +import { + ConfigurableModuleClass, + MODULE_OPTIONS_TOKEN, +} from 'src/engine/twenty-orm/twenty-orm.module-definition'; @Global() @Module({ @@ -46,7 +46,6 @@ export class TwentyORMCoreModule static register(options: TwentyORMOptions): DynamicModule { const dynamicModule = super.register(options); - console.log('register', options); const providers: Provider[] = [ { provide: TWENTY_ORM_WORKSPACE_DATASOURCE, diff --git a/packages/twenty-server/src/engine/twenty-orm/twenty-orm.module.ts b/packages/twenty-server/src/engine/twenty-orm/twenty-orm.module.ts index 3cf38e38c3f0..cd94d01298c4 100644 --- a/packages/twenty-server/src/engine/twenty-orm/twenty-orm.module.ts +++ b/packages/twenty-server/src/engine/twenty-orm/twenty-orm.module.ts @@ -1,5 +1,4 @@ import { DynamicModule, Global, Module } from '@nestjs/common'; -import { ConfigurableModuleClass } from '@nestjs/common/cache/cache.module-definition'; import { EntityClassOrSchema } from '@nestjs/typeorm/dist/interfaces/entity-class-or-schema.type'; import { @@ -12,7 +11,7 @@ import { TwentyORMCoreModule } from 'src/engine/twenty-orm/twenty-orm-core.modul @Global() @Module({}) -export class TwentyORMModule extends ConfigurableModuleClass { +export class TwentyORMModule { static register(options: TwentyORMOptions): DynamicModule { return { module: TwentyORMModule, diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/clean-inactive-workspaces.command.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/clean-inactive-workspaces.command.ts index 822fd2e94fd1..79b805c607cf 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/clean-inactive-workspaces.command.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/clean-inactive-workspaces.command.ts @@ -1,7 +1,6 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner, Option } from 'nest-commander'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job'; @@ -16,7 +15,7 @@ export type CleanInactiveWorkspacesCommandOptions = { }) export class CleanInactiveWorkspacesCommand extends CommandRunner { constructor( - @Inject(MessageQueue.taskAssignedQueue) + @InjectMessageQueue(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/start-clean-inactive-workspaces.cron.command.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/start-clean-inactive-workspaces.cron.command.ts index 62b7f0fa0e4d..daee64136210 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/start-clean-inactive-workspaces.cron.command.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/start-clean-inactive-workspaces.cron.command.ts @@ -1,7 +1,6 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner } from 'nest-commander'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { cleanInactiveWorkspaceCronPattern } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.cron.pattern'; @@ -13,7 +12,7 @@ import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspac }) export class StartCleanInactiveWorkspacesCronCommand extends CommandRunner { constructor( - @Inject(MessageQueue.cronQueue) + @InjectMessageQueue(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/stop-clean-inactive-workspaces.cron.command.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/stop-clean-inactive-workspaces.cron.command.ts index 1dd6dd85e650..0fa47c6fb59c 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/stop-clean-inactive-workspaces.cron.command.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/stop-clean-inactive-workspaces.cron.command.ts @@ -1,7 +1,6 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner } from 'nest-commander'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { cleanInactiveWorkspaceCronPattern } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.cron.pattern'; @@ -13,7 +12,7 @@ import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspac }) export class StopCleanInactiveWorkspacesCronCommand extends CommandRunner { constructor( - @Inject(MessageQueue.cronQueue) + @InjectMessageQueue(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job.ts index 7d74648dccf9..95f9fc9a4f17 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; import { render } from '@react-email/render'; import { In } from 'typeorm'; @@ -7,8 +7,6 @@ import { DeleteInactiveWorkspaceEmail, } from 'twenty-emails'; -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { ObjectMetadataService } from 'src/engine/metadata-modules/object-metadata/object-metadata.service'; import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service'; @@ -20,6 +18,9 @@ import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadat import { computeObjectTargetTable } from 'src/engine/utils/compute-object-target-table.util'; import { CleanInactiveWorkspacesCommandOptions } from 'src/engine/workspace-manager/workspace-cleaner/commands/clean-inactive-workspaces.command'; import { getDryRunLogHeader } from 'src/utils/get-dry-run-log-header'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; const MILLISECONDS_IN_ONE_DAY = 1000 * 3600 * 24; @@ -28,10 +29,8 @@ type WorkspaceToDeleteData = { daysSinceInactive: number; }; -@Injectable() -export class CleanInactiveWorkspaceJob - implements MessageQueueJob -{ +@Processor(MessageQueue.cronQueue) +export class CleanInactiveWorkspaceJob { private readonly logger = new Logger(CleanInactiveWorkspaceJob.name); private readonly inactiveDaysBeforeDelete; private readonly inactiveDaysBeforeEmail; @@ -193,6 +192,7 @@ export class CleanInactiveWorkspaceJob }); } + @Process(CleanInactiveWorkspaceJob.name) async handle(data: CleanInactiveWorkspacesCommandOptions): Promise { const isDryRun = data.dryRun || false; diff --git a/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module.ts b/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module.ts index a2c338dace65..4208f5b3d5f1 100644 --- a/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module.ts +++ b/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module.ts @@ -7,15 +7,6 @@ import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-co @Module({ imports: [CalendarEventParticipantModule, MessagingCommonModule], - providers: [ - { - provide: MatchParticipantJob.name, - useClass: MatchParticipantJob, - }, - { - provide: UnmatchParticipantJob.name, - useClass: UnmatchParticipantJob, - }, - ], + providers: [MatchParticipantJob, UnmatchParticipantJob], }) export class CalendarMessagingParticipantJobModule {} diff --git a/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/match-participant.job.ts b/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/match-participant.job.ts index 8f9015bfcd24..97f639e8d7a2 100644 --- a/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/match-participant.job.ts +++ b/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/match-participant.job.ts @@ -1,7 +1,6 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service'; import { MessagingMessageParticipantService } from 'src/modules/messaging/common/services/messaging-message-participant.service'; @@ -12,15 +11,14 @@ export type MatchParticipantJobData = { workspaceMemberId?: string; }; -@Injectable() -export class MatchParticipantJob - implements MessageQueueJob -{ +@Processor(MessageQueue.messagingQueue) +export class MatchParticipantJob { constructor( private readonly messageParticipantService: MessagingMessageParticipantService, private readonly calendarEventParticipantService: CalendarEventParticipantService, ) {} + @Process(MatchParticipantJob.name) async handle(data: MatchParticipantJobData): Promise { const { workspaceId, email, personId, workspaceMemberId } = data; diff --git a/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/unmatch-participant.job.ts b/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/unmatch-participant.job.ts index 11fee2cda22f..34b0e06f31ef 100644 --- a/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/unmatch-participant.job.ts +++ b/packages/twenty-server/src/modules/calendar-messaging-participant/jobs/unmatch-participant.job.ts @@ -1,9 +1,8 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service'; import { MessagingMessageParticipantService } from 'src/modules/messaging/common/services/messaging-message-participant.service'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type UnmatchParticipantJobData = { workspaceId: string; @@ -12,15 +11,14 @@ export type UnmatchParticipantJobData = { workspaceMemberId?: string; }; -@Injectable() -export class UnmatchParticipantJob - implements MessageQueueJob -{ +@Processor(MessageQueue.messagingQueue) +export class UnmatchParticipantJob { constructor( private readonly messageParticipantService: MessagingMessageParticipantService, private readonly calendarEventParticipantService: CalendarEventParticipantService, ) {} + @Process(UnmatchParticipantJob.name) async handle(data: UnmatchParticipantJobData): Promise { const { workspaceId, email, personId, workspaceMemberId } = data; diff --git a/packages/twenty-server/src/modules/calendar-messaging-participant/listeners/participant-person.listener.ts b/packages/twenty-server/src/modules/calendar-messaging-participant/listeners/participant-person.listener.ts index 67fa7e7dfa86..0435438a9a38 100644 --- a/packages/twenty-server/src/modules/calendar-messaging-participant/listeners/participant-person.listener.ts +++ b/packages/twenty-server/src/modules/calendar-messaging-participant/listeners/participant-person.listener.ts @@ -1,9 +1,10 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperties } from 'src/engine/integrations/event-emitter/utils/object-record-changed-properties.util'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { @@ -19,7 +20,7 @@ import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/perso @Injectable() export class ParticipantPersonListener { constructor( - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/modules/calendar-messaging-participant/listeners/participant-workspace-member.listener.ts b/packages/twenty-server/src/modules/calendar-messaging-participant/listeners/participant-workspace-member.listener.ts index 6c8de2e44652..5a708c3874df 100644 --- a/packages/twenty-server/src/modules/calendar-messaging-participant/listeners/participant-workspace-member.listener.ts +++ b/packages/twenty-server/src/modules/calendar-messaging-participant/listeners/participant-workspace-member.listener.ts @@ -1,9 +1,10 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperties } from 'src/engine/integrations/event-emitter/utils/object-record-changed-properties.util'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { @@ -19,7 +20,7 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta @Injectable() export class ParticipantWorkspaceMemberListener { constructor( - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/modules/calendar/crons/commands/google-calendar-sync.cron.command.ts b/packages/twenty-server/src/modules/calendar/crons/commands/google-calendar-sync.cron.command.ts index 90eeed4a0596..3d0c110ad666 100644 --- a/packages/twenty-server/src/modules/calendar/crons/commands/google-calendar-sync.cron.command.ts +++ b/packages/twenty-server/src/modules/calendar/crons/commands/google-calendar-sync.cron.command.ts @@ -1,7 +1,6 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner } from 'nest-commander'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { GoogleCalendarSyncCronJob } from 'src/modules/calendar/crons/jobs/google-calendar-sync.cron.job'; @@ -14,7 +13,7 @@ const GOOGLE_CALENDAR_SYNC_CRON_PATTERN = '*/5 * * * *'; }) export class GoogleCalendarSyncCronCommand extends CommandRunner { constructor( - @Inject(MessageQueue.cronQueue) + @InjectMessageQueue(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/modules/calendar/crons/jobs/calendar-cron-job.module.ts b/packages/twenty-server/src/modules/calendar/crons/jobs/calendar-cron-job.module.ts index 04f57299fa88..2545118d5d74 100644 --- a/packages/twenty-server/src/modules/calendar/crons/jobs/calendar-cron-job.module.ts +++ b/packages/twenty-server/src/modules/calendar/crons/jobs/calendar-cron-job.module.ts @@ -13,11 +13,6 @@ import { WorkspaceGoogleCalendarSyncModule } from 'src/modules/calendar/services TypeOrmModule.forFeature([DataSourceEntity], 'metadata'), WorkspaceGoogleCalendarSyncModule, ], - providers: [ - { - provide: GoogleCalendarSyncCronJob.name, - useClass: GoogleCalendarSyncCronJob, - }, - ], + providers: [GoogleCalendarSyncCronJob], }) export class CalendarCronJobModule {} diff --git a/packages/twenty-server/src/modules/calendar/crons/jobs/google-calendar-sync.cron.job.ts b/packages/twenty-server/src/modules/calendar/crons/jobs/google-calendar-sync.cron.job.ts index 7c2072c41e1c..f9b39cfaba8d 100644 --- a/packages/twenty-server/src/modules/calendar/crons/jobs/google-calendar-sync.cron.job.ts +++ b/packages/twenty-server/src/modules/calendar/crons/jobs/google-calendar-sync.cron.job.ts @@ -1,17 +1,17 @@ -import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { WorkspaceGoogleCalendarSyncService } from 'src/modules/calendar/services/workspace-google-calendar-sync/workspace-google-calendar-sync.service'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; -@Injectable() -export class GoogleCalendarSyncCronJob implements MessageQueueJob { +@Processor(MessageQueue.cronQueue) +export class GoogleCalendarSyncCronJob { constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, @@ -21,6 +21,7 @@ export class GoogleCalendarSyncCronJob implements MessageQueueJob { private readonly environmentService: EnvironmentService, ) {} + @Process(GoogleCalendarSyncCronJob.name) async handle(): Promise { const workspaceIds = ( await this.workspaceRepository.find({ diff --git a/packages/twenty-server/src/modules/calendar/jobs/blocklist-item-delete-calendar-events.job.ts b/packages/twenty-server/src/modules/calendar/jobs/blocklist-item-delete-calendar-events.job.ts index a3196a02a45d..5b9b9ddc9d7c 100644 --- a/packages/twenty-server/src/modules/calendar/jobs/blocklist-item-delete-calendar-events.job.ts +++ b/packages/twenty-server/src/modules/calendar/jobs/blocklist-item-delete-calendar-events.job.ts @@ -1,7 +1,7 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { CalendarChannelEventAssociationRepository } from 'src/modules/calendar/repositories/calendar-channel-event-association.repository'; import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository'; @@ -10,16 +10,15 @@ import { CalendarChannelEventAssociationWorkspaceEntity } from 'src/modules/cale import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/standard-objects/calendar-channel.workspace-entity'; import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type BlocklistItemDeleteCalendarEventsJobData = { workspaceId: string; blocklistItemId: string; }; -@Injectable() -export class BlocklistItemDeleteCalendarEventsJob - implements MessageQueueJob -{ +@Processor(MessageQueue.calendarQueue) +export class BlocklistItemDeleteCalendarEventsJob { private readonly logger = new Logger( BlocklistItemDeleteCalendarEventsJob.name, ); @@ -36,6 +35,7 @@ export class BlocklistItemDeleteCalendarEventsJob private readonly calendarEventCleanerService: CalendarEventCleanerService, ) {} + @Process(BlocklistItemDeleteCalendarEventsJob.name) async handle(data: BlocklistItemDeleteCalendarEventsJobData): Promise { const { workspaceId, blocklistItemId } = data; diff --git a/packages/twenty-server/src/modules/calendar/jobs/blocklist-reimport-calendar-events.job.ts b/packages/twenty-server/src/modules/calendar/jobs/blocklist-reimport-calendar-events.job.ts index 85660dae7d5a..73069a36a88b 100644 --- a/packages/twenty-server/src/modules/calendar/jobs/blocklist-reimport-calendar-events.job.ts +++ b/packages/twenty-server/src/modules/calendar/jobs/blocklist-reimport-calendar-events.job.ts @@ -1,11 +1,12 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { GoogleCalendarSyncService } from 'src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type BlocklistReimportCalendarEventsJobData = { workspaceId: string; @@ -13,10 +14,8 @@ export type BlocklistReimportCalendarEventsJobData = { handle: string; }; -@Injectable() -export class BlocklistReimportCalendarEventsJob - implements MessageQueueJob -{ +@Processor(MessageQueue.calendarQueue) +export class BlocklistReimportCalendarEventsJob { private readonly logger = new Logger(BlocklistReimportCalendarEventsJob.name); constructor( @@ -25,6 +24,7 @@ export class BlocklistReimportCalendarEventsJob private readonly googleCalendarSyncService: GoogleCalendarSyncService, ) {} + @Process(BlocklistReimportCalendarEventsJob.name) async handle(data: BlocklistReimportCalendarEventsJobData): Promise { const { workspaceId, workspaceMemberId, handle } = data; diff --git a/packages/twenty-server/src/modules/calendar/jobs/calendar-create-company-and-contact-after-sync.job.ts b/packages/twenty-server/src/modules/calendar/jobs/calendar-create-company-and-contact-after-sync.job.ts index 3c1b9046b9a5..0dec9f8e8f99 100644 --- a/packages/twenty-server/src/modules/calendar/jobs/calendar-create-company-and-contact-after-sync.job.ts +++ b/packages/twenty-server/src/modules/calendar/jobs/calendar-create-company-and-contact-after-sync.job.ts @@ -1,7 +1,8 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository'; import { CalendarEventParticipantRepository } from 'src/modules/calendar/repositories/calendar-event-participant.repository'; @@ -16,10 +17,8 @@ export type CalendarCreateCompanyAndContactAfterSyncJobData = { calendarChannelId: string; }; -@Injectable() -export class CalendarCreateCompanyAndContactAfterSyncJob - implements MessageQueueJob -{ +@Processor(MessageQueue.calendarQueue) +export class CalendarCreateCompanyAndContactAfterSyncJob { private readonly logger = new Logger( CalendarCreateCompanyAndContactAfterSyncJob.name, ); @@ -33,6 +32,7 @@ export class CalendarCreateCompanyAndContactAfterSyncJob private readonly connectedAccountRepository: ConnectedAccountRepository, ) {} + @Process(CalendarCreateCompanyAndContactAfterSyncJob.name) async handle( data: CalendarCreateCompanyAndContactAfterSyncJobData, ): Promise { diff --git a/packages/twenty-server/src/modules/calendar/jobs/calendar-job.module.ts b/packages/twenty-server/src/modules/calendar/jobs/calendar-job.module.ts index 4888d98fc0f8..1c3abdaa0593 100644 --- a/packages/twenty-server/src/modules/calendar/jobs/calendar-job.module.ts +++ b/packages/twenty-server/src/modules/calendar/jobs/calendar-job.module.ts @@ -32,26 +32,11 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s GoogleCalendarSyncModule, ], providers: [ - { - provide: BlocklistItemDeleteCalendarEventsJob.name, - useClass: BlocklistItemDeleteCalendarEventsJob, - }, - { - provide: BlocklistReimportCalendarEventsJob.name, - useClass: BlocklistReimportCalendarEventsJob, - }, - { - provide: GoogleCalendarSyncJob.name, - useClass: GoogleCalendarSyncJob, - }, - { - provide: CalendarCreateCompanyAndContactAfterSyncJob.name, - useClass: CalendarCreateCompanyAndContactAfterSyncJob, - }, - { - provide: DeleteConnectedAccountAssociatedCalendarDataJob.name, - useClass: DeleteConnectedAccountAssociatedCalendarDataJob, - }, + BlocklistItemDeleteCalendarEventsJob, + BlocklistReimportCalendarEventsJob, + GoogleCalendarSyncJob, + CalendarCreateCompanyAndContactAfterSyncJob, + DeleteConnectedAccountAssociatedCalendarDataJob, ], }) export class CalendarJobModule {} diff --git a/packages/twenty-server/src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job.ts b/packages/twenty-server/src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job.ts index 2d41ada2fa43..bd8e5ab9e2c7 100644 --- a/packages/twenty-server/src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job.ts +++ b/packages/twenty-server/src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job.ts @@ -1,7 +1,8 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { CalendarEventCleanerService } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.service'; export type DeleteConnectedAccountAssociatedCalendarDataJobData = { @@ -9,11 +10,8 @@ export type DeleteConnectedAccountAssociatedCalendarDataJobData = { connectedAccountId: string; }; -@Injectable() -export class DeleteConnectedAccountAssociatedCalendarDataJob - implements - MessageQueueJob -{ +@Processor(MessageQueue.calendarQueue) +export class DeleteConnectedAccountAssociatedCalendarDataJob { private readonly logger = new Logger( DeleteConnectedAccountAssociatedCalendarDataJob.name, ); @@ -22,6 +20,7 @@ export class DeleteConnectedAccountAssociatedCalendarDataJob private readonly calendarEventCleanerService: CalendarEventCleanerService, ) {} + @Process(DeleteConnectedAccountAssociatedCalendarDataJob.name) async handle( data: DeleteConnectedAccountAssociatedCalendarDataJobData, ): Promise { diff --git a/packages/twenty-server/src/modules/calendar/jobs/google-calendar-sync.job.ts b/packages/twenty-server/src/modules/calendar/jobs/google-calendar-sync.job.ts index dd7a3362d3a6..46240cca0762 100644 --- a/packages/twenty-server/src/modules/calendar/jobs/google-calendar-sync.job.ts +++ b/packages/twenty-server/src/modules/calendar/jobs/google-calendar-sync.job.ts @@ -1,19 +1,18 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; import { GoogleCalendarSyncService } from 'src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type GoogleCalendarSyncJobData = { workspaceId: string; connectedAccountId: string; }; -@Injectable() -export class GoogleCalendarSyncJob - implements MessageQueueJob -{ +@Processor(MessageQueue.calendarQueue) +export class GoogleCalendarSyncJob { private readonly logger = new Logger(GoogleCalendarSyncJob.name); constructor( @@ -21,6 +20,7 @@ export class GoogleCalendarSyncJob private readonly googleCalendarSyncService: GoogleCalendarSyncService, ) {} + @Process(GoogleCalendarSyncJob.name) async handle(data: GoogleCalendarSyncJobData): Promise { this.logger.log( `google calendar sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, diff --git a/packages/twenty-server/src/modules/calendar/listeners/calendar-blocklist.listener.ts b/packages/twenty-server/src/modules/calendar/listeners/calendar-blocklist.listener.ts index c1ed4f2d98eb..b34441c9a193 100644 --- a/packages/twenty-server/src/modules/calendar/listeners/calendar-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/calendar/listeners/calendar-blocklist.listener.ts @@ -1,9 +1,10 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { @@ -19,7 +20,7 @@ import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard @Injectable() export class CalendarBlocklistListener { constructor( - @Inject(MessageQueue.calendarQueue) + @InjectMessageQueue(MessageQueue.calendarQueue) private readonly messageQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/modules/calendar/listeners/calendar-channel.listener.ts b/packages/twenty-server/src/modules/calendar/listeners/calendar-channel.listener.ts index 3d1e28a557ab..9e43de7124b1 100644 --- a/packages/twenty-server/src/modules/calendar/listeners/calendar-channel.listener.ts +++ b/packages/twenty-server/src/modules/calendar/listeners/calendar-channel.listener.ts @@ -1,8 +1,9 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; import { objectRecordChangedProperties } from 'src/engine/integrations/event-emitter/utils/object-record-changed-properties.util'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { @@ -14,7 +15,7 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/stan @Injectable() export class CalendarChannelListener { constructor( - @Inject(MessageQueue.calendarQueue) + @InjectMessageQueue(MessageQueue.calendarQueue) private readonly messageQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service.ts b/packages/twenty-server/src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service.ts index d0c8663d86d4..17681f17e350 100644 --- a/packages/twenty-server/src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service.ts +++ b/packages/twenty-server/src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service.ts @@ -64,7 +64,7 @@ export class GoogleCalendarSyncService { private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly calendarEventCleanerService: CalendarEventCleanerService, private readonly calendarEventParticipantsService: CalendarEventParticipantService, - @InjectMessageQueue(MessageQueue.emailQueue) + @InjectMessageQueue(MessageQueue.contactCreationQueue) private readonly messageQueueService: MessageQueueService, private readonly eventEmitter: EventEmitter2, ) {} diff --git a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/jobs/auto-companies-and-contacts-creation-job.module.ts b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/jobs/auto-companies-and-contacts-creation-job.module.ts index f65e1965b7f0..63f90db2ee6e 100644 --- a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/jobs/auto-companies-and-contacts-creation-job.module.ts +++ b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/jobs/auto-companies-and-contacts-creation-job.module.ts @@ -5,11 +5,6 @@ import { CreateCompanyAndContactJob } from 'src/modules/connected-account/auto-c @Module({ imports: [AutoCompaniesAndContactsCreationModule], - providers: [ - { - provide: CreateCompanyAndContactJob.name, - useClass: CreateCompanyAndContactJob, - }, - ], + providers: [CreateCompanyAndContactJob], }) export class AutoCompaniesAndContactsCreationJobModule {} diff --git a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job.ts b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job.ts index 7c31e16e81ab..744dcf294c2b 100644 --- a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job.ts +++ b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job.ts @@ -1,7 +1,6 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; import { CreateCompanyAndContactService } from 'src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -15,14 +14,13 @@ export type CreateCompanyAndContactJobData = { }[]; }; -@Injectable() -export class CreateCompanyAndContactJob - implements MessageQueueJob -{ +@Processor(MessageQueue.contactCreationQueue) +export class CreateCompanyAndContactJob { constructor( private readonly createCompanyAndContactService: CreateCompanyAndContactService, ) {} + @Process(CreateCompanyAndContactJob.name) async handle(data: CreateCompanyAndContactJobData): Promise { const { workspaceId, connectedAccount, contactsToCreate } = data; diff --git a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/listeners/messaging-message-channel.listener.ts b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/listeners/messaging-message-channel.listener.ts index 6b3cde3987be..01a047653d80 100644 --- a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/listeners/messaging-message-channel.listener.ts +++ b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/listeners/messaging-message-channel.listener.ts @@ -1,8 +1,9 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; import { objectRecordChangedProperties } from 'src/engine/integrations/event-emitter/utils/object-record-changed-properties.util'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -14,7 +15,7 @@ import { @Injectable() export class MessagingMessageChannelListener { constructor( - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts index 3a52e42deb17..89b7200406af 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts @@ -1,7 +1,8 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity'; @@ -16,10 +17,8 @@ export type BlocklistItemDeleteMessagesJobData = { blocklistItemId: string; }; -@Injectable() -export class BlocklistItemDeleteMessagesJob - implements MessageQueueJob -{ +@Processor(MessageQueue.messagingQueue) +export class BlocklistItemDeleteMessagesJob { private readonly logger = new Logger(BlocklistItemDeleteMessagesJob.name); constructor( @@ -34,6 +33,7 @@ export class BlocklistItemDeleteMessagesJob private readonly threadCleanerService: MessagingMessageCleanerService, ) {} + @Process(BlocklistItemDeleteMessagesJob.name) async handle(data: BlocklistItemDeleteMessagesJobData): Promise { const { workspaceId, blocklistItemId } = data; diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts index c852bb9edb41..73545c5262af 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts @@ -1,19 +1,20 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { BlocklistItemDeleteMessagesJobData, BlocklistItemDeleteMessagesJob, } from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job'; -import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -21,7 +22,7 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/stan @Injectable() export class MessagingBlocklistListener { constructor( - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service.ts b/packages/twenty-server/src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service.ts index 163c4427b7c4..9b288566a995 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Inject } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { EventEmitter2 } from '@nestjs/event-emitter'; @@ -25,13 +25,14 @@ import { } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; import { MessagingMessageService } from 'src/modules/messaging/common/services/messaging-message.service'; import { MessagingMessageParticipantService } from 'src/modules/messaging/common/services/messaging-message-participant.service'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; @Injectable() export class MessagingSaveMessagesAndEnqueueContactCreationService { constructor( private readonly workspaceDataSourceService: WorkspaceDataSourceService, - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.contactCreationQueue) private readonly messageQueueService: MessageQueueService, private readonly messageService: MessagingMessageService, private readonly messageParticipantService: MessagingMessageParticipantService, diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job.ts index 5cba9355cb84..fbeb4f88a615 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job.ts @@ -1,18 +1,17 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; export type MessagingConnectedAccountDeletionCleanupJobData = { workspaceId: string; connectedAccountId: string; }; -@Injectable() -export class MessagingConnectedAccountDeletionCleanupJob - implements MessageQueueJob -{ +@Processor(MessageQueue.messagingQueue) +export class MessagingConnectedAccountDeletionCleanupJob { private readonly logger = new Logger( MessagingConnectedAccountDeletionCleanupJob.name, ); @@ -21,6 +20,7 @@ export class MessagingConnectedAccountDeletionCleanupJob private readonly messageCleanerService: MessagingMessageCleanerService, ) {} + @Process(MessagingConnectedAccountDeletionCleanupJob.name) async handle( data: MessagingConnectedAccountDeletionCleanupJobData, ): Promise { diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener.ts index 24d5deb2974c..c3d0149f7192 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/listeners/messaging-message-cleaner-connected-account.listener.ts @@ -1,4 +1,4 @@ -import { Inject } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; @@ -13,12 +13,14 @@ import { MessagingConnectedAccountDeletionCleanupJob, MessagingConnectedAccountDeletionCleanupJobData, } from 'src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +@Injectable() export class MessagingMessageCleanerConnectedAccountListener { constructor( - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - @Inject(MessageQueue.calendarQueue) + @InjectMessageQueue(MessageQueue.calendarQueue) private readonly calendarQueueService: MessageQueueService, ) {} diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts index 4562bd75bad6..ef28ce0efb77 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts @@ -16,10 +16,7 @@ import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cl ], providers: [ MessagingMessageCleanerService, - { - provide: MessagingConnectedAccountDeletionCleanupJob.name, - useClass: MessagingConnectedAccountDeletionCleanupJob, - }, + MessagingConnectedAccountDeletionCleanupJob, MessagingMessageCleanerConnectedAccountListener, ], exports: [MessagingMessageCleanerService], diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command.ts index 585762d2cb98..d8e2253bd1d1 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command.ts @@ -1,7 +1,6 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner } from 'nest-commander'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job'; @@ -15,7 +14,7 @@ const MESSAGING_MESSAGE_LIST_FETCH_CRON_PATTERN = '*/5 * * * *'; }) export class MessagingMessageListFetchCronCommand extends CommandRunner { constructor( - @Inject(MessageQueue.cronQueue) + @InjectMessageQueue(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command.ts index aae9df3723b2..4e878d75f597 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command.ts @@ -1,7 +1,6 @@ -import { Inject } from '@nestjs/common'; - import { Command, CommandRunner } from 'nest-commander'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job'; @@ -12,7 +11,7 @@ import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-im }) export class MessagingMessagesImportCronCommand extends CommandRunner { constructor( - @Inject(MessageQueue.cronQueue) + @InjectMessageQueue(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, ) { super(); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 9a2cb54740df..9a3adfa50bf6 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -1,10 +1,8 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -20,11 +18,12 @@ import { MessagingMessageListFetchJobData, MessagingMessageListFetchJob, } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; -@Injectable() -export class MessagingMessageListFetchCronJob - implements MessageQueueJob -{ +@Processor(MessageQueue.cronQueue) +export class MessagingMessageListFetchCronJob { private readonly logger = new Logger(MessagingMessageListFetchCronJob.name); constructor( @@ -32,13 +31,14 @@ export class MessagingMessageListFetchCronJob private readonly workspaceRepository: Repository, @InjectRepository(DataSourceEntity, 'metadata') private readonly dataSourceRepository: Repository, - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) private readonly messageChannelRepository: MessageChannelRepository, private readonly environmentService: EnvironmentService, ) {} + @Process(MessagingMessageListFetchCronJob.name) async handle(): Promise { const workspaceIds = ( await this.workspaceRepository.find({ diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index 2faafc14a8c5..3c4352a92153 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -1,10 +1,8 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; @@ -14,6 +12,9 @@ import { MessagingMessagesImportJobData, MessagingMessagesImportJob, } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { @@ -21,22 +22,23 @@ import { MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -@Injectable() -export class MessagingMessagesImportCronJob - implements MessageQueueJob -{ +@Processor(MessageQueue.cronQueue) +export class MessagingMessagesImportCronJob { + private readonly logger = new Logger(MessagingMessagesImportCronJob.name); + constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, @InjectRepository(DataSourceEntity, 'metadata') private readonly dataSourceRepository: Repository, private readonly environmentService: EnvironmentService, - @Inject(MessageQueue.messagingQueue) + @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) private readonly messageChannelRepository: MessageChannelRepository, ) {} + @Process(MessagingMessagesImportCronJob.name) async handle(): Promise { const workspaceIds = ( await this.workspaceRepository.find({ diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index 4cd4b40e5651..a22126de9e01 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -1,7 +1,8 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; +import { Logger } from '@nestjs/common'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -20,10 +21,8 @@ export type MessagingMessageListFetchJobData = { workspaceId: string; }; -@Injectable() -export class MessagingMessageListFetchJob - implements MessageQueueJob -{ +@Processor(MessageQueue.messagingQueue) +export class MessagingMessageListFetchJob { private readonly logger = new Logger(MessagingMessageListFetchJob.name); constructor( @@ -36,6 +35,7 @@ export class MessagingMessageListFetchJob private readonly messagingTelemetryService: MessagingTelemetryService, ) {} + @Process(MessagingMessageListFetchJob.name) async handle(data: MessagingMessageListFetchJobData): Promise { const { messageChannelId, workspaceId } = data; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts index 246f5f8b1f5e..f0a543d8070f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts @@ -1,7 +1,6 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -19,10 +18,8 @@ export type MessagingMessagesImportJobData = { workspaceId: string; }; -@Injectable() -export class MessagingMessagesImportJob - implements MessageQueueJob -{ +@Processor(MessageQueue.messagingQueue) +export class MessagingMessagesImportJob { constructor( @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, @@ -32,6 +29,7 @@ export class MessagingMessagesImportJob private readonly messagingTelemetryService: MessagingTelemetryService, ) {} + @Process(MessagingMessagesImportJob.name) async handle(data: MessagingMessagesImportJobData): Promise { const { messageChannelId, workspaceId } = data; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index b0b6362c0f35..494edba8fb6d 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -22,22 +22,10 @@ import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import providers: [ MessagingMessageListFetchCronCommand, MessagingMessagesImportCronCommand, - { - provide: MessagingMessageListFetchJob.name, - useClass: MessagingMessageListFetchJob, - }, - { - provide: MessagingMessagesImportJob.name, - useClass: MessagingMessagesImportJob, - }, - { - provide: MessagingMessageListFetchCronJob.name, - useClass: MessagingMessageListFetchCronJob, - }, - { - provide: MessagingMessagesImportCronJob.name, - useClass: MessagingMessagesImportCronJob, - }, + MessagingMessageListFetchJob, + MessagingMessagesImportJob, + MessagingMessageListFetchCronJob, + MessagingMessagesImportCronJob, ], exports: [], }) diff --git a/packages/twenty-server/src/modules/messaging/message-participants-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts b/packages/twenty-server/src/modules/messaging/message-participants-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts index 268fd74ccebf..2091db8690f7 100644 --- a/packages/twenty-server/src/modules/messaging/message-participants-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-participants-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts @@ -1,10 +1,8 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { FeatureFlagEntity, FeatureFlagKeys, @@ -15,18 +13,19 @@ import { MessageChannelRepository } from 'src/modules/messaging/common/repositor import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; export type MessagingCreateCompanyAndContactAfterSyncJobData = { workspaceId: string; messageChannelId: string; }; -@Injectable() -export class MessagingCreateCompanyAndContactAfterSyncJob - implements MessageQueueJob -{ +@Processor(MessageQueue.messagingQueue) +export class MessagingCreateCompanyAndContactAfterSyncJob { private readonly logger = new Logger( MessagingCreateCompanyAndContactAfterSyncJob.name, ); @@ -42,6 +41,7 @@ export class MessagingCreateCompanyAndContactAfterSyncJob private readonly connectedAccountRepository: ConnectedAccountRepository, ) {} + @Process(MessagingCreateCompanyAndContactAfterSyncJob.name) async handle( data: MessagingCreateCompanyAndContactAfterSyncJobData, ): Promise { diff --git a/packages/twenty-server/src/modules/messaging/message-participants-manager/messaging-participants-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-participants-manager/messaging-participants-manager.module.ts index 589d3eb47794..6d04a306ce15 100644 --- a/packages/twenty-server/src/modules/messaging/message-participants-manager/messaging-participants-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-participants-manager/messaging-participants-manager.module.ts @@ -25,10 +25,7 @@ import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-o TypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'), ], providers: [ - { - provide: MessagingCreateCompanyAndContactAfterSyncJob.name, - useClass: MessagingCreateCompanyAndContactAfterSyncJob, - }, + MessagingCreateCompanyAndContactAfterSyncJob, MessageParticipantListener, ], }) diff --git a/packages/twenty-server/src/modules/timeline/jobs/create-audit-log-from-internal-event.ts b/packages/twenty-server/src/modules/timeline/jobs/create-audit-log-from-internal-event.ts index 32d8b7637005..edf4c3d9e488 100644 --- a/packages/twenty-server/src/modules/timeline/jobs/create-audit-log-from-internal-event.ts +++ b/packages/twenty-server/src/modules/timeline/jobs/create-audit-log-from-internal-event.ts @@ -1,18 +1,15 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository'; import { AuditLogWorkspaceEntity } from 'src/modules/timeline/standard-objects/audit-log.workspace-entity'; import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; -@Injectable() -export class CreateAuditLogFromInternalEvent - implements MessageQueueJob -{ +@Processor(MessageQueue.entityEventsToDbQueue) +export class CreateAuditLogFromInternalEvent { constructor( @InjectObjectMetadataRepository(WorkspaceMemberWorkspaceEntity) private readonly workspaceMemberService: WorkspaceMemberRepository, @@ -20,6 +17,7 @@ export class CreateAuditLogFromInternalEvent private readonly auditLogRepository: AuditLogRepository, ) {} + @Process(CreateAuditLogFromInternalEvent.name) async handle(data: ObjectRecordBaseEvent): Promise { let workspaceMemberId: string | null = null; diff --git a/packages/twenty-server/src/modules/timeline/jobs/timeline-job.module.ts b/packages/twenty-server/src/modules/timeline/jobs/timeline-job.module.ts index 3fd03709ce2b..416e81aed53f 100644 --- a/packages/twenty-server/src/modules/timeline/jobs/timeline-job.module.ts +++ b/packages/twenty-server/src/modules/timeline/jobs/timeline-job.module.ts @@ -16,14 +16,8 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta TimelineActivityModule, ], providers: [ - { - provide: CreateAuditLogFromInternalEvent.name, - useClass: CreateAuditLogFromInternalEvent, - }, - { - provide: UpsertTimelineActivityFromInternalEvent.name, - useClass: UpsertTimelineActivityFromInternalEvent, - }, + CreateAuditLogFromInternalEvent, + UpsertTimelineActivityFromInternalEvent, ], }) export class TimelineJobModule {} diff --git a/packages/twenty-server/src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job.ts b/packages/twenty-server/src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job.ts index 2f8490764963..5abcb2e10da6 100644 --- a/packages/twenty-server/src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job.ts +++ b/packages/twenty-server/src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job.ts @@ -1,23 +1,21 @@ -import { Injectable } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; import { TimelineActivityService } from 'src/modules/timeline/services/timeline-activity.service'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; -@Injectable() -export class UpsertTimelineActivityFromInternalEvent - implements MessageQueueJob -{ +@Processor(MessageQueue.entityEventsToDbQueue) +export class UpsertTimelineActivityFromInternalEvent { constructor( @InjectObjectMetadataRepository(WorkspaceMemberWorkspaceEntity) private readonly workspaceMemberService: WorkspaceMemberRepository, private readonly timelineActivityService: TimelineActivityService, ) {} + @Process(UpsertTimelineActivityFromInternalEvent.name) async handle(data: ObjectRecordBaseEvent): Promise { if (data.userId) { const workspaceMember = await this.workspaceMemberService.getByIdOrFail( diff --git a/packages/twenty-server/src/queue-worker/queue-worker.module.ts b/packages/twenty-server/src/queue-worker/queue-worker.module.ts index d6c87b882383..1cf307ffefee 100644 --- a/packages/twenty-server/src/queue-worker/queue-worker.module.ts +++ b/packages/twenty-server/src/queue-worker/queue-worker.module.ts @@ -2,8 +2,17 @@ import { Module } from '@nestjs/common'; import { JobsModule } from 'src/engine/integrations/message-queue/jobs.module'; import { IntegrationsModule } from 'src/engine/integrations/integrations.module'; +import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; +import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module'; @Module({ - imports: [IntegrationsModule, JobsModule], + imports: [ + TwentyORMModule.register({ + workspaceEntities: ['dist/src/**/*.workspace-entity{.ts,.js}'], + }), + IntegrationsModule, + MessageQueueModule.registerExplorer(), + JobsModule, + ], }) export class QueueWorkerModule {} diff --git a/packages/twenty-server/src/queue-worker/queue-worker.ts b/packages/twenty-server/src/queue-worker/queue-worker.ts index fd62872d47bd..0b2af24eccd6 100644 --- a/packages/twenty-server/src/queue-worker/queue-worker.ts +++ b/packages/twenty-server/src/queue-worker/queue-worker.ts @@ -1,17 +1,8 @@ import { NestFactory } from '@nestjs/core'; -import { - MessageQueueJob, - MessageQueueJobData, -} from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - import { shouldFilterException } from 'src/engine/utils/global-exception-handler.util'; import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service'; import { LoggerService } from 'src/engine/integrations/logger/logger.service'; -import { JobsModule } from 'src/engine/integrations/message-queue/jobs.module'; -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { getJobClassName } from 'src/engine/integrations/message-queue/utils/get-job-class-name.util'; import { QueueWorkerModule } from 'src/queue-worker/queue-worker.module'; async function bootstrap() { @@ -28,29 +19,6 @@ async function bootstrap() { // Inject our logger app.useLogger(loggerService!); - - for (const queueName of Object.values(MessageQueue)) { - const messageQueueService: MessageQueueService = app.get(queueName); - - await messageQueueService.work(async (jobData: MessageQueueJobData) => { - const jobClassName = getJobClassName(jobData.name); - const job: MessageQueueJob = app - .select(JobsModule) - .get(jobClassName, { strict: false }); - - try { - await job.handle(jobData.data); - } catch (err) { - exceptionHandlerService?.captureExceptions([ - new Error( - `Error occurred while processing job ${jobClassName} #${jobData.id}`, - ), - err, - ]); - throw err; - } - }); - } } catch (err) { loggerService?.error(err?.message, err?.name); diff --git a/yarn.lock b/yarn.lock index 58f27c922580..ce824dc233a5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -17382,6 +17382,15 @@ __metadata: languageName: node linkType: hard +"@types/lodash.omitby@npm:^4.6.9": + version: 4.6.9 + resolution: "@types/lodash.omitby@npm:4.6.9" + dependencies: + "@types/lodash": "npm:*" + checksum: e8850219326634c5b531e3398d24701000328e4366504d9315c1660c2fe2a0d4fc9aa2983b8c652ee7239921cd16103b37b1e44efdf25658de2a36f64b76888a + languageName: node + linkType: hard + "@types/lodash.pick@npm:^4.3.7": version: 4.4.9 resolution: "@types/lodash.pick@npm:4.4.9" @@ -34759,6 +34768,13 @@ __metadata: languageName: node linkType: hard +"lodash.omitby@npm:^4.6.0": + version: 4.6.0 + resolution: "lodash.omitby@npm:4.6.0" + checksum: 4608b1d8c4063b63349a3462852465fbe74781d737fbb26a0a7f00b0e65f6ccbc13fa490a38f9380103d93fc398e3873983038efadfafc67ccafbb25d9bc7bf4 + languageName: node + linkType: hard + "lodash.once@npm:^4.0.0": version: 4.1.1 resolution: "lodash.once@npm:4.1.1" @@ -47176,6 +47192,7 @@ __metadata: "@types/lodash.isequal": "npm:^4.5.8" "@types/lodash.isobject": "npm:^3.0.7" "@types/lodash.omit": "npm:^4.5.9" + "@types/lodash.omitby": "npm:^4.6.9" "@types/lodash.snakecase": "npm:^4.1.7" "@types/lodash.uniq": "npm:^4.5.9" "@types/lodash.uniqby": "npm:^4.7.9" @@ -47188,6 +47205,7 @@ __metadata: jsdom: "npm:~22.1.0" jwt-decode: "npm:^4.0.0" lodash.differencewith: "npm:^4.5.0" + lodash.omitby: "npm:^4.6.0" lodash.uniq: "npm:^4.5.0" lodash.uniqby: "npm:^4.7.0" passport: "npm:^0.7.0"